wangzhengquan
2020-07-07 082633f08aae8eea19bd7050cbe4a75e5ed1ac6f
squeue/include/array_lock_free_queue.h
File was renamed from squeue/include/lock_free_queue_impl_multiple_producer.h
@@ -4,11 +4,82 @@
#include <assert.h> // assert()
#include <sched.h>  // sched_yield()
/// @brief implementation of an array based lock free queue with support for
///        multiple producers
/// This class is prevented from being instantiated directly (all members and
/// methods are private). To instantiate a multiple producers lock free queue
/// you must use the ArrayLockFreeQueue fachade:
///   ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q;
template <typename ELEM_T>
int ArrayLockFreeQueueMultipleProducers<ELEM_T>::m_reference = 0;
class ArrayLockFreeQueue
{
    // ArrayLockFreeQueue will be using this' private members
    template <
        typename ELEM_T_,
        template <typename T> class Q_TYPE >
    friend class LockFreeQueue;
private:
    /// @brief constructor of the class
    ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    virtual ~ArrayLockFreeQueue();
    inline uint32_t size();
    inline bool full();
    inline bool empty();
    bool push(const ELEM_T &a_data);
    bool pop(ELEM_T &a_data);
    /// @brief calculate the index in the circular array that corresponds
    /// to a particular "count" value
    inline uint32_t countToIndex(uint32_t a_count);
private:
    size_t Q_SIZE;
    /// @brief array to keep the elements
    ELEM_T *m_theQueue;
    /// @brief where a new element will be inserted
    std::atomic<uint32_t> m_writeIndex;
    /// @brief where the next element where be extracted from
    std::atomic<uint32_t> m_readIndex;
    /// @brief maximum read index for multiple producer queues
    /// If it's not the same as m_writeIndex it means
    /// there are writes pending to be "committed" to the queue, that means,
    /// the place for the data was reserved (the index in the array) but
    /// data is still not in the queue, so the thread trying to read will have
    /// to wait for those other threads to save the data into the queue
    ///
    /// note this is only used for multiple producers
    std::atomic<uint32_t> m_maximumReadIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    /// @brief number of elements in the queue
    std::atomic<uint32_t> m_count;
#endif
    static int m_reference;
private:
    /// @brief disable copy constructor declaring it private
    ArrayLockFreeQueue<ELEM_T>(const ArrayLockFreeQueue<ELEM_T> &a_src);
};
template <typename ELEM_T>
ArrayLockFreeQueueMultipleProducers<ELEM_T>::ArrayLockFreeQueueMultipleProducers(size_t qsize):
int ArrayLockFreeQueue<ELEM_T>::m_reference = 0;
template <typename ELEM_T>
ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize):
    Q_SIZE(qsize),
    m_writeIndex(0),      // initialisation is not atomic
    m_readIndex(0),       //
@@ -23,9 +94,9 @@
}
template <typename ELEM_T>
ArrayLockFreeQueueMultipleProducers<ELEM_T>::~ArrayLockFreeQueueMultipleProducers()
ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue()
{
    std::cout << "destroy ArrayLockFreeQueueMultipleProducers\n";
    std::cout << "destroy ArrayLockFreeQueue\n";
    m_reference--;
    if(m_reference == 0) {
       mm_free(m_theQueue);
@@ -35,7 +106,7 @@
template <typename ELEM_T>
inline 
uint32_t ArrayLockFreeQueueMultipleProducers<ELEM_T>::countToIndex(uint32_t a_count)
uint32_t ArrayLockFreeQueue<ELEM_T>::countToIndex(uint32_t a_count)
{
    // if Q_SIZE is a power of 2 this statement could be also written as 
    // return (a_count & (Q_SIZE - 1));
@@ -44,7 +115,7 @@
template <typename ELEM_T>
inline 
uint32_t ArrayLockFreeQueueMultipleProducers<ELEM_T>::size()
uint32_t ArrayLockFreeQueue<ELEM_T>::size()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
@@ -79,7 +150,7 @@
template <typename ELEM_T>
inline 
bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::full()
bool ArrayLockFreeQueue<ELEM_T>::full()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
@@ -104,7 +175,7 @@
template <typename ELEM_T>
inline 
bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::empty()
bool ArrayLockFreeQueue<ELEM_T>::empty()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
@@ -126,7 +197,7 @@
template <typename ELEM_T>
bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::push(const ELEM_T &a_data)
bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data)
{
    uint32_t currentWriteIndex;
    
@@ -187,7 +258,7 @@
}
template <typename ELEM_T>
bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::pop(ELEM_T &a_data)
bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data)
{
    uint32_t currentReadIndex;