多生产者单消费者场景,除了消费发生一次,之后队列“关闭”,不允许更多的工作。 我有一个MPSC队列,所以我尝试添加一个无锁算法来“关闭”队列。 我相信它是正确的,它通过了我的测试。 问题是,当我试图优化内存顺序时,它停止工作(我认为工作丢失了,例如,队列关闭后排队)。 即使是在拥有强内存模型的x64上,即使只有一个生产商。
我微调内存顺序的尝试被注释掉了:
// thread-safe for multi producers single consumer use
// linked-list based, and so it's growable
MPSC_queue work_queue;
std::atomic<bool> closed{ false };
std::atomic<int32_t> producers_num{ 0 };
bool produce(Work&& work)
{
bool res = false;
++producers_num;
// producers_num.fetch_add(1, std::memory_order_release);
if (!closed)
// if (!closed.load(std::memory_order_acquire))
{
work_queue.push(std::move(work));
res = true;
}
--producers_num;
// producers_num.fetch_sub(1, std::memory_order_release);
return res;
}
void consume()
{
closed = true;
// closed.store(true, std::memory_order_release);
while (producers_num != 0)
// while (producers_num.load(std::memory_order_acquire) != 0)
std::this_thread::yield();
Work work;
while (work_queue.pop(work))
process(work);
}
我还尝试了std::memory_order_acq_rel
在producers_num
上执行读-修改-写操作,也不起作用。
一道附加题:
该算法与MPSC队列一起使用,它已经在内部进行了一些同步。 最好将它们结合起来,以获得更好的性能。 你知道关于“可关闭”MPSC队列的任何这样的算法吗?
我认为closed=true;
确实需要是seq_cst,以确保在您第一次检查producers_num
之前它对其他线程是可见的。 否则,可以进行以下排序:
++producers_num;
producers_num==0
如果(!closed)
发现它仍处于打开状态close.store(true,release)
变得全局可见。work_queue.pop(work)
发现队列为空work_queue.push(std::move(work));
在使用者停止查找后将工作添加到队列。如果在返回之前有使用者检查producers_num==0
,则仍然可以避免seq_cst,如
while (producers_num != 0)
// while (producers_num.load(std::memory_order_acquire) != 0)
std::this_thread::yield();
do {
Work work;
while (work_queue.pop(work))
process(work);
} while(producers_num.load(acquire) != 0);
// safe if pop included a full barrier, I think
我不能百分之百地确定我有这个权利,但我认为在一个完整的障碍之后检查producer_num
就足够了。
但是,生产者端确实需要++producers_num;
至少是acq_rel,否则它可以在(!closed)之前重新排序。 (在它之后,if(!closed)
之前的一个获取围栏也可能有效)。
因为您只想使用一次队列,所以它不需要环绕,而且可能会简单得多。 就像一个原子生产者-位置计数器,写作者增加它来要求一个位置,如果他们得到一个位置>; 则队列已满。 不过,我还没有想清楚所有的细节。
这可能允许对上述问题有一个更干净的解决方案,也许是通过让使用者查看写索引来查看是否有任何生产者