From f52f2c2828047c2f30d30fc1fe2b54d8db146d49 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期四, 25 二月 2021 15:56:35 +0800 Subject: [PATCH] update --- src/queue/lock_free_queue.h | 565 +++++++++++++++++++++++++------------------------------- 1 files changed, 254 insertions(+), 311 deletions(-) diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h index 4c55f7b..54d0b04 100644 --- a/src/queue/lock_free_queue.h +++ b/src/queue/lock_free_queue.h @@ -1,28 +1,37 @@ +/** + * encapsulate array_lock_free_queue, add semphore. populate in kernal space. + */ #ifndef __LOCK_FREE_QUEUE_H__ #define __LOCK_FREE_QUEUE_H__ #include <usg_common.h> #include <assert.h> // assert() -#include "mem_pool.h" +#include "shm_mm.h" #include "sem_util.h" #include "logger_factory.h" #include "shm_allocator.h" -#include "px_sem_util.h" +#include "psem.h" #include "bus_error.h" +#include "bus_def.h" // default Queue size #define LOCK_FREE_Q_DEFAULT_SIZE 16 + +#define LOCK_FREE_Q_ST_OPENED 0 + +#define LOCK_FREE_Q_ST_CLOSED 1 + // static Logger *logger = LoggerFactory::getLogger(); -// define this macro if calls to "size" must return the real size of the -// queue. If it is undefined that function will try to take a snapshot of +// define this macro if calls to "size" must return the real size of the +// queue. If it is undefined that function will try to take a snapshot of // the queue, but returned value might be bogus // forward declarations for default template values // -template <typename ELEM_T, typename Allocator> +template<typename ELEM_T, typename Allocator> class ArrayLockFreeQueue; // template <typename ELEM_T> @@ -30,9 +39,9 @@ /// @brief Lock-free queue based on a circular array -/// No allocation of extra memory for the nodes handling is needed, but it has -/// to add extra overhead (extra CAS operation) when inserting to ensure the -/// thread-safety of the queue when the queue type is not +/// No allocation of extra memory for the nodes handling is needed, but it has +/// to add extra overhead (extra CAS operation) when inserting to ensure the +/// thread-safety of the queue when the queue type is not /// ArrayLockFreeQueueSingleProducer. /// /// examples of instantiation: @@ -47,351 +56,285 @@ /// /// ELEM_T represents the type of elementes pushed and popped from the queue /// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1) -/// This number should be a power of 2 to ensure -/// indexes in the circular queue keep stable when the uint32_t +/// This number should be a power of 2 to ensure +/// indexes in the circular queue keep stable when the uint32_t /// variable that holds the current position rolls over from FFFFFFFF /// to 0. For instance -/// 2 -> 0x02 +/// 2 -> 0x02 /// 4 -> 0x04 /// 8 -> 0x08 /// 16 -> 0x10 -/// (...) +/// (...) /// 1024 -> 0x400 /// 2048 -> 0x800 /// /// if queue size is not defined as requested, let's say, for /// instance 100, when current position is FFFFFFFF (4,294,967,295) -/// index in the circular array is 4,294,967,295 % 100 = 95. -/// When that value is incremented it will be set to 0, that is the +/// index in the circular array is 4,294,967,295 % 100 = 95. +/// When that value is incremented it will be set to 0, that is the /// last 4 elements of the queue are not used when the counter rolls /// over to 0 -/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and +/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and /// ArrayLockFreeQueue are supported (single producer /// by default) -template < - typename ELEM_T, - typename Allocator = SHM_Allocator, - template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue - > -class LockFreeQueue -{ +template< + typename ELEM_T, + typename Allocator = SHM_Allocator, + template<typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue +> +class LockFreeQueue { private: - sem_t slots; - sem_t items; + sem_t slots; + sem_t items; + time_t createTime; + time_t closeTime; + int status; - public: - sem_t mutex; - LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - - /// @brief destructor of the class. - /// Note it is not virtual since it is not expected to inherit from this - /// template - ~LockFreeQueue(); - std::atomic_uint reference; - /// @brief constructor of the class - - /// @brief returns the current number of items in the queue - /// It tries to take a snapshot of the size of the queue, but in busy environments - /// this function might return bogus values. - /// - /// If a reliable queue size must be kept you might want to have a look at - /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE' - /// it enables a reliable size though it hits overall performance of the queue - /// (when the reliable size variable is on it's got an impact of about 20% in time) - inline uint32_t size(); - - /// @brief return true if the queue is full. False otherwise - /// It tries to take a snapshot of the size of the queue, but in busy - /// environments this function might return bogus values. See help in method - /// LockFreeQueue::size - inline bool full(); + LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - inline bool empty(); + /// @brief destructor of the class. + /// Note it is not virtual since it is not expected to inherit from this + /// template + ~LockFreeQueue(); - inline ELEM_T& operator[](unsigned i); + inline void close(); - /// @brief push an element at the tail of the queue - /// @param the element to insert in the queue - /// Note that the element is not a pointer or a reference, so if you are using large data - /// structures to be inserted in the queue you should think of instantiate the template - /// of the queue as a pointer to that large structure - /// @return true if the element was inserted in the queue. False if the queue was full - int push(const ELEM_T &a_data); - int push_nowait(const ELEM_T &a_data); - int push_timeout(const ELEM_T &a_data, const struct timespec * timeout); - - /// @brief pop the element at the head of the queue - /// @param a reference where the element in the head of the queue will be saved to - /// Note that the a_data parameter might contain rubbish if the function returns false - /// @return true if the element was successfully extracted from the queue. False if the queue was empty - int pop(ELEM_T &a_data); - int pop_nowait(ELEM_T &a_data); - int pop_timeout(ELEM_T &a_data, struct timespec * timeout); + // std::atomic_uint reference; + /// @brief constructor of the class - void *operator new(size_t size); - void operator delete(void *p); + /// @brief returns the current number of items in the queue + /// It tries to take a snapshot of the size of the queue, but in busy environments + /// this function might return bogus values. + /// + /// If a reliable queue size must be kept you might want to have a look at + /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE' + /// it enables a reliable size though it hits overall performance of the queue + /// (when the reliable size variable is on it's got an impact of about 20% in time) + inline uint32_t size(); + + /// @brief return true if the queue is full. False otherwise + /// It tries to take a snapshot of the size of the queue, but in busy + /// environments this function might return bogus values. See help in method + /// LockFreeQueue::size + inline bool full(); + + inline bool empty(); + + inline ELEM_T &operator[](unsigned i); + + + + time_t getCreateTime() { + return createTime; + } + + time_t getCloseTime() { + return closeTime; + } + + int getStatus() { + return status; + } + + /// @brief push an element at the tail of the queue + /// @param the element to insert in the queue + /// Note that the element is not a pointer or a reference, so if you are using large data + /// structures to be inserted in the queue you should think of instantiate the template + /// of the queue as a pointer to that large structure + /// @return true if the element was inserted in the queue. False if the queue was full + int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); + + /// @brief pop the element at the head of the queue + /// @param a reference where the element in the head of the queue will be saved to + /// Note that the a_data parameter might contain rubbish if the function returns false + /// @return true if the element was successfully extracted from the queue. False if the queue was empty + int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); + + + void *operator new(size_t size); + + void operator delete(void *p); protected: - /// @brief the actual queue. methods are forwarded into the real - /// implementation - Q_TYPE<ELEM_T, Allocator> m_qImpl; + /// @brief the actual queue. methods are forwarded into the real + /// implementation + Q_TYPE<ELEM_T, Allocator> m_qImpl; private: - /// @brief disable copy constructor declaring it private - LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src); + /// @brief disable copy constructor declaring it private + LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src); }; -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) -{ -// std::cout << "LockFreeQueue init reference=" << reference << std::endl; - if (sem_init(&slots, 1, qsize) == -1) - err_exit(errno, "LockFreeQueue sem_init"); - if (sem_init(&items, 1, 0) == -1) - err_exit(errno, "LockFreeQueue sem_init"); - if (sem_init(&mutex, 1, 1) == -1) - err_exit(errno, "LockFreeQueue sem_init"); +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): m_qImpl(qsize) { + // std::cout << "LockFreeQueue init reference=" << reference << std::endl; + if (sem_init(&slots, 1, qsize) == -1) + err_exit(errno, "LockFreeQueue sem_init"); + if (sem_init(&items, 1, 0) == -1) + err_exit(errno, "LockFreeQueue sem_init"); + + createTime = time(NULL); + status = LOCK_FREE_Q_ST_OPENED; - -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() -{ - // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy"); - if(sem_destroy(&slots) == -1) { - err_exit(errno, "LockFreeQueue sem_destroy"); - } - if(sem_destroy(&items) == -1) { - err_exit(errno, "LockFreeQueue sem_destroy"); - } - if(sem_destroy(&mutex) == -1) { - err_exit(errno, "LockFreeQueue sem_destroy"); - } -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() -{ - return m_qImpl.size(); -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() -{ - return m_qImpl.full(); -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() -{ - return m_qImpl.empty(); -} - - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data) -{ -// LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n"); - if (sem_wait(&slots) == -1) { - err_msg(errno, "LockFreeQueue push"); - return errno; - } - - if ( m_qImpl.push(a_data) ) { - sem_post(&items); -// LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n"); - return 0; - } - return -1; - -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data) -{ - if (sem_trywait(&slots) == -1) { - if (errno == EAGAIN) - return EAGAIN; - else { - err_msg(errno, "LockFreeQueue push_nowait"); - return errno; - } - - } - - if ( m_qImpl.push(a_data)) { - sem_post(&items); - return 0; - } - return -1; - -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts) -{ - - int rv; - struct timespec timeout = PXSemUtil::calc_sem_timeout(ts); - // LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld", - // timeout.tv_sec, timeout.tv_nsec); - - while ( sem_timedwait(&slots, &timeout) == -1) { - // LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld, ETIMEDOUT=%d, errno=%d\n", - // timeout.tv_sec, timeout.tv_nsec, ETIMEDOUT, errno); - - if(errno == ETIMEDOUT) - return EBUS_TIMEOUT; - else if(errno == EINTR) - continue; - else { - LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout"); - return errno; - } - } - - if (m_qImpl.push(a_data)){ - sem_post(&items); -// LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n"); - return 0; - } - return -1; - } - - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data) -{ - - // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n"); - if (sem_wait(&items) == -1) { - LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop"); - return errno; - } - - if (m_qImpl.pop(a_data)) { - sem_post(&slots); -// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n"); - return 0; - } - return -1; -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data) -{ - if (sem_trywait(&items) == -1) { - if (errno == EAGAIN) - return errno; - else { - LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_nowait"); - return errno; - } - } - - if (m_qImpl.pop(a_data)) { - sem_post(&slots); - return 0; - } - return -1; -} - - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts) -{ -// LoggerFactory::getLogger()->debug("=================ts sec = %d, nsec = %ld \n", ts->tv_sec, ts->tv_nsec ); - - LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n"); - struct timespec timeout = PXSemUtil::calc_sem_timeout(ts); -// LoggerFactory::getLogger()->debug("================== timeout before sec = %d, nsec = %ld \n", timeout.tv_sec, timeout.tv_nsec ); - - while (sem_timedwait(&items, &timeout) == -1) { - if (errno == ETIMEDOUT) - return EBUS_TIMEOUT; - else if(errno == EINTR) - continue; - else { - LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_timeout %d", errno); - return errno; - } - } - - if (m_qImpl.pop(a_data)) { - sem_post(&slots); -// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n"); - return 0; - } - return -1; - -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) { - return m_qImpl.operator[](i); +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +inline void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::close() { + status = LOCK_FREE_Q_ST_CLOSED; + closeTime = time(NULL); } -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){ - return Allocator::allocate(size); +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() { + // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy"); + if (sem_destroy(&slots) == -1) { + err_exit(errno, "LockFreeQueue sem_destroy"); + } + if (sem_destroy(&items) == -1) { + err_exit(errno, "LockFreeQueue sem_destroy"); + } + } -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() { + return m_qImpl.size(); +} + +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() { + return m_qImpl.full(); +} + +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() { + return m_qImpl.empty(); +} + + +template<typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) { + // LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n"); + // sigset_t mask_all, pre; + // sigfillset(&mask_all); + + // sigprocmask(SIG_BLOCK, &mask_all, &pre); + + if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { + if (psem_trywait(&slots) == -1) { + goto LABEL_FAILTURE; + } + } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { + if (psem_timedwait(&slots, timeout) == -1) { + goto LABEL_FAILTURE; + } + } else { + if (psem_wait(&slots) == -1) { + goto LABEL_FAILTURE; + } + } + + + if (m_qImpl.push(a_data)) { + psem_post(&items); + // sigprocmask(SIG_SETMASK, &pre, NULL); + // LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n"); + return 0; + } + + LABEL_FAILTURE: + // sigprocmask(SIG_SETMASK, &pre, NULL); + return errno; +} + +template<typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) { + // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before...."); + + // sigset_t mask_all, pre; + // sigfillset(&mask_all); + + // sigprocmask(SIG_BLOCK, &mask_all, &pre); + + if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { + if (psem_trywait(&items) == -1) { + goto LABEL_FAILTURE; + } + } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { + // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before. flag=%d , %d\n", flag, timeout->tv_sec); + if (psem_timedwait(&items, timeout) == -1) { + goto LABEL_FAILTURE; + } + } else { + if (psem_wait(&items) == -1) { + goto LABEL_FAILTURE; + } + } + + if (m_qImpl.pop(a_data)) { + psem_post(&slots); + // sigprocmask(SIG_SETMASK, &pre, NULL); + // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n"); + return 0; + } + + + LABEL_FAILTURE: + // sigprocmask(SIG_SETMASK, &pre, NULL); + return errno; +} + +template<typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +ELEM_T &LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) { + return m_qImpl.operator[](i); +} + + +template<typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +void *LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size) { + return Allocator::allocate(size); +} + +template<typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) { - return Allocator::deallocate(p); + LockFreeQueue<ELEM_T, Allocator, Q_TYPE> * _que = (LockFreeQueue<ELEM_T, Allocator, Q_TYPE> * )p; + Allocator::deallocate(p); } // include implementation files -- Gitblit v1.8.0