From 082633f08aae8eea19bd7050cbe4a75e5ed1ac6f Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期二, 07 七月 2020 12:07:29 +0800 Subject: [PATCH] update --- squeue/include/lock_free_queue.h | 265 ++++++++++++++++++++++++++++++++++++++-------------- 1 files changed, 194 insertions(+), 71 deletions(-) diff --git a/squeue/include/lock_free_queue.h b/squeue/include/lock_free_queue.h index a8f7801..c8a672c 100644 --- a/squeue/include/lock_free_queue.h +++ b/squeue/include/lock_free_queue.h @@ -4,9 +4,12 @@ #include <stdint.h> // uint32_t #include <atomic> #include <usg_common.h> +#include <assert.h> // assert() +#include "mm.h" +#include "sem_util.h" // default Queue size -#define LOCK_FREE_Q_DEFAULT_SIZE 65536 // (2^16) +#define LOCK_FREE_Q_DEFAULT_SIZE 16 // define this macro if calls to "size" must return the real size of the // queue. If it is undefined that function will try to take a snapshot of @@ -17,7 +20,7 @@ // template <typename ELEM_T> -class ArrayLockFreeQueueMultipleProducers; +class ArrayLockFreeQueue; /// @brief Lock-free queue based on a circular array @@ -32,7 +35,7 @@ /// ArrayLockFreeQueue<int, 16> q; /// // queue of ints of size (16 - 1) and /// // defaulted to single producer -/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueueMultipleProducers> q; +/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q; /// // queue of ints of size (100 - 1) with support /// // for multiple producers /// @@ -57,13 +60,16 @@ /// last 4 elements of the queue are not used when the counter rolls /// over to 0 /// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and -/// ArrayLockFreeQueueMultipleProducers are supported (single producer +/// ArrayLockFreeQueue are supported (single producer /// by default) template < typename ELEM_T, - template <typename T> class Q_TYPE = ArrayLockFreeQueueMultipleProducers > + template <typename T> class Q_TYPE = ArrayLockFreeQueue > class LockFreeQueue { +private: + int slots; + int items; public: /// @brief constructor of the class LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); @@ -88,7 +94,7 @@ /// environments this function might return bogus values. See help in method /// LockFreeQueue::size inline bool full(); - + inline bool empty(); /// @brief push an element at the tail of the queue @@ -97,13 +103,21 @@ /// structures to be inserted in the queue you should think of instantiate the template /// of the queue as a pointer to that large structure /// @return true if the element was inserted in the queue. False if the queue was full - inline bool push(const ELEM_T &a_data); + bool push(const ELEM_T &a_data); + bool push_nowait(const ELEM_T &a_data); + bool push_timeout(const ELEM_T &a_data, struct timespec * timeout); /// @brief pop the element at the head of the queue /// @param a reference where the element in the head of the queue will be saved to /// Note that the a_data parameter might contain rubbish if the function returns false /// @return true if the element was successfully extracted from the queue. False if the queue was empty - inline bool pop(ELEM_T &a_data); + bool pop(ELEM_T &a_data); + bool pop_nowait(ELEM_T &a_data); + bool pop_timeout(ELEM_T &a_data, struct timespec * timeout); + + + void *operator new(size_t size); + void operator delete(void *p); protected: /// @brief the actual queue. methods are forwarded into the real @@ -117,77 +131,186 @@ -/// @brief implementation of an array based lock free queue with support for -/// multiple producers -/// This class is prevented from being instantiated directly (all members and -/// methods are private). To instantiate a multiple producers lock free queue -/// you must use the ArrayLockFreeQueue fachade: -/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueueMultipleProducers> q; -template <typename ELEM_T> -class ArrayLockFreeQueueMultipleProducers +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +LockFreeQueue<ELEM_T, Q_TYPE>::LockFreeQueue(size_t qsize): + m_qImpl(qsize) { - // ArrayLockFreeQueue will be using this' private members - template < - typename ELEM_T_, - template <typename T> class Q_TYPE > - friend class LockFreeQueue; + slots = SemUtil::get(IPC_PRIVATE, qsize); + items = SemUtil::get(IPC_PRIVATE, 0); +} -private: - /// @brief constructor of the class - ArrayLockFreeQueueMultipleProducers(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - - virtual ~ArrayLockFreeQueueMultipleProducers(); - - inline uint32_t size(); - - inline bool full(); +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +LockFreeQueue<ELEM_T, Q_TYPE>::~LockFreeQueue() +{ + SemUtil::remove(slots); + SemUtil::remove(items); +} - inline bool empty(); - - bool push(const ELEM_T &a_data); - - bool pop(ELEM_T &a_data); - - /// @brief calculate the index in the circular array that corresponds - /// to a particular "count" value - inline uint32_t countToIndex(uint32_t a_count); - -private: - size_t Q_SIZE; - /// @brief array to keep the elements - ELEM_T *m_theQueue; +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +inline uint32_t LockFreeQueue<ELEM_T, Q_TYPE>::size() +{ + return m_qImpl.size(); +} - /// @brief where a new element will be inserted - std::atomic<uint32_t> m_writeIndex; +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +inline bool LockFreeQueue<ELEM_T, Q_TYPE>::full() +{ + return m_qImpl.full(); +} - /// @brief where the next element where be extracted from - std::atomic<uint32_t> m_readIndex; - - /// @brief maximum read index for multiple producer queues - /// If it's not the same as m_writeIndex it means - /// there are writes pending to be "committed" to the queue, that means, - /// the place for the data was reserved (the index in the array) but - /// data is still not in the queue, so the thread trying to read will have - /// to wait for those other threads to save the data into the queue - /// - /// note this is only used for multiple producers - std::atomic<uint32_t> m_maximumReadIndex; - -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - /// @brief number of elements in the queue - std::atomic<uint32_t> m_count; -#endif - static int m_reference; - -private: - /// @brief disable copy constructor declaring it private - ArrayLockFreeQueueMultipleProducers<ELEM_T>(const ArrayLockFreeQueueMultipleProducers<ELEM_T> &a_src); +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +inline bool LockFreeQueue<ELEM_T, Q_TYPE>::empty() +{ + return m_qImpl.empty(); +} -}; +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +bool LockFreeQueue<ELEM_T, Q_TYPE>::push(const ELEM_T &a_data) +{ + if (SemUtil::dec(slots) == -1) { + err_exit(errno, "push"); + } + + if ( m_qImpl.push(a_data) ) { + SemUtil::inc(items); + return true; + } + return false; + +} + +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +bool LockFreeQueue<ELEM_T, Q_TYPE>::push_nowait(const ELEM_T &a_data) +{ + if (SemUtil::dec_nowait(slots) == -1) { + if (errno == EAGAIN) + return false; + else + err_exit(errno, "push_nowait"); + } + + if ( m_qImpl.push(a_data)) { + SemUtil::inc(items); + return true; + } + return false; + +} + +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +bool LockFreeQueue<ELEM_T, Q_TYPE>::push_timeout(const ELEM_T &a_data, struct timespec * timeout) +{ + + if (SemUtil::dec_timeout(slots, timeout) == -1) { + if (errno == EAGAIN) + return false; + else + err_exit(errno, "push_timeout"); + } + + if (m_qImpl.push(a_data)){ + SemUtil::inc(items); + return true; + } + return false; + +} + + + + +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +bool LockFreeQueue<ELEM_T, Q_TYPE>::pop(ELEM_T &a_data) +{ + if (SemUtil::dec(items) == -1) { + err_exit(errno, "remove"); + } + + if (m_qImpl.pop(a_data)) { + SemUtil::inc(slots); + return true; + } + return false; + +} + +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +bool LockFreeQueue<ELEM_T, Q_TYPE>::pop_nowait(ELEM_T &a_data) +{ + if (SemUtil::dec_nowait(items) == -1) { + if (errno == EAGAIN) + return false; + else + err_exit(errno, "remove_nowait"); + } + + if (m_qImpl.pop(a_data)) { + SemUtil::inc(slots); + return true; + } + return false; + +} + + +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +bool LockFreeQueue<ELEM_T, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout) +{ + if (SemUtil::dec_timeout(items, timeout) == -1) { + if (errno == EAGAIN) + return false; + else + err_exit(errno, "remove_timeout"); + } + + if (m_qImpl.pop(a_data)) { + SemUtil::inc(slots); + return true; + } + return false; + +} + + +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +void * LockFreeQueue<ELEM_T, Q_TYPE>::operator new(size_t size){ + return mm_malloc(size); +} + +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +void LockFreeQueue<ELEM_T, Q_TYPE>::operator delete(void *p) { + return mm_free(p); +} // include implementation files -#include "lock_free_queue_impl.h" -#include "lock_free_queue_impl_multiple_producer.h" +#include "linked_lock_free_queue.h" +#include "array_lock_free_queue.h" #endif // _LOCK_FREE_QUEUE_H__ -- Gitblit v1.8.0