详解高性能无锁队列的实现-2

简介: 详解高性能无锁队列的实现

三、基于循环数组的无锁队列


3.1 类接口和变量

#ifndef _ARRAYLOCKFREEQUEUE_H___
#define _ARRAYLOCKFREEQUEUE_H___
#include <stdint.h>
#ifdef _WIN64
#define QUEUE_INT int64_t
#else
#define QUEUE_INT unsigned long
#endif
#define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65535 // 2^16
template <typename ELEM_T, QUEUE_INT Q_SIZE = ARRAY_LOCK_FREE_Q_DEFAULT_SIZE>
class ArrayLockFreeQueue
{
public:
  ArrayLockFreeQueue();
  virtual ~ArrayLockFreeQueue();
  QUEUE_INT size();
  bool enqueue(const ELEM_T &a_data);//入队
  bool dequeue(ELEM_T &a_data);// 出队
    bool try_dequeue(ELEM_T &a_data);//尝试入队
private:
  ELEM_T m_thequeue[Q_SIZE];
  volatile QUEUE_INT m_count;//队列的元素个数
  volatile QUEUE_INT m_writeIndex;//新元素入队时存放位置在数组中的下标
  volatile QUEUE_INT m_readIndex;//下一个出队元素在数组中的下标
  volatile QUEUE_INT m_maximumReadIndex;// 最后一个已经完成入队操作的元素在数组中的下标
  inline QUEUE_INT countToIndex(QUEUE_INT a_count);
};
#include "ArrayLockFreeQueueImp.h"
#endif
  • m_count:队列元素的个数。
  • m_writeIndex:指向新元素入队(可写入)的位置。只表示写请求成功并申请空间,并不代表数据已经写入,不能用于读取。
  • m_readIndex:指向下一个出队(可读出)元素的位置。
  • m_maximumReadIndex:指向存放最后一个有效数据(已经完成写入)的位置,即可读位置的边界。如果它的值跟m_writeIndex不一致,表明有写请求尚未完成。这意味着,有写请求成功申请了空间但数据还没完全写进队列。所以如果有线程要读取,必须要等到写线程将数据完全写入到队列之后。


[readIndex, maximumReadIndex)这一范围内的数据都可以读取。

ce51d6f317fd56ac2c390c1d1a5c8073_96b69d81a9dc471c9c26f8233ce4d95d.png


3.2 enqueue入队列

以下插图展示了对队列执行操作时各下标是如何变化的。如果一个位置被标记为X,标识这个位置里存放了数据。空白表示位置是空的。对于下图的情况,队列中存放了两个元素。WriteIndex指示的位置是新元素将会被插入的位置。ReadIndex指向的位置中的元素将会在下一次pop操作中被弹出。

2d7830f9a127433202c47893a5cd4f62_4ee4ef9b38f84871ad350753619c964c.png

1)当生产者准备将数据插入到队列中,它首先通过增加WriteIndex的值来申请空间MaximumReadIndex指向最后一个存放有效数据的位置(也就是实际的队列尾)。

32634dfaa4fd4cc388114c729fe53fac_1fdc211fb80f4a2aabb79d8705c6cfdc.png

2)一旦空间的申请完成,生产者就可以将数据拷贝到刚刚申请到的位置中。完成之后增MaximumReadIndex使得它与WriteIndex的一致。

8ab45eb0b228b6804c9f84aded2076b8_6b4d7c5eb102487192eac97a01b49d71.png

3)现在队列中有3个元素,接着又有一个生产者尝试向队列中插入元素。

b812771b1164dea4f7ec836f4e38b8ef_5cab41cc3f7547ae9790813b3827b95e.png


4)在第一个生产者完成数据拷贝之前,又有另外一个生产者申请了一个新的空间准备拷贝数据。现在有两个生产者同时向队列插入数据。

7349cfb5768066370cb7cfffccfe18c1_a9c5a5128d4640a8aa2cd23fb33aa4bd.png

5)现在生产者开始拷贝数据,在完成拷贝之后,对MaximumReadIndex的递增操作必须严格遵循一个顺序:第一个生产者线程首先递增MaximumReadIndex,接着才轮到第二个生产者。这个顺序必须被严格遵守的原因是,我们必须保证数据被完全拷贝到队列之后才允许消费者线程将其出列。


  while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
  {
    sched_yield(); // cas 更新失败后,让出 cpu
  }

让出cpu的目的也是为了让排在最前面的生产者完成m_maximumReadIndex的更新)

1d09322cc194a955de0bfd9fef2ce837_00f3a37c98ea4f2d88e2a4b4d6db4669.png

6)第一个生产者完成了数据拷贝,并对MaximumReadIndex完成了递增,现在第二个生产者可以递增

