wangzhengquan
2020-07-20 f85c9b875b060681b51f57b15074ba1c7c9f5636
queue/include/array_lock_free_queue.h
@@ -1,9 +1,11 @@
#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
#include "atomic_ops.h"
#include <assert.h> // assert()
#include <sched.h>  // sched_yield()
#include "logger_factory.h"
#include "mem_pool.h"
#include "shm_allocator.h"
/// @brief implementation of an array based lock free queue with support for 
///        multiple producers
@@ -15,13 +17,15 @@
#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
template <typename ELEM_T>
template <typename ELEM_T, typename Allocator = SHM_Allocator>
class ArrayLockFreeQueue
{
    // ArrayLockFreeQueue will be using this' private members
    template <
        typename ELEM_T_, 
        template <typename T> class Q_TYPE >
        typename Allocator_,
        template <typename T, typename AT> class Q_TYPE
        >
    friend class LockFreeQueue;
private:
@@ -52,10 +56,10 @@
    ELEM_T *m_theQueue;
    /// @brief where a new element will be inserted
    std::atomic<uint32_t> m_writeIndex;
    uint32_t m_writeIndex;
    /// @brief where the next element where be extracted from
    std::atomic<uint32_t> m_readIndex;
    uint32_t m_readIndex;
    
    /// @brief maximum read index for multiple producer queues
    /// If it's not the same as m_writeIndex it means
@@ -65,23 +69,23 @@
    /// to wait for those other threads to save the data into the queue
    ///
    /// note this is only used for multiple producers
    std::atomic<uint32_t> m_maximumReadIndex;
    uint32_t m_maximumReadIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    /// @brief number of elements in the queue
    std::atomic<uint32_t> m_count;
    uint32_t m_count;
#endif
   
    
private:
    /// @brief disable copy constructor declaring it private
    ArrayLockFreeQueue<ELEM_T>(const ArrayLockFreeQueue<ELEM_T> &a_src);
    ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
};
template <typename ELEM_T>
ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize):
template <typename ELEM_T, typename Allocator>
ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize):
    Q_SIZE(qsize),
    m_writeIndex(0),      // initialisation is not atomic
    m_readIndex(0),       //
@@ -90,38 +94,38 @@
    ,m_count(0)           //
#endif
{
    m_theQueue = (ELEM_T*)mm_malloc(Q_SIZE * sizeof(ELEM_T));
    m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
}
template <typename ELEM_T>
ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue()
template <typename ELEM_T, typename Allocator>
ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue()
{
    // std::cout << "destroy ArrayLockFreeQueue\n";
    mm_free(m_theQueue);
    Allocator::deallocate(m_theQueue);
    
}
template <typename ELEM_T>
template <typename ELEM_T, typename Allocator>
inline 
uint32_t ArrayLockFreeQueue<ELEM_T>::countToIndex(uint32_t a_count)
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
{
    // if Q_SIZE is a power of 2 this statement could be also written as 
    // return (a_count & (Q_SIZE - 1));
    return (a_count % Q_SIZE);
}
template <typename ELEM_T>
template <typename ELEM_T, typename Allocator>
inline 
uint32_t ArrayLockFreeQueue<ELEM_T>::size()
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return m_count.load();
    return m_count;
#else
    uint32_t currentWriteIndex = m_maximumReadIndex.load();
    uint32_t currentReadIndex  = m_readIndex.load();
    uint32_t currentWriteIndex = m_maximumReadIndex;
    uint32_t currentReadIndex  = m_readIndex;
    // let's think of a scenario where this function returns bogus data
    // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
@@ -146,13 +150,13 @@
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T>
template <typename ELEM_T, typename Allocator>
inline 
bool ArrayLockFreeQueue<ELEM_T>::full()
bool ArrayLockFreeQueue<ELEM_T, Allocator>::full()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count.load() == (Q_SIZE));
    return (m_count == (Q_SIZE));
#else
    uint32_t currentWriteIndex = m_writeIndex;
@@ -171,16 +175,16 @@
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T>
template <typename ELEM_T, typename Allocator>
inline 
bool ArrayLockFreeQueue<ELEM_T>::empty()
bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count.load() == 0);
    return (m_count == 0);
