zhangmeng
2024-01-18 f68d32c9f4b2f46d26d96839801d1888a93f93ed
src/queue/lock_free_queue.h
@@ -1,25 +1,37 @@
/**
 * encapsulate array_lock_free_queue, add semphore. populate in kernal space.
 */
#ifndef __LOCK_FREE_QUEUE_H__
#define __LOCK_FREE_QUEUE_H__
#include <usg_common.h>
#include <assert.h> // assert()
#include "mem_pool.h"
#include "shm_mm.h"
#include "sem_util.h"
#include "logger_factory.h"
#include "shm_allocator.h"
#include "psem.h"
#include "bus_error.h"
#include "bus_def.h"
// default Queue size
#define LOCK_FREE_Q_DEFAULT_SIZE 16
#define LOCK_FREE_Q_DEFAULT_SIZE 320
// define this macro if calls to "size" must return the real size of the
// queue. If it is undefined  that function will try to take a snapshot of
#define LOCK_FREE_Q_ST_OPENED 0
#define LOCK_FREE_Q_ST_CLOSED 1
// static Logger *logger = LoggerFactory::getLogger();
// define this macro if calls to "size" must return the real size of the
// queue. If it is undefined  that function will try to take a snapshot of
// the queue, but returned value might be bogus
// forward declarations for default template values
//
template <typename ELEM_T, typename Allocator>
template<typename ELEM_T, typename Allocator>
class ArrayLockFreeQueue;
// template <typename ELEM_T>
@@ -27,9 +39,9 @@
/// @brief Lock-free queue based on a circular array
/// No allocation of extra memory for the nodes handling is needed, but it has
/// to add extra overhead (extra CAS operation) when inserting to ensure the
/// thread-safety of the queue when the queue type is not
/// No allocation of extra memory for the nodes handling is needed, but it has
/// to add extra overhead (extra CAS operation) when inserting to ensure the
/// thread-safety of the queue when the queue type is not
/// ArrayLockFreeQueueSingleProducer.
///
/// examples of instantiation:
@@ -44,322 +56,288 @@
///
/// ELEM_T represents the type of elementes pushed and popped from the queue
/// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1)
///        This number should be a power of 2 to ensure
///        indexes in the circular queue keep stable when the uint32_t
///        This number should be a power of 2 to ensure
///        indexes in the circular queue keep stable when the uint32_t
///        variable that holds the current position rolls over from FFFFFFFF
///        to 0. For instance
///        2    -> 0x02
///        2    -> 0x02
///        4    -> 0x04
///        8    -> 0x08
///        16   -> 0x10
///        (...)
///        (...)
///        1024 -> 0x400
///        2048 -> 0x800
///
///        if queue size is not defined as requested, let's say, for
///        instance 100, when current position is FFFFFFFF (4,294,967,295)
///        index in the circular array is 4,294,967,295 % 100 = 95.
///        When that value is incremented it will be set to 0, that is the
///        index in the circular array is 4,294,967,295 % 100 = 95.
///        When that value is incremented it will be set to 0, that is the
///        last 4 elements of the queue are not used when the counter rolls
///        over to 0
/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and
/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and
///        ArrayLockFreeQueue are supported (single producer
///        by default)
template <
    typename ELEM_T,
    typename Allocator = SHM_Allocator,
    template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
     >
