fujuntang
2021-08-11 68d23225a38a35f1325eb39fa4ed5a005d5de473
src/queue/array_lock_free_queue.h
@@ -1,10 +1,11 @@
#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
#ifndef __ARRAY_LOCK_FREE_QUEUE_H__
#define __ARRAY_LOCK_FREE_QUEUE_H__
#include "atomic_ops.h"
#include <assert.h> // assert()
#include <sched.h>  // sched_yield()
#include "logger_factory.h"
#include "mem_pool.h"
#include "shm_mm.h"
#include "shm_allocator.h"
/// @brief implementation of an array based lock free queue with support for 
@@ -17,306 +18,290 @@
#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
template <typename ELEM_T, typename Allocator = SHM_Allocator>
class ArrayLockFreeQueue
{
    // ArrayLockFreeQueue will be using this' private members
    template <
        typename ELEM_T_,
        typename Allocator_,
        template <typename T, typename AT> class Q_TYPE
        >
    friend class LockFreeQueue;
template<typename ELEM_T, typename Allocator = SHM_Allocator>
class ArrayLockFreeQueue {
  // ArrayLockFreeQueue will be using this' private members
  template<
    typename ELEM_T_,
    typename Allocator_,
    template<typename T, typename AT> class Q_TYPE
  >
  friend
  class LockFreeQueue;
private:
    /// @brief constructor of the class
    ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    virtual ~ArrayLockFreeQueue();
    inline uint32_t size();
    inline bool full();
  /// @brief constructor of the class
  ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    inline bool empty();
    bool push(const ELEM_T &a_data);
    bool pop(ELEM_T &a_data);
    /// @brief calculate the index in the circular array that corresponds
    /// to a particular "count" value
    inline uint32_t countToIndex(uint32_t a_count);
  virtual ~ArrayLockFreeQueue();
    ELEM_T& operator[](unsigned i);
private:
    size_t Q_SIZE;
    /// @brief array to keep the elements
    ELEM_T *m_theQueue;
  inline uint32_t size();
    /// @brief where a new element will be inserted
    uint32_t m_writeIndex;
  inline bool full();
    /// @brief where the next element where be extracted from
    uint32_t m_readIndex;
    /// @brief maximum read index for multiple producer queues
    /// If it's not the same as m_writeIndex it means
    /// there are writes pending to be "committed" to the queue, that means,
    /// the place for the data was reserved (the index in the array) but
    /// data is still not in the queue, so the thread trying to read will have
    /// to wait for those other threads to save the data into the queue
    ///
    /// note this is only used for multiple producers
    uint32_t m_maximumReadIndex;
  inline bool empty();
  bool push(const ELEM_T &a_data);
  bool pop(ELEM_T &a_data);
  /// @brief calculate the index in the circular array that corresponds
  /// to a particular "count" value
  inline uint32_t countToIndex(uint32_t a_count);
  ELEM_T &operator[](unsigned i);
private:
  size_t Q_SIZE;
  /// @brief array to keep the elements
  ELEM_T *m_theQueue;
  /// @brief where a new element will be inserted
  uint32_t m_writeIndex;
  /// @brief where the next element where be extracted from
  uint32_t m_readIndex;
  /// @brief maximum read index for multiple producer queues
  /// If it's not the same as m_writeIndex it means
  /// there are writes pending to be "committed" to the queue, that means,
  /// the place for the data was reserved (the index in the array) but
  /// data is still not in the queue, so the thread trying to read will have
  /// to wait for those other threads to save the data into the queue
  ///
  /// note this is only used for multiple producers
  uint32_t m_maximumReadIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    /// @brief number of elements in the queue
    uint32_t m_count;
  /// @brief number of elements in the queue
  uint32_t m_count;
#endif
private:
    /// @brief disable copy constructor declaring it private
    ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
  /// @brief disable copy constructor declaring it private
  ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
};
template <typename ELEM_T, typename Allocator>
template<typename ELEM_T, typename Allocator>
ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize):
    Q_SIZE(qsize),
    m_writeIndex(0),      // initialisation is not atomic
    m_readIndex(0),       //
    m_maximumReadIndex(0) //
  Q_SIZE(qsize),
  m_writeIndex(0),      // initialisation is not atomic
  m_readIndex(0),       //
  m_maximumReadIndex(0) //
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    ,m_count(0)           //
  , m_count(0)           //