MaximumReadIndex了。

add42c768ad918fc142b17becf5076b2_e241dcffbf08431c8b391ceb42695568.png

7)第二个生产者完成了对MaximumReadIndex的递增,现在队列中有5个元素。


在多于一个生产者线程的情况下,sched_yield() 操作将 cpu 让给其他线程是很有必要的。

无锁算法和通过阻塞机制同步的算法的一个主要区别在于无锁算法不会阻塞在线程同步上。多线程环境下,多生产者线程向并发的往队列中存放数据,每个生产者线程所执行的 cas 操作都必须严格遵循 FIFO 次序,一个用于申请空间,另一个用于通知消费者数据已经写入完成可以被读取了。


如果我们的应用程序只有唯一的生产者操作这个队列,sche_yield()将永远没有机会被调用,第2个CAS操作永远不会失败。因为在一个生产者的情况下没有人能破坏生产者执行这两个CAS操作的FIFO顺序。而当多于一个生产者线程往队列中存放数据的时候,问题就出现了。


例如第一个 cas 操作的执行顺序是线程1,2,3,第二个 cas 操作的执行顺序也必须是线程1,2,3。若线程1执行第二个 cas 操作的时候被抢占,那么线程2,3只能在 cpu 上忙等(它们忙等,不让出处理器,线程1也就没机会执行,它们就只能继续忙等)。而这就是问题产生的根源。


这就是需要sche_yield()所,应当尽快让出 cpu,让线程1先执行。这样线程2和3才能继续完成它们的操作。


template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data) 
{
  QUEUE_INT currentWriteIndex; 
  QUEUE_INT currentReadIndex;  
  // 1、获取可写入的位置(CAS)
  do 
  {
         // 获取当前读写指针的位置
    currentWriteIndex = m_writeIndex;
    currentReadIndex = m_readIndex;
         // 判断队列是否满了,(write + 1) % Q_SIZE == read 
    if(countToIndex(currentWriteIndex + 1) ==
      countToIndex(currentReadIndex))
    {
      return false; 
    }
  // 获取可写入的位置(CAS),cas 更新失败则继续循环
  } while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1)));
  // 2、写入数据
  m_thequeue[countToIndex(currentWriteIndex)] = a_data;  
  // 3、更新可读的位置(CAS),多线程环境下生产者线程按序更新可读位置
  while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
  {
    sched_yield(); // cas 更新失败后,让出 cpu
  }
  // 原子操作;队列中元素数量+1
  AtomicAdd(&m_count, 1);
  return true;
}

3.3 dequeue出队列

以下插图展示了元素出列的时候各种下标是如何变化的,队列中初始有2个元素。WriteIndex指示的位置是新元素将会被插入的位置。ReadIndex指向的位置中的元素将会在下一次pop操作中被弹出。


f4c725ee3c5724c92319e441ce1bda72_bee8e684eb7a4626b002451e45857a31.png

1)消费者线程拷贝数组ReadIndex位置的元素,然后尝试用CAS操作将ReadIndex加1。如果操作成功消费者成功的将数据出列。因为CAS操作是原子的,所以只有唯一的线程可以在同一时刻更ReadIndex的值。如果操作失败,读取新的ReadIndex值,以重复以上操作(copy数据,CAS)。

49b436755da8a7b2a34faa6eb95d3197_c9cfaaa1483946beabef7b351789b45b.png


2)现在又有一个消费者将元素出列,队列变成空。


698af91635b3ff6c1b80f31aed4c3671_dcab034a35ee4f5abab7f2b5927f4636.png

3)现在有一个生产者正在向队列中添加元素。它已经成功的申请了空间,但尚未完成数据拷贝。任何其它企图从队列中移除元素的消费者都会发现队列非空(因为writeIndex不等于readIndex)。但它不能读取readIndex所指向位置中的数据,因为readIndex与MaximumReadIndex相等。消费者将会在do循环中不断的反复尝试,直到生产者完成数据拷贝增加MaximumReadIndex的值,或者队列变成空(这在多个消费者的场景下会发生)。

c2759e1de77bee32620a49450d7a2c06_44c871acbca744a09ec5aea31a01eb52.png


4)当生产者完成数据拷贝,队列的大小是1,消费者线程可以读取这个数据了.


