tmp
wangzhengquan
2021-01-22 4c73fd7179e92bee9cccb65e46823b00f568acb3
src/queue/lock_free_queue.h
@@ -12,20 +12,20 @@
#include "shm_allocator.h"
#include "psem.h"
#include "bus_error.h"
#include "bus_def.h"
// default Queue size
#define LOCK_FREE_Q_DEFAULT_SIZE 16
// static Logger *logger = LoggerFactory::getLogger();
// define this macro if calls to "size" must return the real size of the
// queue. If it is undefined  that function will try to take a snapshot of
// define this macro if calls to "size" must return the real size of the
// queue. If it is undefined  that function will try to take a snapshot of
// the queue, but returned value might be bogus
// forward declarations for default template values
//
template <typename ELEM_T, typename Allocator>
template<typename ELEM_T, typename Allocator>
class ArrayLockFreeQueue;
// template <typename ELEM_T>
@@ -33,9 +33,9 @@
/// @brief Lock-free queue based on a circular array
/// No allocation of extra memory for the nodes handling is needed, but it has
/// to add extra overhead (extra CAS operation) when inserting to ensure the
/// thread-safety of the queue when the queue type is not
/// No allocation of extra memory for the nodes handling is needed, but it has
/// to add extra overhead (extra CAS operation) when inserting to ensure the
/// thread-safety of the queue when the queue type is not
/// ArrayLockFreeQueueSingleProducer.
///
/// examples of instantiation:
@@ -50,113 +50,109 @@
///
/// ELEM_T represents the type of elementes pushed and popped from the queue
/// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1)
///        This number should be a power of 2 to ensure
///        indexes in the circular queue keep stable when the uint32_t
///        This number should be a power of 2 to ensure
///        indexes in the circular queue keep stable when the uint32_t
///        variable that holds the current position rolls over from FFFFFFFF
///        to 0. For instance
///        2    -> 0x02
///        2    -> 0x02
///        4    -> 0x04
///        8    -> 0x08
///        16   -> 0x10
///        (...)
///        (...)
///        1024 -> 0x400
///        2048 -> 0x800
///
///        if queue size is not defined as requested, let's say, for
///        instance 100, when current position is FFFFFFFF (4,294,967,295)
///        index in the circular array is 4,294,967,295 % 100 = 95.
///        When that value is incremented it will be set to 0, that is the
///        index in the circular array is 4,294,967,295 % 100 = 95.
///        When that value is incremented it will be set to 0, that is the
///        last 4 elements of the queue are not used when the counter rolls
///        over to 0
/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and
/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and
///        ArrayLockFreeQueue are supported (single producer
///        by default)
template <
    typename ELEM_T,
    typename Allocator = SHM_Allocator,
    template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
     >