#endif
{
    m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
  m_theQueue = (ELEM_T *) Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
}
template <typename ELEM_T, typename Allocator>
ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue()
{
    // std::cout << "destroy ArrayLockFreeQueue\n";
    Allocator::deallocate(m_theQueue);
template<typename ELEM_T, typename Allocator>
ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() {
  // std::cout << "destroy ArrayLockFreeQueue\n";
  Allocator::deallocate(m_theQueue);
}
template <typename ELEM_T, typename Allocator>
inline
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
{
    // if Q_SIZE is a power of 2 this statement could be also written as
    // return (a_count & (Q_SIZE - 1));
    return (a_count % Q_SIZE);
template<typename ELEM_T, typename Allocator>
inline
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) {
  // if Q_SIZE is a power of 2 this statement could be also written as
  // return (a_count & (Q_SIZE - 1));
  return (a_count % Q_SIZE);
}
template <typename ELEM_T, typename Allocator>
inline
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size()
{
template<typename ELEM_T, typename Allocator>
inline
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return m_count;
  return m_count;
#else
    uint32_t currentWriteIndex = m_maximumReadIndex;
    uint32_t currentReadIndex  = m_readIndex;
  uint32_t currentWriteIndex = m_maximumReadIndex;
  uint32_t currentReadIndex  = m_readIndex;
    // let's think of a scenario where this function returns bogus data
    // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
    // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
    // 2. afterwards this thread is preemted. While this thread is inactive 2
    // elements are inserted and removed from the queue, so m_maximumReadIndex
    // is 5 and m_readIndex 4. Real size is still 1
    // 3. Now the current thread comes back from preemption and reads m_readIndex.
    // currentReadIndex is 4
    // 4. currentReadIndex is bigger than currentWriteIndex, so
    // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
    // it returns that the queue is almost full, when it is almost empty
    //
    if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
    {
        return (currentWriteIndex - currentReadIndex);
    }
    else
    {
        return (Q_SIZE + currentWriteIndex - currentReadIndex);
    }
  // let's think of a scenario where this function returns bogus data
  // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
  // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
  // 2. afterwards this thread is preemted. While this thread is inactive 2
  // elements are inserted and removed from the queue, so m_maximumReadIndex
  // is 5 and m_readIndex 4. Real size is still 1
  // 3. Now the current thread comes back from preemption and reads m_readIndex.
  // currentReadIndex is 4
  // 4. currentReadIndex is bigger than currentWriteIndex, so
  // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
  // it returns that the queue is almost full, when it is almost empty
  //
  if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
  {
      return (currentWriteIndex - currentReadIndex);
  }
  else
  {
      return (Q_SIZE + currentWriteIndex - currentReadIndex);
  }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T, typename Allocator>
inline
bool ArrayLockFreeQueue<ELEM_T, Allocator>::full()
{
template<typename ELEM_T, typename Allocator>
inline
bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count == (Q_SIZE));
  return (m_count == (Q_SIZE));
#else
    uint32_t currentWriteIndex = m_writeIndex;
    uint32_t currentReadIndex  = m_readIndex;
  uint32_t currentWriteIndex = m_writeIndex;
  uint32_t currentReadIndex  = m_readIndex;
  if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
  {
      // the queue is full
      return true;
  }
  else
  {
      // not full!
      return false;
  }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template<typename ELEM_T, typename Allocator>
inline
bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
  return (m_count == 0);
#else
  if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
  {
      // the queue is full
      return true;
  }
  else
  {
      // not full!
      return false;
  }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template<typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) {
  uint32_t currentReadIndex;
  uint32_t currentWriteIndex;
  do {
    currentWriteIndex = m_writeIndex;
    currentReadIndex = m_readIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    if (m_count == Q_SIZE) {
      return false;
    }
#else
    if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
    {
        // the queue is full
        return true;
    }
    else
    {
        // not full!
        return false;
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T, typename Allocator>
inline
bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count == 0);
#else
    if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
    {
        // the queue is full
        return true;
    }
    else
    {
        // not full!
        return false;
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data)
{
    uint32_t currentReadIndex;
    uint32_t currentWriteIndex;
    do
    {
        currentWriteIndex = m_writeIndex;
        currentReadIndex  = m_readIndex;
    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        if (m_count == Q_SIZE) {
            return false;
        }
    #else
        if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
        {
            // the queue is full
            return false;
        }
    #endif
    } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
    // We know now that this index is reserved for us. Use it to save the data
    m_theQueue[countToIndex(currentWriteIndex)] = a_data;
    // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
    // inserting in the queue. It might fail if there are more than 1 producer threads because this
    // operation has to be done in the same order as the previous CAS
    while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
    {
        // this is a good place to yield the thread in case there are more
        // software threads than hardware processors and you have more
        // than 1 producer thread
        // have a look at sched_yield (POSIX.1b)
        sched_yield();
    }
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    AtomicAdd(&m_count, 1);
#endif
    return true;
  } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
  // We know now that this index is reserved for us. Use it to save the data
  m_theQueue[countToIndex(currentWriteIndex)] = a_data;
  // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
  // inserting in the queue. It might fail if there are more than 1 producer threads because this
  // operation has to be done in the same order as the previous CAS
  while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) {
    // this is a good place to yield the thread in case there are more
    // software threads than hardware processors and you have more
    // than 1 producer thread
    // have a look at sched_yield (POSIX.1b)
    sched_yield();
  }
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
  AtomicAdd(&m_count, 1);
#endif
  return true;
}
template <typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data)
{
    uint32_t currentMaximumReadIndex;
    uint32_t currentReadIndex;
template<typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) {
  uint32_t currentMaximumReadIndex;
  uint32_t currentReadIndex;
    do
    {
        // to ensure thread-safety when there is more than 1 producer thread
        // a second index is defined (m_maximumReadIndex)
        currentReadIndex        = m_readIndex;
        currentMaximumReadIndex = m_maximumReadIndex;
    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
  do {
    // to ensure thread-safety when there is more than 1 producer thread
    // a second index is defined (m_maximumReadIndex)
    currentReadIndex = m_readIndex;
    currentMaximumReadIndex = m_maximumReadIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        if (m_count == 0) {
            return false;
        }
    #else
        if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
        {
            // the queue is empty or
            // a producer thread has allocate space in the queue but is
            // waiting to commit the data into it
            return false;
        }
    #endif
        // retrieve the data from the queue
        a_data = m_theQueue[countToIndex(currentReadIndex)];
        // try to perfrom now the CAS operation on the read index. If we succeed
        // a_data already contains what m_readIndex pointed to before we
        // increased it
        if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
        {
        #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
            // m_count.fetch_sub(1);
            AtomicSub(&m_count, 1);
        #endif
            return true;
        }
        // it failed retrieving the element off the queue. Someone else must
        // have read the element stored at countToIndex(currentReadIndex)
        // before we could perform the CAS operation
    } while(1); // keep looping to try again!
    // Something went wrong. it shouldn't be possible to reach here
    assert(0);
    // Add this return statement to avoid compiler warnings
    return false;
}
template <typename ELEM_T, typename Allocator>
ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i)
{
    int currentCount = m_count;
    uint32_t currentReadIndex = m_readIndex;
    if (i >= currentCount)
    {
        std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
        std::exit(EXIT_FAILURE);
    if (m_count == 0) {
      return false;
    }
    return m_theQueue[countToIndex(currentReadIndex+i)];
#else
    if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
    {
        // the queue is empty or
        // a producer thread has allocate space in the queue but is
        // waiting to commit the data into it
        return false;
    }
#endif
    // retrieve the data from the queue
    a_data = m_theQueue[countToIndex(currentReadIndex)];
    // try to perfrom now the CAS operation on the read index. If we succeed
    // a_data already contains what m_readIndex pointed to before we
    // increased it
    if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
      // m_count.fetch_sub(1);
      AtomicSub(&m_count, 1);
#endif
      return true;
    }
    // it failed retrieving the element off the queue. Someone else must
    // have read the element stored at countToIndex(currentReadIndex)
    // before we could perform the CAS operation
  } while (1); // keep looping to try again!
  // Something went wrong. it shouldn't be possible to reach here
  assert(0);
  // Add this return statement to avoid compiler warnings
  return false;
}
template<typename ELEM_T, typename Allocator>
ELEM_T &ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) {
  int currentCount = m_count;
  uint32_t currentReadIndex = m_readIndex;
  if (i >= currentCount) {
    std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i
              << " is out of range\n";
    std::exit(EXIT_FAILURE);
  }
  return m_theQueue[countToIndex(currentReadIndex + i)];
}
#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__