wangzhengquan
2020-07-07 082633f08aae8eea19bd7050cbe4a75e5ed1ac6f
squeue/include/lock_free_queue.h
@@ -4,9 +4,12 @@
#include <stdint.h>     // uint32_t
#include <atomic>
#include <usg_common.h>
#include <assert.h> // assert()
#include "mm.h"
#include "sem_util.h"
// default Queue size
#define LOCK_FREE_Q_DEFAULT_SIZE 65536 // (2^16)
#define LOCK_FREE_Q_DEFAULT_SIZE 16
// define this macro if calls to "size" must return the real size of the 
// queue. If it is undefined  that function will try to take a snapshot of 
@@ -17,7 +20,7 @@
//
template <typename ELEM_T>
class ArrayLockFreeQueueMultipleProducers;
class ArrayLockFreeQueue;
/// @brief Lock-free queue based on a circular array
@@ -32,7 +35,7 @@
///   ArrayLockFreeQueue<int, 16> q;
///                              // queue of ints of size (16 - 1) and
///                              // defaulted to single producer
///   ArrayLockFreeQueue<int, 100, ArrayLockFreeQueueMultipleProducers> q;
///   ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q;
///                              // queue of ints of size (100 - 1) with support
///                              // for multiple producers
///
@@ -57,13 +60,16 @@
///        last 4 elements of the queue are not used when the counter rolls
///        over to 0
/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and 
///        ArrayLockFreeQueueMultipleProducers are supported (single producer
///        ArrayLockFreeQueue are supported (single producer
///        by default)
template <
    typename ELEM_T, 
    template <typename T> class Q_TYPE = ArrayLockFreeQueueMultipleProducers >
    template <typename T> class Q_TYPE = ArrayLockFreeQueue >
