From 68d23225a38a35f1325eb39fa4ed5a005d5de473 Mon Sep 17 00:00:00 2001 From: fujuntang <fujuntang@aiot.com> Date: 星期三, 11 八月 2021 09:50:20 +0800 Subject: [PATCH] fix from 3.1 first commit --- src/queue/lock_free_queue.h | 525 +++++++++++++++++++++++++++------------------------------ 1 files changed, 250 insertions(+), 275 deletions(-) diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h index 281b7e5..54d0b04 100644 --- a/src/queue/lock_free_queue.h +++ b/src/queue/lock_free_queue.h @@ -1,25 +1,37 @@ +/** + * encapsulate array_lock_free_queue, add semphore. populate in kernal space. + */ #ifndef __LOCK_FREE_QUEUE_H__ #define __LOCK_FREE_QUEUE_H__ #include <usg_common.h> #include <assert.h> // assert() -#include "mem_pool.h" +#include "shm_mm.h" #include "sem_util.h" #include "logger_factory.h" #include "shm_allocator.h" +#include "psem.h" +#include "bus_error.h" +#include "bus_def.h" // default Queue size #define LOCK_FREE_Q_DEFAULT_SIZE 16 -// define this macro if calls to "size" must return the real size of the -// queue. If it is undefined that function will try to take a snapshot of + +#define LOCK_FREE_Q_ST_OPENED 0 + +#define LOCK_FREE_Q_ST_CLOSED 1 + +// static Logger *logger = LoggerFactory::getLogger(); +// define this macro if calls to "size" must return the real size of the +// queue. If it is undefined that function will try to take a snapshot of // the queue, but returned value might be bogus // forward declarations for default template values // -template <typename ELEM_T, typename Allocator> +template<typename ELEM_T, typename Allocator> class ArrayLockFreeQueue; // template <typename ELEM_T> @@ -27,9 +39,9 @@ /// @brief Lock-free queue based on a circular array -/// No allocation of extra memory for the nodes handling is needed, but it has -/// to add extra overhead (extra CAS operation) when inserting to ensure the -/// thread-safety of the queue when the queue type is not +/// No allocation of extra memory for the nodes handling is needed, but it has +/// to add extra overhead (extra CAS operation) when inserting to ensure the +/// thread-safety of the queue when the queue type is not /// ArrayLockFreeQueueSingleProducer. /// /// examples of instantiation: @@ -44,322 +56,285 @@ /// /// ELEM_T represents the type of elementes pushed and popped from the queue /// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1) -/// This number should be a power of 2 to ensure -/// indexes in the circular queue keep stable when the uint32_t +/// This number should be a power of 2 to ensure +/// indexes in the circular queue keep stable when the uint32_t /// variable that holds the current position rolls over from FFFFFFFF /// to 0. For instance -/// 2 -> 0x02 +/// 2 -> 0x02 /// 4 -> 0x04 /// 8 -> 0x08 /// 16 -> 0x10 -/// (...) +/// (...) /// 1024 -> 0x400 /// 2048 -> 0x800 /// /// if queue size is not defined as requested, let's say, for /// instance 100, when current position is FFFFFFFF (4,294,967,295) -/// index in the circular array is 4,294,967,295 % 100 = 95. -/// When that value is incremented it will be set to 0, that is the +/// index in the circular array is 4,294,967,295 % 100 = 95. +/// When that value is incremented it will be set to 0, that is the /// last 4 elements of the queue are not used when the counter rolls /// over to 0 -/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and +/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and /// ArrayLockFreeQueue are supported (single producer /// by default) -template < - typename ELEM_T, - typename Allocator = SHM_Allocator, - template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue - > -class LockFreeQueue -{ +template< + typename ELEM_T, + typename Allocator = SHM_Allocator, + template<typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue +> +class LockFreeQueue { private: - int slots; - int items; - + sem_t slots; + sem_t items; + + time_t createTime; + time_t closeTime; + int status; + public: - int mutex; - LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - - /// @brief destructor of the class. - /// Note it is not virtual since it is not expected to inherit from this - /// template - ~LockFreeQueue(); - std::atomic_uint reference; - /// @brief constructor of the class - - /// @brief returns the current number of items in the queue - /// It tries to take a snapshot of the size of the queue, but in busy environments - /// this function might return bogus values. - /// - /// If a reliable queue size must be kept you might want to have a look at - /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE' - /// it enables a reliable size though it hits overall performance of the queue - /// (when the reliable size variable is on it's got an impact of about 20% in time) - inline uint32_t size(); - - /// @brief return true if the queue is full. False otherwise - /// It tries to take a snapshot of the size of the queue, but in busy - /// environments this function might return bogus values. See help in method - /// LockFreeQueue::size - inline bool full(); + LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - inline bool empty(); + /// @brief destructor of the class. + /// Note it is not virtual since it is not expected to inherit from this + /// template + ~LockFreeQueue(); - inline ELEM_T& operator[](unsigned i); + inline void close(); - /// @brief push an element at the tail of the queue - /// @param the element to insert in the queue - /// Note that the element is not a pointer or a reference, so if you are using large data - /// structures to be inserted in the queue you should think of instantiate the template - /// of the queue as a pointer to that large structure - /// @return true if the element was inserted in the queue. False if the queue was full - bool push(const ELEM_T &a_data); - bool push_nowait(const ELEM_T &a_data); - bool push_timeout(const ELEM_T &a_data, const struct timespec * timeout); - - /// @brief pop the element at the head of the queue - /// @param a reference where the element in the head of the queue will be saved to - /// Note that the a_data parameter might contain rubbish if the function returns false - /// @return true if the element was successfully extracted from the queue. False if the queue was empty - bool pop(ELEM_T &a_data); - bool pop_nowait(ELEM_T &a_data); - bool pop_timeout(ELEM_T &a_data, struct timespec * timeout); + // std::atomic_uint reference; + /// @brief constructor of the class - void *operator new(size_t size); - void operator delete(void *p); + /// @brief returns the current number of items in the queue + /// It tries to take a snapshot of the size of the queue, but in busy environments + /// this function might return bogus values. + /// + /// If a reliable queue size must be kept you might want to have a look at + /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE' + /// it enables a reliable size though it hits overall performance of the queue + /// (when the reliable size variable is on it's got an impact of about 20% in time) + inline uint32_t size(); + + /// @brief return true if the queue is full. False otherwise + /// It tries to take a snapshot of the size of the queue, but in busy + /// environments this function might return bogus values. See help in method + /// LockFreeQueue::size + inline bool full(); + + inline bool empty(); + + inline ELEM_T &operator[](unsigned i); + + + + time_t getCreateTime() { + return createTime; + } + + time_t getCloseTime() { + return closeTime; + } + + int getStatus() { + return status; + } + + /// @brief push an element at the tail of the queue + /// @param the element to insert in the queue + /// Note that the element is not a pointer or a reference, so if you are using large data + /// structures to be inserted in the queue you should think of instantiate the template + /// of the queue as a pointer to that large structure + /// @return true if the element was inserted in the queue. False if the queue was full + int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); + + /// @brief pop the element at the head of the queue + /// @param a reference where the element in the head of the queue will be saved to + /// Note that the a_data parameter might contain rubbish if the function returns false + /// @return true if the element was successfully extracted from the queue. False if the queue was empty + int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); + + + void *operator new(size_t size); + + void operator delete(void *p); protected: - /// @brief the actual queue. methods are forwarded into the real - /// implementation - Q_TYPE<ELEM_T, Allocator> m_qImpl; + /// @brief the actual queue. methods are forwarded into the real + /// implementation + Q_TYPE<ELEM_T, Allocator> m_qImpl; private: - /// @brief disable copy constructor declaring it private - LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src); + /// @brief disable copy constructor declaring it private + LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src); }; -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) -{ -// std::cout << "LockFreeQueue init reference=" << reference << std::endl; - slots = SemUtil::get(IPC_PRIVATE, qsize); - items = SemUtil::get(IPC_PRIVATE, 0); - mutex = SemUtil::get(IPC_PRIVATE, 1); -} +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): m_qImpl(qsize) { + // std::cout << "LockFreeQueue init reference=" << reference << std::endl; + if (sem_init(&slots, 1, qsize) == -1) + err_exit(errno, "LockFreeQueue sem_init"); + if (sem_init(&items, 1, 0) == -1) + err_exit(errno, "LockFreeQueue sem_init"); + + createTime = time(NULL); + status = LOCK_FREE_Q_ST_OPENED; -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() -{ - LoggerFactory::getLogger()->debug("LockFreeQueue desctroy"); - SemUtil::remove(slots); - SemUtil::remove(items); - SemUtil::remove(mutex); -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() -{ - return m_qImpl.size(); -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() -{ - return m_qImpl.full(); -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() -{ - return m_qImpl.empty(); -} - - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data) -{ - // printf("==================LockFreeQueue push before\n"); - if (SemUtil::dec(slots) == -1) { - err_msg(errno, "LockFreeQueue push"); - return false; - } - - if ( m_qImpl.push(a_data) ) { - - SemUtil::inc(items); - // printf("==================LockFreeQueue push after\n"); - return true; - } - return false; - -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data) -{ - if (SemUtil::dec_nowait(slots) == -1) { - if (errno == EAGAIN) - return false; - else { - err_msg(errno, "LockFreeQueue push_nowait"); - return false; - } - - } - - if ( m_qImpl.push(a_data)) { - SemUtil::inc(items); - return true; - } - return false; - -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * timeout) -{ - - - if (SemUtil::dec_timeout(slots, timeout) == -1) { - if (errno == EAGAIN) - return false; - else { - // err_msg(errno, "LockFreeQueue push_timeout"); - return false; - } - } - - if (m_qImpl.push(a_data)){ - SemUtil::inc(items); - return true; - } - return false; - } - - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data) -{ - // printf("==================LockFreeQueue pop before\n"); - if (SemUtil::dec(items) == -1) { - err_msg(errno, "LockFreeQueue pop"); - return false; - } - - if (m_qImpl.pop(a_data)) { - SemUtil::inc(slots); - // printf("==================LockFreeQueue pop after\n"); - return true; - } - return false; - +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +inline void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::close() { + status = LOCK_FREE_Q_ST_CLOSED; + closeTime = time(NULL); } -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data) -{ - if (SemUtil::dec_nowait(items) == -1) { - if (errno == EAGAIN) - return false; - else { - err_msg(errno, "LockFreeQueue pop_nowait"); - return false; - } - } - if (m_qImpl.pop(a_data)) { - SemUtil::inc(slots); - return true; - } - return false; - +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() { + // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy"); + if (sem_destroy(&slots) == -1) { + err_exit(errno, "LockFreeQueue sem_destroy"); + } + if (sem_destroy(&items) == -1) { + err_exit(errno, "LockFreeQueue sem_destroy"); + } + } - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout) -{ -// printf("==================LockFreeQueue pop_timeout before\n"); - if (SemUtil::dec_timeout(items, timeout) == -1) { - if (errno == EAGAIN) - return false; - else { - // err_msg(errno, "LockFreeQueue pop_timeout"); - return false; - } +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() { + return m_qImpl.size(); +} + +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() { + return m_qImpl.full(); +} + +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() { + return m_qImpl.empty(); +} + + +template<typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) { + // LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n"); + // sigset_t mask_all, pre; + // sigfillset(&mask_all); + + // sigprocmask(SIG_BLOCK, &mask_all, &pre); + + if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { + if (psem_trywait(&slots) == -1) { + goto LABEL_FAILTURE; } - - if (m_qImpl.pop(a_data)) { - SemUtil::inc(slots); -// printf("==================LockFreeQueue pop_timeout after\n"); - return true; + } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { + if (psem_timedwait(&slots, timeout) == -1) { + goto LABEL_FAILTURE; } - return false; - + } else { + if (psem_wait(&slots) == -1) { + goto LABEL_FAILTURE; + } + } + + + if (m_qImpl.push(a_data)) { + psem_post(&items); + // sigprocmask(SIG_SETMASK, &pre, NULL); + // LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n"); + return 0; + } + + LABEL_FAILTURE: + // sigprocmask(SIG_SETMASK, &pre, NULL); + return errno; } -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) { - return m_qImpl.operator[](i); +template<typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) { + // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before...."); + + // sigset_t mask_all, pre; + // sigfillset(&mask_all); + + // sigprocmask(SIG_BLOCK, &mask_all, &pre); + + if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { + if (psem_trywait(&items) == -1) { + goto LABEL_FAILTURE; + } + } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { + // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before. flag=%d , %d\n", flag, timeout->tv_sec); + if (psem_timedwait(&items, timeout) == -1) { + goto LABEL_FAILTURE; + } + } else { + if (psem_wait(&items) == -1) { + goto LABEL_FAILTURE; + } + } + + if (m_qImpl.pop(a_data)) { + psem_post(&slots); + // sigprocmask(SIG_SETMASK, &pre, NULL); + // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n"); + return 0; + } + + + LABEL_FAILTURE: + // sigprocmask(SIG_SETMASK, &pre, NULL); + return errno; } -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){ - return Allocator::allocate(size); +template<typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +ELEM_T &LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) { + return m_qImpl.operator[](i); } -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> + +template<typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +void *LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size) { + return Allocator::allocate(size); +} + +template<typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) { - return Allocator::deallocate(p); + LockFreeQueue<ELEM_T, Allocator, Q_TYPE> * _que = (LockFreeQueue<ELEM_T, Allocator, Q_TYPE> * )p; + Allocator::deallocate(p); } // include implementation files -- Gitblit v1.8.0