class LockFreeQueue
{
template<
  typename ELEM_T,
  typename Allocator = SHM_Allocator,
  template<typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
>
class LockFreeQueue {
private:
    int slots;
    int items;
  sem_t slots;
  sem_t items;
  time_t createTime;
  time_t closeTime;
  int status;
public:
    int mutex;
    LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    /// @brief destructor of the class.
    /// Note it is not virtual since it is not expected to inherit from this
    /// template
    ~LockFreeQueue();
    std::atomic_uint reference;
    /// @brief constructor of the class
    /// @brief returns the current number of items in the queue
    /// It tries to take a snapshot of the size of the queue, but in busy environments
    /// this function might return bogus values.
    ///
    /// If a reliable queue size must be kept you might want to have a look at
    /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
    /// it enables a reliable size though it hits overall performance of the queue
    /// (when the reliable size variable is on it's got an impact of about 20% in time)
    inline uint32_t size();
    /// @brief return true if the queue is full. False otherwise
    /// It tries to take a snapshot of the size of the queue, but in busy
    /// environments this function might return bogus values. See help in method
    /// LockFreeQueue::size
    inline bool full();
  LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    inline bool empty();
  /// @brief destructor of the class.
  /// Note it is not virtual since it is not expected to inherit from this
  /// template
  ~LockFreeQueue();
    inline ELEM_T& operator[](unsigned i);
  inline void  close();
  inline bool isClosed();
    /// @brief push an element at the tail of the queue
    /// @param the element to insert in the queue
    /// Note that the element is not a pointer or a reference, so if you are using large data
    /// structures to be inserted in the queue you should think of instantiate the template
    /// of the queue as a pointer to that large structure
    /// @return true if the element was inserted in the queue. False if the queue was full
    bool push(const ELEM_T &a_data);
    bool push_nowait(const ELEM_T &a_data);
    bool push_timeout(const ELEM_T &a_data, const struct timespec * timeout);
    /// @brief pop the element at the head of the queue
    /// @param a reference where the element in the head of the queue will be saved to
    /// Note that the a_data parameter might contain rubbish if the function returns false
    /// @return true if the element was successfully extracted from the queue. False if the queue was empty
    bool pop(ELEM_T &a_data);
    bool pop_nowait(ELEM_T &a_data);
    bool pop_timeout(ELEM_T &a_data, struct timespec * timeout);
  // std::atomic_uint reference;
  /// @brief constructor of the class
    void *operator new(size_t size);
    void operator delete(void *p);
  /// @brief returns the current number of items in the queue
  /// It tries to take a snapshot of the size of the queue, but in busy environments
  /// this function might return bogus values.
  ///
  /// If a reliable queue size must be kept you might want to have a look at
  /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
  /// it enables a reliable size though it hits overall performance of the queue
  /// (when the reliable size variable is on it's got an impact of about 20% in time)
  inline uint32_t size();
  /// @brief return true if the queue is full. False otherwise
  /// It tries to take a snapshot of the size of the queue, but in busy
  /// environments this function might return bogus values. See help in method
  /// LockFreeQueue::size
  inline bool full();
  inline bool empty();
  inline ELEM_T &operator[](unsigned i);
  time_t getCreateTime() {
    return createTime;
  }
  time_t getCloseTime() {
    return closeTime;
  }
  int getStatus() {
    return status;
  }
  /// @brief push an element at the tail of the queue
  /// @param the element to insert in the queue
  /// Note that the element is not a pointer or a reference, so if you are using large data
  /// structures to be inserted in the queue you should think of instantiate the template
  /// of the queue as a pointer to that large structure
  /// @return true if the element was inserted in the queue. False if the queue was full
  int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
  /// @brief pop the element at the head of the queue
  /// @param a reference where the element in the head of the queue will be saved to
  /// Note that the a_data parameter might contain rubbish if the function returns false
  /// @return true if the element was successfully extracted from the queue. False if the queue was empty
  int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
  void *operator new(size_t size);
  void operator delete(void *p);
protected:
    /// @brief the actual queue. methods are forwarded into the real
    ///        implementation
    Q_TYPE<ELEM_T, Allocator> m_qImpl;
  /// @brief the actual queue. methods are forwarded into the real
  ///        implementation
  Q_TYPE<ELEM_T, Allocator> m_qImpl;
private:
    /// @brief disable copy constructor declaring it private
    LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src);
  /// @brief disable copy constructor declaring it private
  LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src);
};
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize)
{
// std::cout << "LockFreeQueue init reference=" << reference << std::endl;
    slots = SemUtil::get(IPC_PRIVATE, qsize);
    items = SemUtil::get(IPC_PRIVATE, 0);
    mutex = SemUtil::get(IPC_PRIVATE, 1);
}
template<
  typename ELEM_T,
  typename Allocator,
  template<typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): m_qImpl(qsize) {
  //std::cout << "LockFreeQueue init reference=" << reference << std::endl;
  if (sem_init(&slots, 1, qsize) == -1)
    err_exit(errno, "LockFreeQueue sem_init");
  if (sem_init(&items, 1, 0) == -1)
    err_exit(errno, "LockFreeQueue sem_init");
  createTime = time(NULL);
  status = LOCK_FREE_Q_ST_OPENED;
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue()
{
    LoggerFactory::getLogger()->debug("LockFreeQueue desctroy");
    SemUtil::remove(slots);
    SemUtil::remove(items);
    SemUtil::remove(mutex);
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size()
{
    return m_qImpl.size();
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full()
{
    return m_qImpl.full();
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty()
{
    return m_qImpl.empty();
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
{
 // printf("==================LockFreeQueue push before\n");
    if (SemUtil::dec(slots) == -1) {
        err_msg(errno, "LockFreeQueue push");
        return false;
    }
    if ( m_qImpl.push(a_data) ) {
        SemUtil::inc(items);
 // printf("==================LockFreeQueue push after\n");
        return true;
    }
    return false;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
{
    if (SemUtil::dec_nowait(slots) == -1) {
        if (errno == EAGAIN)
            return false;
        else {
            err_msg(errno, "LockFreeQueue push_nowait");
            return false;
        }
    }
    if ( m_qImpl.push(a_data)) {
        SemUtil::inc(items);
        return true;
    }
    return false;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * timeout)
{
    if (SemUtil::dec_timeout(slots, timeout) == -1) {
        if (errno == EAGAIN)
            return false;
        else {
            // err_msg(errno, "LockFreeQueue push_timeout");
            return false;
        }
    }
    if (m_qImpl.push(a_data)){
        SemUtil::inc(items);
        return true;
    }
    return false;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
{
 // printf("==================LockFreeQueue pop before\n");
    if (SemUtil::dec(items) == -1) {
        err_msg(errno, "LockFreeQueue pop");
        return false;
    }
    if (m_qImpl.pop(a_data)) {
        SemUtil::inc(slots);
 // printf("==================LockFreeQueue pop after\n");
        return true;
    }
    return false;
template<
  typename ELEM_T,
  typename Allocator,
  template<typename T, typename AT> class Q_TYPE>
inline void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::close() {
  status = LOCK_FREE_Q_ST_CLOSED;
  closeTime = time(NULL);
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
{
    if (SemUtil::dec_nowait(items) == -1) {
        if (errno == EAGAIN)
            return false;
        else {
            err_msg(errno, "LockFreeQueue pop_nowait");
            return false;
        }
    }
    if (m_qImpl.pop(a_data)) {
        SemUtil::inc(slots);
        return true;
    }
    return false;
template<
  typename ELEM_T,
  typename Allocator,
  template<typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::isClosed() {
  return status == LOCK_FREE_Q_ST_CLOSED;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout)
{
// printf("==================LockFreeQueue pop_timeout before\n");
    if (SemUtil::dec_timeout(items, timeout) == -1) {
        if (errno == EAGAIN)
            return false;
        else {
            // err_msg(errno, "LockFreeQueue pop_timeout");
            return false;
        }
template<
  typename ELEM_T,
  typename Allocator,
  template<typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() {
  if (sem_destroy(&slots) == -1) {
    err_exit(errno, "LockFreeQueue sem_destroy");
  }
  if (sem_destroy(&items) == -1) {
    err_exit(errno, "LockFreeQueue sem_destroy");
  }
}
template<
  typename ELEM_T,
  typename Allocator,
  template<typename T, typename AT> class Q_TYPE>
inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() {
  return m_qImpl.size();
}
template<
  typename ELEM_T,
  typename Allocator,
  template<typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() {
  return m_qImpl.full();
}
template<
  typename ELEM_T,
  typename Allocator,
  template<typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() {
  return m_qImpl.empty();
}
template<typename ELEM_T,
  typename Allocator,
  template<typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) {
  sigset_t mask_all, pre;
  sigfillset(&mask_all);
  sigprocmask(SIG_BLOCK, &mask_all, &pre);
  if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
    if (psem_trywait(&slots) == -1) {
      goto LABEL_FAILTURE;
    }
    if (m_qImpl.pop(a_data)) {
        SemUtil::inc(slots);
// printf("==================LockFreeQueue pop_timeout after\n");
        return true;
  } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
    if (psem_timedwait(&slots, timeout) == -1) {
      goto LABEL_FAILTURE;
    }
    return false;
  } else {
    if (psem_wait(&slots) == -1) {
      goto LABEL_FAILTURE;
    }
  }
  if (m_qImpl.push(a_data)) {
    psem_post(&items);
    sigprocmask(SIG_SETMASK, &pre, NULL);
    return 0;
  }
  LABEL_FAILTURE:
  sigprocmask(SIG_SETMASK, &pre, NULL);
  return errno;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
    return m_qImpl.operator[](i);
template<typename ELEM_T,
  typename Allocator,
  template<typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) {
  sigset_t mask_all, pre;
  sigfillset(&mask_all);
  sigprocmask(SIG_BLOCK, &mask_all, &pre);
  if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
    if (psem_trywait(&items) == -1) {
      goto LABEL_FAILTURE;
    }
  } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
   if (psem_timedwait(&items, timeout) == -1) {
      goto LABEL_FAILTURE;
    }
  } else {
    if (psem_wait(&items) == -1) {
      goto LABEL_FAILTURE;
    }
  }
  if (m_qImpl.pop(a_data)) {
    psem_post(&slots);
    sigprocmask(SIG_SETMASK, &pre, NULL);
    return 0;
  }
  LABEL_FAILTURE:
  sigprocmask(SIG_SETMASK, &pre, NULL);
  return errno;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){
        return Allocator::allocate(size);
template<typename ELEM_T,
  typename Allocator,
  template<typename T, typename AT> class Q_TYPE>
ELEM_T &LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
  return m_qImpl.operator[](i);
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
template<typename ELEM_T,
  typename Allocator,
  template<typename T, typename AT> class Q_TYPE>
void *LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size) {
  return Allocator::allocate(size);
}
template<typename ELEM_T,
  typename Allocator,
  template<typename T, typename AT> class Q_TYPE>
void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) {
    return Allocator::deallocate(p);
  LockFreeQueue<ELEM_T, Allocator, Q_TYPE> * _que =  (LockFreeQueue<ELEM_T, Allocator, Q_TYPE> * )p;
  Allocator::deallocate(p);
}
// include implementation files