#else
    if (countToIndex( m_readIndex.load()) == countToIndex(m_maximumReadIndex.load()))
    if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
    {
        // the queue is full
        return true;
@@ -194,52 +198,44 @@
}
template <typename ELEM_T>
bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data)
template <typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data)
{
    uint32_t currentReadIndex;
    uint32_t currentWriteIndex;
    do
    {
        currentWriteIndex = m_writeIndex.load();
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        if (m_count.load() == Q_SIZE) {
        currentWriteIndex = m_writeIndex;
        currentReadIndex  = m_readIndex;
    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        if (m_count == Q_SIZE) {
            return false;
        }
#else
        if (countToIndex(currentWriteIndex + 1) == countToIndex(m_readIndex.load()))
    #else
        if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
        {
            // the queue is full
            return false;
        }
#endif
    // There is more than one producer. Keep looping till this thread is able
    // to allocate space for current piece of data
    //
    // using compare_exchange_strong because it isn't allowed to fail spuriously
    // When the compare_exchange operation is in a loop the weak version
    // will yield better performance on some platforms, but here we'd have to
    // load m_writeIndex all over again
    } while (!m_writeIndex.compare_exchange_strong(
                currentWriteIndex, (currentWriteIndex + 1)));
    // Just made sure this index is reserved for this thread.
    #endif
    } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
    // We know now that this index is reserved for us. Use it to save the data
    m_theQueue[countToIndex(currentWriteIndex)] = a_data;
    //memcpy((void *)(&m_theQueue[countToIndex(currentWriteIndex)]), (void *)(&a_data), sizeof(ELEM_T) );
    // update the maximum read index after saving the piece of data. It can't
    // fail if there is only one thread inserting in the queue. It might fail
    // if there is more than 1 producer thread because this operation has to
    // be done in the same order as the previous CAS
    //
    // using compare_exchange_weak because they are allowed to fail spuriously
    // (act as if *this != expected, even if they are equal), but when the
    // compare_exchange operation is in a loop the weak version will yield
    // better performance on some platforms.
    while (!m_maximumReadIndex.compare_exchange_weak(
                currentWriteIndex, (currentWriteIndex + 1)))
    // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
    // inserting in the queue. It might fail if there are more than 1 producer threads because this
    // operation has to be done in the same order as the previous CAS
    while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
    {
        // this is a good place to yield the thread in case there are more
        // software threads than hardware processors and you have more
@@ -248,35 +244,35 @@
        sched_yield();
    }
    // The value was successfully inserted into the queue
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    m_count.fetch_add(1);
    AtomicAdd(&m_count, 1);
#endif
    return true;
}
template <typename ELEM_T>
bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data)
template <typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data)
{
    uint32_t currentMaximumReadIndex;
    uint32_t currentReadIndex;
    do
    {
        currentReadIndex = m_readIndex.load();
        // to ensure thread-safety when there is more than 1 producer thread
        // a second index is defined (m_maximumReadIndex)
        currentReadIndex        = m_readIndex;
        currentMaximumReadIndex = m_maximumReadIndex;
    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
     #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        if (m_count.load() == 0) {
        if (m_count == 0) {
            return false;
        }
    #else
        // to ensure thread-safety when there is more than 1 producer
        // thread a second index is defined (m_maximumReadIndex)
        if (countToIndex(currentReadIndex) == countToIndex(m_maximumReadIndex.load()))
        if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
        {
            // the queue is empty or
            // a producer thread has allocate space in the queue but is
            // a producer thread has allocate space in the queue but is
            // waiting to commit the data into it
            return false;
        }
@@ -284,23 +280,22 @@
        // retrieve the data from the queue
        a_data = m_theQueue[countToIndex(currentReadIndex)];
        //memcpy((void*) (&a_data), (void *)(&m_theQueue[countToIndex(currentReadIndex)]),sizeof(ELEM_T) );
        // try to perfrom now the CAS operation on the read index. If we succeed
        // a_data already contains what m_readIndex pointed to before we
        // a_data already contains what m_readIndex pointed to before we
        // increased it
        if (m_readIndex.compare_exchange_strong(currentReadIndex, (currentReadIndex + 1)))
        if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
        {
            // got here. The value was retrieved from the queue. Note that the
            // data inside the m_queue array is not deleted nor reseted
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
            m_count.fetch_sub(1);
#endif
        #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
            // m_count.fetch_sub(1);
            AtomicSub(&m_count, 1);
        #endif
            return true;
        }
        // it failed retrieving the element off the queue. Someone else must
        // have read the element stored at countToIndex(currentReadIndex)
        // before we could perform the CAS operation
        // before we could perform the CAS operation
    } while(1); // keep looping to try again!
@@ -308,18 +303,17 @@
    assert(0);
    // Add this return statement to avoid compiler warnings
    return false;
    return false;
}
template <typename ELEM_T>
ELEM_T& ArrayLockFreeQueue<ELEM_T>::operator[](unsigned int i)
template <typename ELEM_T, typename Allocator>
ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i)
{
    int currentCount = m_count.load();
    uint32_t currentReadIndex = m_readIndex.load();
    int currentCount = m_count;
    uint32_t currentReadIndex = m_readIndex;
    if (i < 0 || i >= currentCount)
    {
        std::cerr << "Error in array limits: " << i << " is out of range\n";
        std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
        std::exit(EXIT_FAILURE);
    }
    return m_theQueue[countToIndex(currentReadIndex+i)];