From 68d23225a38a35f1325eb39fa4ed5a005d5de473 Mon Sep 17 00:00:00 2001 From: fujuntang <fujuntang@aiot.com> Date: 星期三, 11 八月 2021 09:50:20 +0800 Subject: [PATCH] fix from 3.1 first commit --- src/queue/array_lock_free_queue.h | 489 ++++++++++++++++++++++++++---------------------------- 1 files changed, 237 insertions(+), 252 deletions(-) diff --git a/src/queue/array_lock_free_queue.h b/src/queue/array_lock_free_queue.h index ae1506d..5ff8daf 100644 --- a/src/queue/array_lock_free_queue.h +++ b/src/queue/array_lock_free_queue.h @@ -1,10 +1,11 @@ #ifndef __ARRAY_LOCK_FREE_QUEUE_H__ #define __ARRAY_LOCK_FREE_QUEUE_H__ + #include "atomic_ops.h" #include <assert.h> // assert() #include <sched.h> // sched_yield() #include "logger_factory.h" -#include "mem_pool.h" +#include "shm_mm.h" #include "shm_allocator.h" /// @brief implementation of an array based lock free queue with support for @@ -17,306 +18,290 @@ #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -template <typename ELEM_T, typename Allocator = SHM_Allocator> -class ArrayLockFreeQueue -{ - // ArrayLockFreeQueue will be using this' private members - template < - typename ELEM_T_, - typename Allocator_, - template <typename T, typename AT> class Q_TYPE - > - friend class LockFreeQueue; +template<typename ELEM_T, typename Allocator = SHM_Allocator> +class ArrayLockFreeQueue { + // ArrayLockFreeQueue will be using this' private members + template< + typename ELEM_T_, + typename Allocator_, + template<typename T, typename AT> class Q_TYPE + > + friend + class LockFreeQueue; private: - /// @brief constructor of the class - ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - - virtual ~ArrayLockFreeQueue(); - - inline uint32_t size(); - - inline bool full(); + /// @brief constructor of the class + ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - inline bool empty(); - - bool push(const ELEM_T &a_data); - - bool pop(ELEM_T &a_data); - - /// @brief calculate the index in the circular array that corresponds - /// to a particular "count" value - inline uint32_t countToIndex(uint32_t a_count); + virtual ~ArrayLockFreeQueue(); - ELEM_T& operator[](unsigned i); - -private: - size_t Q_SIZE; - /// @brief array to keep the elements - ELEM_T *m_theQueue; + inline uint32_t size(); - /// @brief where a new element will be inserted - uint32_t m_writeIndex; + inline bool full(); - /// @brief where the next element where be extracted from - uint32_t m_readIndex; - - /// @brief maximum read index for multiple producer queues - /// If it's not the same as m_writeIndex it means - /// there are writes pending to be "committed" to the queue, that means, - /// the place for the data was reserved (the index in the array) but - /// data is still not in the queue, so the thread trying to read will have - /// to wait for those other threads to save the data into the queue - /// - /// note this is only used for multiple producers - uint32_t m_maximumReadIndex; + inline bool empty(); + + bool push(const ELEM_T &a_data); + + bool pop(ELEM_T &a_data); + + /// @brief calculate the index in the circular array that corresponds + /// to a particular "count" value + inline uint32_t countToIndex(uint32_t a_count); + + ELEM_T &operator[](unsigned i); + +private: + size_t Q_SIZE; + /// @brief array to keep the elements + ELEM_T *m_theQueue; + + /// @brief where a new element will be inserted + uint32_t m_writeIndex; + + /// @brief where the next element where be extracted from + uint32_t m_readIndex; + + /// @brief maximum read index for multiple producer queues + /// If it's not the same as m_writeIndex it means + /// there are writes pending to be "committed" to the queue, that means, + /// the place for the data was reserved (the index in the array) but + /// data is still not in the queue, so the thread trying to read will have + /// to wait for those other threads to save the data into the queue + /// + /// note this is only used for multiple producers + uint32_t m_maximumReadIndex; #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - /// @brief number of elements in the queue - uint32_t m_count; + /// @brief number of elements in the queue + uint32_t m_count; #endif - - + + private: - /// @brief disable copy constructor declaring it private - ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src); + /// @brief disable copy constructor declaring it private + ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src); }; -template <typename ELEM_T, typename Allocator> +template<typename ELEM_T, typename Allocator> ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize): - Q_SIZE(qsize), - m_writeIndex(0), // initialisation is not atomic - m_readIndex(0), // - m_maximumReadIndex(0) // + Q_SIZE(qsize), + m_writeIndex(0), // initialisation is not atomic + m_readIndex(0), // + m_maximumReadIndex(0) // #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - ,m_count(0) // + , m_count(0) // #endif { - m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T)); + m_theQueue = (ELEM_T *) Allocator::allocate(Q_SIZE * sizeof(ELEM_T)); } -template <typename ELEM_T, typename Allocator> -ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() -{ - // std::cout << "destroy ArrayLockFreeQueue\n"; - Allocator::deallocate(m_theQueue); - +template<typename ELEM_T, typename Allocator> +ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() { + // std::cout << "destroy ArrayLockFreeQueue\n"; + Allocator::deallocate(m_theQueue); + } -template <typename ELEM_T, typename Allocator> -inline -uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) -{ - // if Q_SIZE is a power of 2 this statement could be also written as - // return (a_count & (Q_SIZE - 1)); - return (a_count % Q_SIZE); +template<typename ELEM_T, typename Allocator> +inline +uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) { + // if Q_SIZE is a power of 2 this statement could be also written as + // return (a_count & (Q_SIZE - 1)); + return (a_count % Q_SIZE); } -template <typename ELEM_T, typename Allocator> -inline -uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() -{ +template<typename ELEM_T, typename Allocator> +inline +uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - return m_count; + return m_count; #else - uint32_t currentWriteIndex = m_maximumReadIndex; - uint32_t currentReadIndex = m_readIndex; + uint32_t currentWriteIndex = m_maximumReadIndex; + uint32_t currentReadIndex = m_readIndex; - // let's think of a scenario where this function returns bogus data - // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run - // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1 - // 2. afterwards this thread is preemted. While this thread is inactive 2 - // elements are inserted and removed from the queue, so m_maximumReadIndex - // is 5 and m_readIndex 4. Real size is still 1 - // 3. Now the current thread comes back from preemption and reads m_readIndex. - // currentReadIndex is 4 - // 4. currentReadIndex is bigger than currentWriteIndex, so - // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is, - // it returns that the queue is almost full, when it is almost empty - // - if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex)) - { - return (currentWriteIndex - currentReadIndex); - } - else - { - return (Q_SIZE + currentWriteIndex - currentReadIndex); - } + // let's think of a scenario where this function returns bogus data + // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run + // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1 + // 2. afterwards this thread is preemted. While this thread is inactive 2 + // elements are inserted and removed from the queue, so m_maximumReadIndex + // is 5 and m_readIndex 4. Real size is still 1 + // 3. Now the current thread comes back from preemption and reads m_readIndex. + // currentReadIndex is 4 + // 4. currentReadIndex is bigger than currentWriteIndex, so + // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is, + // it returns that the queue is almost full, when it is almost empty + // + if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex)) + { + return (currentWriteIndex - currentReadIndex); + } + else + { + return (Q_SIZE + currentWriteIndex - currentReadIndex); + } #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE } -template <typename ELEM_T, typename Allocator> -inline -bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() -{ +template<typename ELEM_T, typename Allocator> +inline +bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - return (m_count == (Q_SIZE)); + return (m_count == (Q_SIZE)); #else - uint32_t currentWriteIndex = m_writeIndex; - uint32_t currentReadIndex = m_readIndex; - + uint32_t currentWriteIndex = m_writeIndex; + uint32_t currentReadIndex = m_readIndex; + + if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) + { + // the queue is full + return true; + } + else + { + // not full! + return false; + } +#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE +} + +template<typename ELEM_T, typename Allocator> +inline +bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() { +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + return (m_count == 0); +#else + + if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex)) + { + // the queue is full + return true; + } + else + { + // not full! + return false; + } +#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE +} + + +template<typename ELEM_T, typename Allocator> +bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) { + uint32_t currentReadIndex; + uint32_t currentWriteIndex; + + do { + + currentWriteIndex = m_writeIndex; + currentReadIndex = m_readIndex; +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + if (m_count == Q_SIZE) { + return false; + } +#else if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) { // the queue is full - return true; - } - else - { - // not full! return false; } -#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -} - -template <typename ELEM_T, typename Allocator> -inline -bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() -{ -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - return (m_count == 0); -#else - - if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex)) - { - // the queue is full - return true; - } - else - { - // not full! - return false; - } -#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -} - - - - - - -template <typename ELEM_T, typename Allocator> -bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) -{ - uint32_t currentReadIndex; - uint32_t currentWriteIndex; - - do - { - - currentWriteIndex = m_writeIndex; - currentReadIndex = m_readIndex; - #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - if (m_count == Q_SIZE) { - return false; - } - #else - if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) - { - // the queue is full - return false; - } - #endif - - } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); - - // We know now that this index is reserved for us. Use it to save the data - m_theQueue[countToIndex(currentWriteIndex)] = a_data; - - // update the maximum read index after saving the data. It wouldn't fail if there is only one thread - // inserting in the queue. It might fail if there are more than 1 producer threads because this - // operation has to be done in the same order as the previous CAS - - while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) - { - // this is a good place to yield the thread in case there are more - // software threads than hardware processors and you have more - // than 1 producer thread - // have a look at sched_yield (POSIX.1b) - sched_yield(); - } - -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - AtomicAdd(&m_count, 1); #endif - return true; + + } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); + + // We know now that this index is reserved for us. Use it to save the data + m_theQueue[countToIndex(currentWriteIndex)] = a_data; + + // update the maximum read index after saving the data. It wouldn't fail if there is only one thread + // inserting in the queue. It might fail if there are more than 1 producer threads because this + // operation has to be done in the same order as the previous CAS + + while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) { + // this is a good place to yield the thread in case there are more + // software threads than hardware processors and you have more + // than 1 producer thread + // have a look at sched_yield (POSIX.1b) + sched_yield(); + } + +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + AtomicAdd(&m_count, 1); +#endif + return true; } -template <typename ELEM_T, typename Allocator> -bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) -{ - uint32_t currentMaximumReadIndex; - uint32_t currentReadIndex; +template<typename ELEM_T, typename Allocator> +bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) { + uint32_t currentMaximumReadIndex; + uint32_t currentReadIndex; - do - { - // to ensure thread-safety when there is more than 1 producer thread - // a second index is defined (m_maximumReadIndex) - currentReadIndex = m_readIndex; - currentMaximumReadIndex = m_maximumReadIndex; - #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + do { + // to ensure thread-safety when there is more than 1 producer thread + // a second index is defined (m_maximumReadIndex) + currentReadIndex = m_readIndex; + currentMaximumReadIndex = m_maximumReadIndex; +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - if (m_count == 0) { - return false; - } - #else - if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) - { - // the queue is empty or - // a producer thread has allocate space in the queue but is - // waiting to commit the data into it - return false; - } - #endif - - // retrieve the data from the queue - a_data = m_theQueue[countToIndex(currentReadIndex)]; - - // try to perfrom now the CAS operation on the read index. If we succeed - // a_data already contains what m_readIndex pointed to before we - // increased it - if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) - { - #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - // m_count.fetch_sub(1); - AtomicSub(&m_count, 1); - #endif - return true; - } - - // it failed retrieving the element off the queue. Someone else must - // have read the element stored at countToIndex(currentReadIndex) - // before we could perform the CAS operation - - } while(1); // keep looping to try again! - - // Something went wrong. it shouldn't be possible to reach here - assert(0); - - // Add this return statement to avoid compiler warnings - return false; -} - -template <typename ELEM_T, typename Allocator> -ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) -{ - int currentCount = m_count; - uint32_t currentReadIndex = m_readIndex; - if (i >= currentCount) - { - std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; - std::exit(EXIT_FAILURE); + if (m_count == 0) { + return false; } - return m_theQueue[countToIndex(currentReadIndex+i)]; +#else + if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) + { + // the queue is empty or + // a producer thread has allocate space in the queue but is + // waiting to commit the data into it + return false; + } +#endif + + // retrieve the data from the queue + a_data = m_theQueue[countToIndex(currentReadIndex)]; + + // try to perfrom now the CAS operation on the read index. If we succeed + // a_data already contains what m_readIndex pointed to before we + // increased it + if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) { +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + // m_count.fetch_sub(1); + AtomicSub(&m_count, 1); +#endif + return true; + } + + // it failed retrieving the element off the queue. Someone else must + // have read the element stored at countToIndex(currentReadIndex) + // before we could perform the CAS operation + + } while (1); // keep looping to try again! + + // Something went wrong. it shouldn't be possible to reach here + assert(0); + + // Add this return statement to avoid compiler warnings + return false; +} + +template<typename ELEM_T, typename Allocator> +ELEM_T &ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) { + int currentCount = m_count; + uint32_t currentReadIndex = m_readIndex; + if (i >= currentCount) { + std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i + << " is out of range\n"; + std::exit(EXIT_FAILURE); + } + return m_theQueue[countToIndex(currentReadIndex + i)]; } #endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ -- Gitblit v1.8.0