emplate <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data)
{
  QUEUE_INT currentMaximumReadIndex;
  QUEUE_INT currentReadIndex;
  do
  {
         // 获取当前可读位置[m_readIndex,  m_maximumReadIndex)
    currentReadIndex = m_readIndex;
    currentMaximumReadIndex = m_maximumReadIndex;
    // 若队列为空,返回 false
    if(countToIndex(currentReadIndex) ==
      countToIndex(currentMaximumReadIndex))    
    {
      return false;
    }
    // 读取数据
    a_data = m_thequeue[countToIndex(currentReadIndex)]; 
         // 更新可读的位置(CAS)
    if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
    {
      AtomicSub(&m_count, 1); // 原子操作,元素-1
      return true;
    }
  } while(true);
  assert(0);
  return false;
}

3.4 源码

// ArrayLockFreeQueue.h
#ifndef _ARRAYLOCKFREEQUEUE_H___
#define _ARRAYLOCKFREEQUEUE_H___
#include <stdint.h>
#ifdef _WIN64
#define QUEUE_INT int64_t
#else
#define QUEUE_INT unsigned long
#endif
#define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65535
template <typename ELEM_T, QUEUE_INT Q_SIZE = ARRAY_LOCK_FREE_Q_DEFAULT_SIZE>
class ArrayLockFreeQueue
{
public:
  ArrayLockFreeQueue();
  virtual ~ArrayLockFreeQueue();
  QUEUE_INT size();
  bool enqueue(const ELEM_T &a_data); 
  bool dequeue(ELEM_T &a_data);
    bool try_dequeue(ELEM_T &a_data);
private:
  ELEM_T m_thequeue[Q_SIZE];          
  volatile QUEUE_INT m_count;     
  volatile QUEUE_INT m_writeIndex;  
  volatile QUEUE_INT m_readIndex;   
  volatile QUEUE_INT m_maximumReadIndex;
  inline QUEUE_INT countToIndex(QUEUE_INT a_count);
};
#include "ArrayLockFreeQueueImp.h"
#endif
// ArrayLockFreeQueueImp.h
#ifndef _ARRAYLOCKFREEQUEUEIMP_H___
#define _ARRAYLOCKFREEQUEUEIMP_H___
#include "ArrayLockFreeQueue.h"
#include <assert.h>
#include "atom_opt.h"
template <typename ELEM_T, QUEUE_INT Q_SIZE>
ArrayLockFreeQueue<ELEM_T, Q_SIZE>::ArrayLockFreeQueue() :
  m_writeIndex(0),
  m_readIndex(0),
  m_maximumReadIndex(0)
{
  m_count = 0;
}
template <typename ELEM_T, QUEUE_INT Q_SIZE>
ArrayLockFreeQueue<ELEM_T, Q_SIZE>::~ArrayLockFreeQueue()
{}
template <typename ELEM_T, QUEUE_INT Q_SIZE>
inline QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(QUEUE_INT a_count)
{
  return (a_count % Q_SIZE);    
}
template <typename ELEM_T, QUEUE_INT Q_SIZE>
QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::size()
{
  QUEUE_INT currentWriteIndex = m_writeIndex;
  QUEUE_INT currentReadIndex = m_readIndex;
  if(currentWriteIndex>=currentReadIndex)
    return currentWriteIndex - currentReadIndex;
  else
    return Q_SIZE + currentWriteIndex - currentReadIndex;
}
template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data) 
{
  QUEUE_INT currentWriteIndex; 
  QUEUE_INT currentReadIndex;
  do 
  {
    currentWriteIndex = m_writeIndex;
    currentReadIndex = m_readIndex;
    if(countToIndex(currentWriteIndex + 1) ==
      countToIndex(currentReadIndex))
    {
      return false; 
    }
  } while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1)));
  m_thequeue[countToIndex(currentWriteIndex)] = a_data; 
  while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
  {
    sched_yield();    
  }
  AtomicAdd(&m_count, 1);
  return true;
}
template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::try_dequeue(ELEM_T &a_data)
{
    return dequeue(a_data);
}
template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data)
{
  QUEUE_INT currentMaximumReadIndex;
  QUEUE_INT currentReadIndex;
  do
  {
    currentReadIndex = m_readIndex;
    currentMaximumReadIndex = m_maximumReadIndex;
    if(countToIndex(currentReadIndex) ==
      countToIndex(currentMaximumReadIndex))    
    {
      return false;
    }
    a_data = m_thequeue[countToIndex(currentReadIndex)]; 
    if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
    {
      AtomicSub(&m_count, 1); // 真正读取到了数据,元素-1
    }
  } while(true);
  assert(0);
  return false;
}
#endif


四、测试结果


8e37490f327f951427f21d1bebea2f36_fcef4217a2d34e10a75eecfc482324e9.png

1、对于耗时短的任务


  • 无锁队列有明显的优势。
  • 线程不是越多越好。因为线程切换开销也不小。


2、对于耗时大的任务


  • 无锁队列优势不明显,甚至略弱于有锁
  • 可以适当增加线程,有助于效率提升


