tmp
wangzhengquan
2021-01-22 4c73fd7179e92bee9cccb65e46823b00f568acb3
src/queue/array_lock_free_queue.h
@@ -1,5 +1,6 @@
#ifndef __ARRAY_LOCK_FREE_QUEUE_H__
#define __ARRAY_LOCK_FREE_QUEUE_H__
#include "atomic_ops.h"
#include <assert.h> // assert()
#include <sched.h>  // sched_yield()
@@ -18,15 +19,15 @@
#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
template <typename ELEM_T, typename Allocator = SHM_Allocator>
class ArrayLockFreeQueue
{
class ArrayLockFreeQueue {
    // ArrayLockFreeQueue will be using this' private members
    template <
        typename ELEM_T_, 
        typename Allocator_,
        template <typename T, typename AT> class Q_TYPE
        >
    friend class LockFreeQueue;
  friend
  class LockFreeQueue;
private:
    /// @brief constructor of the class
@@ -99,8 +100,7 @@
}
template <typename ELEM_T, typename Allocator>
ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue()
{
ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() {
    // std::cout << "destroy ArrayLockFreeQueue\n";
    Allocator::deallocate(m_theQueue);
    
@@ -108,8 +108,7 @@
template <typename ELEM_T, typename Allocator>
inline 
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
{
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) {
    // if Q_SIZE is a power of 2 this statement could be also written as 
    // return (a_count & (Q_SIZE - 1));
    return (a_count % Q_SIZE);
@@ -117,8 +116,7 @@
template <typename ELEM_T, typename Allocator>
inline 
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size()
{
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return m_count;
@@ -152,8 +150,7 @@
template <typename ELEM_T, typename Allocator>
inline 
bool ArrayLockFreeQueue<ELEM_T, Allocator>::full()
{
bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count == (Q_SIZE));
@@ -177,8 +174,7 @@
template <typename ELEM_T, typename Allocator>
inline 
bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty()
{
bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count == 0);
@@ -198,18 +194,12 @@
}
template <typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data)
{
bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) {
    uint32_t currentReadIndex;
    uint32_t currentWriteIndex;
    do
    {
  do {
        currentWriteIndex = m_writeIndex;
        currentReadIndex  = m_readIndex;
@@ -235,8 +225,7 @@
    // inserting in the queue. It might fail if there are more than 1 producer threads because this
    // operation has to be done in the same order as the previous CAS
    while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
    {
  while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) {
        // this is a good place to yield the thread in case there are more
        // software threads than hardware processors and you have more
        // than 1 producer thread
@@ -252,13 +241,11 @@
template <typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data)
{
bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) {
    uint32_t currentMaximumReadIndex;
    uint32_t currentReadIndex;
    do
    {
  do {
        // to ensure thread-safety when there is more than 1 producer thread
        // a second index is defined (m_maximumReadIndex)
        currentReadIndex        = m_readIndex;
@@ -284,8 +271,7 @@
        // try to perfrom now the CAS operation on the read index. If we succeed
        // a_data already contains what m_readIndex pointed to before we
        // increased it
        if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
        {
    if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) {
        #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
            // m_count.fetch_sub(1);
            AtomicSub(&m_count, 1);
@@ -307,13 +293,12 @@
}
template <typename ELEM_T, typename Allocator>
ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i)
{
ELEM_T &ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) {
    int currentCount = m_count;
    uint32_t currentReadIndex = m_readIndex;
    if (i >= currentCount)
    {
        std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
  if (i >= currentCount) {
    std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i
              << " is out of range\n";
        std::exit(EXIT_FAILURE);
    }
    return m_theQueue[countToIndex(currentReadIndex+i)];