class LockFreeQueue
{
private:
    int slots;
    int items;
public:    
    /// @brief constructor of the class
    LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
@@ -88,7 +94,7 @@
    /// environments this function might return bogus values. See help in method
    /// LockFreeQueue::size
    inline bool full();
    inline bool empty();
    /// @brief push an element at the tail of the queue
@@ -97,13 +103,21 @@
    /// structures to be inserted in the queue you should think of instantiate the template
    /// of the queue as a pointer to that large structure
    /// @return true if the element was inserted in the queue. False if the queue was full
    inline bool push(const ELEM_T &a_data);
    bool push(const ELEM_T &a_data);
    bool push_nowait(const ELEM_T &a_data);
    bool push_timeout(const ELEM_T &a_data, struct timespec * timeout);
    /// @brief pop the element at the head of the queue
    /// @param a reference where the element in the head of the queue will be saved to
    /// Note that the a_data parameter might contain rubbish if the function returns false
    /// @return true if the element was successfully extracted from the queue. False if the queue was empty
    inline bool pop(ELEM_T &a_data);
    bool pop(ELEM_T &a_data);
    bool pop_nowait(ELEM_T &a_data);
    bool pop_timeout(ELEM_T &a_data, struct timespec * timeout);
    void *operator new(size_t size);
    void operator delete(void *p);
protected:
    /// @brief the actual queue. methods are forwarded into the real 
@@ -117,77 +131,186 @@
 
/// @brief implementation of an array based lock free queue with support for
///        multiple producers
/// This class is prevented from being instantiated directly (all members and
/// methods are private). To instantiate a multiple producers lock free queue
/// you must use the ArrayLockFreeQueue fachade:
///   ArrayLockFreeQueue<int, 100, ArrayLockFreeQueueMultipleProducers> q;
template <typename ELEM_T>
class ArrayLockFreeQueueMultipleProducers
template <
    typename ELEM_T,
    template <typename T> class Q_TYPE>
LockFreeQueue<ELEM_T, Q_TYPE>::LockFreeQueue(size_t qsize):
    m_qImpl(qsize)
{
    // ArrayLockFreeQueue will be using this' private members
    template <
        typename ELEM_T_,
        template <typename T> class Q_TYPE >
    friend class LockFreeQueue;
    slots = SemUtil::get(IPC_PRIVATE, qsize);
    items = SemUtil::get(IPC_PRIVATE, 0);
}
private:
    /// @brief constructor of the class
    ArrayLockFreeQueueMultipleProducers(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    virtual ~ArrayLockFreeQueueMultipleProducers();
    inline uint32_t size();
    inline bool full();
template <
    typename ELEM_T,
    template <typename T> class Q_TYPE>
LockFreeQueue<ELEM_T, Q_TYPE>::~LockFreeQueue()
{
    SemUtil::remove(slots);
    SemUtil::remove(items);
}
    inline bool empty();
    bool push(const ELEM_T &a_data);
    bool pop(ELEM_T &a_data);
    /// @brief calculate the index in the circular array that corresponds
    /// to a particular "count" value
    inline uint32_t countToIndex(uint32_t a_count);
private:
    size_t Q_SIZE;
    /// @brief array to keep the elements
    ELEM_T *m_theQueue;
template <
    typename ELEM_T,
    template <typename T> class Q_TYPE>
inline uint32_t LockFreeQueue<ELEM_T, Q_TYPE>::size()
{
    return m_qImpl.size();
}
    /// @brief where a new element will be inserted
    std::atomic<uint32_t> m_writeIndex;
template <
    typename ELEM_T,
    template <typename T> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Q_TYPE>::full()
{
    return m_qImpl.full();
}
    /// @brief where the next element where be extracted from
    std::atomic<uint32_t> m_readIndex;
    /// @brief maximum read index for multiple producer queues
    /// If it's not the same as m_writeIndex it means
    /// there are writes pending to be "committed" to the queue, that means,
    /// the place for the data was reserved (the index in the array) but
    /// data is still not in the queue, so the thread trying to read will have
    /// to wait for those other threads to save the data into the queue
    ///
    /// note this is only used for multiple producers
    std::atomic<uint32_t> m_maximumReadIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    /// @brief number of elements in the queue
    std::atomic<uint32_t> m_count;
#endif
    static int m_reference;
private:
    /// @brief disable copy constructor declaring it private
    ArrayLockFreeQueueMultipleProducers<ELEM_T>(const ArrayLockFreeQueueMultipleProducers<ELEM_T> &a_src);
template <
    typename ELEM_T,
    template <typename T> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Q_TYPE>::empty()
{
    return m_qImpl.empty();
}
};
template <
    typename ELEM_T,
    template <typename T> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Q_TYPE>::push(const ELEM_T &a_data)
{
    if (SemUtil::dec(slots) == -1) {
        err_exit(errno, "push");
    }
    if ( m_qImpl.push(a_data) ) {
        SemUtil::inc(items);
        return true;
    }
    return false;
}
template <
    typename ELEM_T,
    template <typename T> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Q_TYPE>::push_nowait(const ELEM_T &a_data)
{
    if (SemUtil::dec_nowait(slots) == -1) {
        if (errno == EAGAIN)
            return false;
        else
            err_exit(errno, "push_nowait");
    }
    if ( m_qImpl.push(a_data)) {
        SemUtil::inc(items);
        return true;
    }
    return false;
}
template <
    typename ELEM_T,
    template <typename T> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Q_TYPE>::push_timeout(const ELEM_T &a_data, struct timespec * timeout)
{
    if (SemUtil::dec_timeout(slots, timeout) == -1) {
        if (errno == EAGAIN)
            return false;
        else
            err_exit(errno, "push_timeout");
    }
    if (m_qImpl.push(a_data)){
        SemUtil::inc(items);
        return true;
    }
    return false;
}
template <
    typename ELEM_T,
    template <typename T> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Q_TYPE>::pop(ELEM_T &a_data)
{
    if (SemUtil::dec(items) == -1) {
        err_exit(errno, "remove");
    }
    if (m_qImpl.pop(a_data)) {
        SemUtil::inc(slots);
        return true;
    }
    return false;
}
template <
    typename ELEM_T,
    template <typename T> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Q_TYPE>::pop_nowait(ELEM_T &a_data)
{
    if (SemUtil::dec_nowait(items) == -1) {
        if (errno == EAGAIN)
            return false;
        else
            err_exit(errno, "remove_nowait");
    }
    if (m_qImpl.pop(a_data)) {
        SemUtil::inc(slots);
        return true;
    }
    return false;
}
template <
    typename ELEM_T,
    template <typename T> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout)
{
    if (SemUtil::dec_timeout(items, timeout) == -1) {
        if (errno == EAGAIN)
            return false;
        else
            err_exit(errno, "remove_timeout");
    }
    if (m_qImpl.pop(a_data)) {
        SemUtil::inc(slots);
        return true;
    }
    return false;
}
template <
    typename ELEM_T,
    template <typename T> class Q_TYPE>
void * LockFreeQueue<ELEM_T, Q_TYPE>::operator new(size_t size){
        return mm_malloc(size);
}
template <
    typename ELEM_T,
    template <typename T> class Q_TYPE>
void LockFreeQueue<ELEM_T, Q_TYPE>::operator delete(void *p) {
    return mm_free(p);
}
// include implementation files
#include "lock_free_queue_impl.h"
#include "lock_free_queue_impl_multiple_producer.h"
#include "linked_lock_free_queue.h"
#include "array_lock_free_queue.h"
#endif // _LOCK_FREE_QUEUE_H__