五、结论


对于耗时比较短的任务,无锁队列通常能够提供更好的性能,原因如下:


  • 减少锁开销:无锁队列通过使用原子操作等技术来实现线程安全,避免了显式的锁操作。相比之下,有锁队列需要在进入和离开临界区时获取和释放锁,这涉及到较多的上下文切换和系统调用,造成额外的开销。在任务耗时短的情况下,这些开销会占据较大的比例,从而影响性能。


  • 高并发性:无锁队列的设计通常能够支持更高的并发性,因为多个线程可以同时访问和修改队列,而无需互斥地获取锁。这样可以减少竞争和调度开销,提高整体的并行度和吞吐量。对于耗时短的任务,这种高并发性能够更好地利用系统资源,实现更高的性能。


然而,对于耗时比较长的任务,无锁队列可能不如有锁队列的原因主要包括以下几点:


  • 内存和处理器开销:无锁队列通常需要通过原子操作等技术来保证线程安全,这可能增加了额外的内存访问和处理器指令。而耗时较长的任务会使得这些开销相对不那么显著,反而增加了系统的负担。


  • 数据一致性:无锁队列的设计通常较为复杂,需要保证数据的一致性和正确性。在耗时较长的任务中,多线程并发访问和修改数据的机会增加,这可能导致更多的数据竞争和冲突,进而增加了数据一致性的难度和复杂性。


总结下来就是:


  1. 能用有锁队列的尽量使用有锁队列
  2. 什么时候使用无锁队列,满足以下的条件可考虑无锁队列(注意是可考虑):

○ 所处理的任务没有阻塞,纯cpu密集型; (如果有mysql数据库操作,有redis操作就没有必要)

○ 每秒处理的任务超过百万条(ops > 100万);

○ 生产者不存在io阻塞 (每秒产生任务 1万多也没有必要, 如果有锁队列影响io的吞吐量也可以使

用无锁)。

○ 生产者最好和消费者是1:1对应关系,不要出现 多写1读的情况。

目录
相关文章
|
7月前
|
存储 Kubernetes NoSQL
无锁队列实现及使用场景
无锁队列实现及使用场景
|
2月前
|
缓存 安全 算法
高性能无锁并发框架Disruptor,太强了!
高性能无锁并发框架Disruptor,太强了!
高性能无锁并发框架Disruptor,太强了!
|
7月前
|
存储 缓存 安全
【企业级理解】高效并发之Java内存模型
【企业级理解】高效并发之Java内存模型
|
消息中间件 存储 缓存
详解高性能无锁队列的实现-1
详解高性能无锁队列的实现
518 0
|
Java 程序员 编译器
从根上理解高性能、高并发:深入计算机底层,理解线程与线程池
作为即时通讯技术的开发者来说,高性能、高并发相关的技术概念早就了然于胸,什么线程池、零拷贝、多路复用、事件驱动、epoll等等名词信手拈来,又或许你对具有这些技术特征的技术框架比如:Java的Netty、Php的workman、Go的nget等熟练掌握。但真正到了面试或者技术实践过程中遇到无法释怀的疑惑时,方知自已所掌握的不过是皮毛。
高性能无锁并发框架Disruptor,太强了
Disruptor是一个开源框架,研发的初衷是为了解决高并发下队列锁的问题,最早由LMAX提出并使用,能够在无锁的情况下实现队列的并发操作,并号称能够在一个线程里每秒处理6百万笔订单
|
机器学习/深度学习 监控 Java
如何理解高性能服务器的高性能、高并发?
作为国内品牌服务器厂商,蓝海大脑液冷GPU服务器拥有大规模并行处理能力和无与伦比的灵活性。它主要用于为计算密集型应用程序提供足够的处理能力。GPU的优势在于可以由CPU运行应用程序代码,同时图形处理单元(GPU)可以处理大规模并行架构的计算密集型任务。GPU服务器是遥感测绘、医药研发、生命科学和高性能计算的理想选择。
如何理解高性能服务器的高性能、高并发?
|
Web App开发 监控 Java
当我们说起多线程与高并发时
当我们说起多线程与高并发时
当我们说起多线程与高并发时
|
存储 缓存 算法
系统性能百倍提升典型案例分析:高性能队列Disruptor
Disruptor 是一款高性能的有界内存队列,目前应用非常广泛,Log4j2、SpringMessaging、HBase、Storm 都用到了 Disruptor,那 Disruptor 的性能为什么这么高呢?
系统性能百倍提升典型案例分析:高性能队列Disruptor
|
消息中间件 缓存 算法
Java并发:性能与可伸缩性
Java并发:性能与可伸缩性
162 0
Java并发:性能与可伸缩性