class LockFreeQueue
{
template<
        typename ELEM_T,
        typename Allocator = SHM_Allocator,
        template<typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
>
class LockFreeQueue {
private:
    sem_t slots;
    sem_t items;
  sem_t slots;
  sem_t items;
public:
    sem_t mutex;
    LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    /// @brief destructor of the class.
    /// Note it is not virtual since it is not expected to inherit from this
    /// template
    ~LockFreeQueue();
    std::atomic_uint reference;
    /// @brief constructor of the class
  sem_t mutex;
    /// @brief returns the current number of items in the queue
    /// It tries to take a snapshot of the size of the queue, but in busy environments
    /// this function might return bogus values.
    ///
    /// If a reliable queue size must be kept you might want to have a look at
    /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
    /// it enables a reliable size though it hits overall performance of the queue
    /// (when the reliable size variable is on it's got an impact of about 20% in time)
    inline uint32_t size();
    /// @brief return true if the queue is full. False otherwise
    /// It tries to take a snapshot of the size of the queue, but in busy
    /// environments this function might return bogus values. See help in method
    /// LockFreeQueue::size
    inline bool full();
  LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    inline bool empty();
  /// @brief destructor of the class.
  /// Note it is not virtual since it is not expected to inherit from this
  /// template
  ~LockFreeQueue();
    inline ELEM_T& operator[](unsigned i);
    /// @brief push an element at the tail of the queue
    /// @param the element to insert in the queue
    /// Note that the element is not a pointer or a reference, so if you are using large data
    /// structures to be inserted in the queue you should think of instantiate the template
    /// of the queue as a pointer to that large structure
    /// @return true if the element was inserted in the queue. False if the queue was full
    int push(const ELEM_T &a_data);
    int push_nowait(const ELEM_T &a_data);
    int push_timeout(const ELEM_T &a_data, const struct timespec * timeout);
    /// @brief pop the element at the head of the queue
    /// @param a reference where the element in the head of the queue will be saved to
    /// Note that the a_data parameter might contain rubbish if the function returns false
    /// @return true if the element was successfully extracted from the queue. False if the queue was empty
    int pop(ELEM_T &a_data);
    int pop_nowait(ELEM_T &a_data);
    int pop_timeout(ELEM_T &a_data, struct timespec * timeout);
  std::atomic_uint reference;
  /// @brief constructor of the class
    void *operator new(size_t size);
    void operator delete(void *p);
  /// @brief returns the current number of items in the queue
  /// It tries to take a snapshot of the size of the queue, but in busy environments
  /// this function might return bogus values.
  ///
  /// If a reliable queue size must be kept you might want to have a look at
  /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
  /// it enables a reliable size though it hits overall performance of the queue
  /// (when the reliable size variable is on it's got an impact of about 20% in time)
  inline uint32_t size();
  /// @brief return true if the queue is full. False otherwise
  /// It tries to take a snapshot of the size of the queue, but in busy
  /// environments this function might return bogus values. See help in method
  /// LockFreeQueue::size
  inline bool full();
  inline bool empty();
  inline ELEM_T &operator[](unsigned i);
  /// @brief push an element at the tail of the queue
  /// @param the element to insert in the queue
  /// Note that the element is not a pointer or a reference, so if you are using large data
  /// structures to be inserted in the queue you should think of instantiate the template
  /// of the queue as a pointer to that large structure
  /// @return true if the element was inserted in the queue. False if the queue was full
  int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
  /// @brief pop the element at the head of the queue
  /// @param a reference where the element in the head of the queue will be saved to
  /// Note that the a_data parameter might contain rubbish if the function returns false
  /// @return true if the element was successfully extracted from the queue. False if the queue was empty
  int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
  void *operator new(size_t size);
  void operator delete(void *p);
protected:
    /// @brief the actual queue. methods are forwarded into the real
    ///        implementation
    Q_TYPE<ELEM_T, Allocator> m_qImpl;
  /// @brief the actual queue. methods are forwarded into the real
  ///        implementation
  Q_TYPE<ELEM_T, Allocator> m_qImpl;
private:
    /// @brief disable copy constructor declaring it private
    LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src);
  /// @brief disable copy constructor declaring it private
  LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src);
};
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize)
{
// std::cout << "LockFreeQueue init reference=" << reference << std::endl;
template<
        typename ELEM_T,
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) {
    // std::cout << "LockFreeQueue init reference=" << reference << std::endl;
    if (sem_init(&slots, 1, qsize) == -1)
        err_exit(errno, "LockFreeQueue sem_init");
    if (sem_init(&items, 1, 0) == -1)
@@ -164,194 +160,130 @@
    if (sem_init(&mutex, 1, 1) == -1)
        err_exit(errno, "LockFreeQueue sem_init");
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue()
{
template<
        typename ELEM_T,
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() {
    // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy");
    if(sem_destroy(&slots) == -1) {
    if (sem_destroy(&slots) == -1) {
        err_exit(errno, "LockFreeQueue sem_destroy");
    }
    if(sem_destroy(&items) == -1) {
    if (sem_destroy(&items) == -1) {
        err_exit(errno, "LockFreeQueue sem_destroy");
    }
    if(sem_destroy(&mutex) == -1) {
    if (sem_destroy(&mutex) == -1) {
        err_exit(errno, "LockFreeQueue sem_destroy");
    }
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size()
{
template<
        typename ELEM_T,
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() {
    return m_qImpl.size();
}
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full()
{
template<
        typename ELEM_T,
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() {
    return m_qImpl.full();
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty()
{
template<
        typename ELEM_T,
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() {
    return m_qImpl.empty();
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
{
 LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
    if (psem_wait(&slots) == -1) {
        return -1;
    }
    if ( m_qImpl.push(a_data) ) {
        psem_post(&items);
LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
        return 0;
    }
    return -1;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
{
    if (psem_trywait(&slots) == -1) {
        return -1;
    }
    if ( m_qImpl.push(a_data)) {
        psem_post(&items);
        return 0;
    }
    return -1;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts)
{
LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n");
    if ( psem_timedwait(&slots, ts) == -1) {
        return -1;
    }
    if (m_qImpl.push(a_data)){
        psem_post(&items);
LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n");
        return 0;
    }
    return -1;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
{
  LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
    if (psem_wait(&items) == -1) {
        return -1;
template<typename ELEM_T,
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) {
    LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
    if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
        if (psem_trywait(&slots) == -1) {
            return -1;
        }
    } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        if (psem_timedwait(&slots, timeout) == -1) {
            return -1;
        }
    } else {
        if (psem_wait(&slots) == -1) {
            return -1;
        }
    }
    if (m_qImpl.push(a_data)) {
        psem_post(&items);
        LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
        return 0;
    }
    return -1;
}
template<typename ELEM_T,
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) {
    LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
    if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
        if (psem_trywait(&items) == -1) {
            return -1;
        }
    } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        if (psem_timedwait(&items, timeout) == -1) {
            return -1;
        }
    } else {
        if (psem_wait(&items) == -1) {
            return -1;
        }
    }
    if (m_qImpl.pop(a_data)) {
        psem_post(&slots);
 LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
        LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
        return 0;
    }
    return -1;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
{
    if (psem_trywait(&items) == -1) {
        return -1;
    }
    if (m_qImpl.pop(a_data)) {
        psem_post(&slots);
        return 0;
    }
    return -1;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
{
    if (psem_timedwait(&items, ts) == -1) {
       return -1;
    }
    if (m_qImpl.pop(a_data)) {
        psem_post(&slots);
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n");
        return 0;
    }
    return -1;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
template<typename ELEM_T,
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
ELEM_T &LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
    return m_qImpl.operator[](i);
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){
        return Allocator::allocate(size);
template<typename ELEM_T,
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
void *LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size) {
    return Allocator::allocate(size);
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
template<typename ELEM_T,
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) {
    return Allocator::deallocate(p);
}