From 4c73fd7179e92bee9cccb65e46823b00f568acb3 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期五, 22 一月 2021 16:57:34 +0800 Subject: [PATCH] tmp --- src/queue/lock_free_queue.h | 404 ++++++++++++++++++++++++--------------------------------- 1 files changed, 168 insertions(+), 236 deletions(-) diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h index 01e597c..9245d3e 100644 --- a/src/queue/lock_free_queue.h +++ b/src/queue/lock_free_queue.h @@ -12,20 +12,20 @@ #include "shm_allocator.h" #include "psem.h" #include "bus_error.h" - +#include "bus_def.h" // default Queue size #define LOCK_FREE_Q_DEFAULT_SIZE 16 // static Logger *logger = LoggerFactory::getLogger(); -// define this macro if calls to "size" must return the real size of the -// queue. If it is undefined that function will try to take a snapshot of +// define this macro if calls to "size" must return the real size of the +// queue. If it is undefined that function will try to take a snapshot of // the queue, but returned value might be bogus // forward declarations for default template values // -template <typename ELEM_T, typename Allocator> +template<typename ELEM_T, typename Allocator> class ArrayLockFreeQueue; // template <typename ELEM_T> @@ -33,9 +33,9 @@ /// @brief Lock-free queue based on a circular array -/// No allocation of extra memory for the nodes handling is needed, but it has -/// to add extra overhead (extra CAS operation) when inserting to ensure the -/// thread-safety of the queue when the queue type is not +/// No allocation of extra memory for the nodes handling is needed, but it has +/// to add extra overhead (extra CAS operation) when inserting to ensure the +/// thread-safety of the queue when the queue type is not /// ArrayLockFreeQueueSingleProducer. /// /// examples of instantiation: @@ -50,113 +50,109 @@ /// /// ELEM_T represents the type of elementes pushed and popped from the queue /// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1) -/// This number should be a power of 2 to ensure -/// indexes in the circular queue keep stable when the uint32_t +/// This number should be a power of 2 to ensure +/// indexes in the circular queue keep stable when the uint32_t /// variable that holds the current position rolls over from FFFFFFFF /// to 0. For instance -/// 2 -> 0x02 +/// 2 -> 0x02 /// 4 -> 0x04 /// 8 -> 0x08 /// 16 -> 0x10 -/// (...) +/// (...) /// 1024 -> 0x400 /// 2048 -> 0x800 /// /// if queue size is not defined as requested, let's say, for /// instance 100, when current position is FFFFFFFF (4,294,967,295) -/// index in the circular array is 4,294,967,295 % 100 = 95. -/// When that value is incremented it will be set to 0, that is the +/// index in the circular array is 4,294,967,295 % 100 = 95. +/// When that value is incremented it will be set to 0, that is the /// last 4 elements of the queue are not used when the counter rolls /// over to 0 -/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and +/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and /// ArrayLockFreeQueue are supported (single producer /// by default) -template < - typename ELEM_T, - typename Allocator = SHM_Allocator, - template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue - > -class LockFreeQueue -{ +template< + typename ELEM_T, + typename Allocator = SHM_Allocator, + template<typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue +> +class LockFreeQueue { private: - sem_t slots; - sem_t items; + sem_t slots; + sem_t items; - public: - sem_t mutex; - LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - - /// @brief destructor of the class. - /// Note it is not virtual since it is not expected to inherit from this - /// template - ~LockFreeQueue(); - std::atomic_uint reference; - /// @brief constructor of the class - + sem_t mutex; - /// @brief returns the current number of items in the queue - /// It tries to take a snapshot of the size of the queue, but in busy environments - /// this function might return bogus values. - /// - /// If a reliable queue size must be kept you might want to have a look at - /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE' - /// it enables a reliable size though it hits overall performance of the queue - /// (when the reliable size variable is on it's got an impact of about 20% in time) - inline uint32_t size(); - - /// @brief return true if the queue is full. False otherwise - /// It tries to take a snapshot of the size of the queue, but in busy - /// environments this function might return bogus values. See help in method - /// LockFreeQueue::size - inline bool full(); + LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - inline bool empty(); + /// @brief destructor of the class. + /// Note it is not virtual since it is not expected to inherit from this + /// template + ~LockFreeQueue(); - inline ELEM_T& operator[](unsigned i); - - /// @brief push an element at the tail of the queue - /// @param the element to insert in the queue - /// Note that the element is not a pointer or a reference, so if you are using large data - /// structures to be inserted in the queue you should think of instantiate the template - /// of the queue as a pointer to that large structure - /// @return true if the element was inserted in the queue. False if the queue was full - int push(const ELEM_T &a_data); - int push_nowait(const ELEM_T &a_data); - int push_timeout(const ELEM_T &a_data, const struct timespec * timeout); - - /// @brief pop the element at the head of the queue - /// @param a reference where the element in the head of the queue will be saved to - /// Note that the a_data parameter might contain rubbish if the function returns false - /// @return true if the element was successfully extracted from the queue. False if the queue was empty - int pop(ELEM_T &a_data); - int pop_nowait(ELEM_T &a_data); - int pop_timeout(ELEM_T &a_data, struct timespec * timeout); + std::atomic_uint reference; + /// @brief constructor of the class - void *operator new(size_t size); - void operator delete(void *p); + /// @brief returns the current number of items in the queue + /// It tries to take a snapshot of the size of the queue, but in busy environments + /// this function might return bogus values. + /// + /// If a reliable queue size must be kept you might want to have a look at + /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE' + /// it enables a reliable size though it hits overall performance of the queue + /// (when the reliable size variable is on it's got an impact of about 20% in time) + inline uint32_t size(); + + /// @brief return true if the queue is full. False otherwise + /// It tries to take a snapshot of the size of the queue, but in busy + /// environments this function might return bogus values. See help in method + /// LockFreeQueue::size + inline bool full(); + + inline bool empty(); + + inline ELEM_T &operator[](unsigned i); + + /// @brief push an element at the tail of the queue + /// @param the element to insert in the queue + /// Note that the element is not a pointer or a reference, so if you are using large data + /// structures to be inserted in the queue you should think of instantiate the template + /// of the queue as a pointer to that large structure + /// @return true if the element was inserted in the queue. False if the queue was full + int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); + + /// @brief pop the element at the head of the queue + /// @param a reference where the element in the head of the queue will be saved to + /// Note that the a_data parameter might contain rubbish if the function returns false + /// @return true if the element was successfully extracted from the queue. False if the queue was empty + int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); + + + void *operator new(size_t size); + + void operator delete(void *p); protected: - /// @brief the actual queue. methods are forwarded into the real - /// implementation - Q_TYPE<ELEM_T, Allocator> m_qImpl; + /// @brief the actual queue. methods are forwarded into the real + /// implementation + Q_TYPE<ELEM_T, Allocator> m_qImpl; private: - /// @brief disable copy constructor declaring it private - LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src); + /// @brief disable copy constructor declaring it private + LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src); }; -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) -{ -// std::cout << "LockFreeQueue init reference=" << reference << std::endl; +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) { + // std::cout << "LockFreeQueue init reference=" << reference << std::endl; if (sem_init(&slots, 1, qsize) == -1) err_exit(errno, "LockFreeQueue sem_init"); if (sem_init(&items, 1, 0) == -1) @@ -164,194 +160,130 @@ if (sem_init(&mutex, 1, 1) == -1) err_exit(errno, "LockFreeQueue sem_init"); - + } -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() -{ + +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() { // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy"); - if(sem_destroy(&slots) == -1) { + if (sem_destroy(&slots) == -1) { err_exit(errno, "LockFreeQueue sem_destroy"); } - if(sem_destroy(&items) == -1) { + if (sem_destroy(&items) == -1) { err_exit(errno, "LockFreeQueue sem_destroy"); } - if(sem_destroy(&mutex) == -1) { + if (sem_destroy(&mutex) == -1) { err_exit(errno, "LockFreeQueue sem_destroy"); } } -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() -{ +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() { return m_qImpl.size(); -} +} -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() -{ +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() { return m_qImpl.full(); } -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() -{ +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() { return m_qImpl.empty(); -} - - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data) -{ - LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n"); - if (psem_wait(&slots) == -1) { - return -1; - } - - if ( m_qImpl.push(a_data) ) { - psem_post(&items); -LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n"); - return 0; - } - return -1; - -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data) -{ - if (psem_trywait(&slots) == -1) { - return -1; - } - - if ( m_qImpl.push(a_data)) { - psem_post(&items); - return 0; - } - return -1; - -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts) -{ -LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n"); - if ( psem_timedwait(&slots, ts) == -1) { - return -1; - } - - if (m_qImpl.push(a_data)){ - psem_post(&items); -LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n"); - return 0; - } - return -1; - } - - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data) -{ - - LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n"); - if (psem_wait(&items) == -1) { - return -1; +template<typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) { + LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n"); + if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { + if (psem_trywait(&slots) == -1) { + return -1; + } + } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { + if (psem_timedwait(&slots, timeout) == -1) { + return -1; + } + } else { + if (psem_wait(&slots) == -1) { + return -1; + } } + + + if (m_qImpl.push(a_data)) { + psem_post(&items); + LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n"); + return 0; + } + return -1; + +} + +template<typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) { + + LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n"); + + + if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { + if (psem_trywait(&items) == -1) { + return -1; + } + } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { + if (psem_timedwait(&items, timeout) == -1) { + return -1; + } + } else { + if (psem_wait(&items) == -1) { + return -1; + } + } + if (m_qImpl.pop(a_data)) { psem_post(&slots); - LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n"); + LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n"); return 0; } return -1; } -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data) -{ - if (psem_trywait(&items) == -1) { - return -1; - } - - if (m_qImpl.pop(a_data)) { - psem_post(&slots); - return 0; - } - return -1; -} - - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts) -{ - if (psem_timedwait(&items, ts) == -1) { - return -1; - } - - if (m_qImpl.pop(a_data)) { - psem_post(&slots); -// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n"); - return 0; - } - return -1; - -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) { +template<typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +ELEM_T &LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) { return m_qImpl.operator[](i); } -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){ - return Allocator::allocate(size); +template<typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +void *LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size) { + return Allocator::allocate(size); } -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> +template<typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) { return Allocator::deallocate(p); } -- Gitblit v1.8.0