From 4fd62552d8277f3d0ed20e66663cd219c36796df Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期四, 21 一月 2021 11:20:22 +0800 Subject: [PATCH] update --- src/queue/array_lock_free_queue2.h | 176 +++++++++++++++++++++++++++------------------------------- 1 files changed, 83 insertions(+), 93 deletions(-) diff --git a/src/queue/array_lock_free_queue2.h b/src/queue/array_lock_free_queue2.h index 3b79b7f..233bc6a 100644 --- a/src/queue/array_lock_free_queue2.h +++ b/src/queue/array_lock_free_queue2.h @@ -1,9 +1,11 @@ #ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ #define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ - +#include "atomic_ops.h" #include <assert.h> // assert() #include <sched.h> // sched_yield() #include "logger_factory.h" +#include "mem_pool.h" +#include "shm_allocator.h" /// @brief implementation of an array based lock free queue with support for /// multiple producers @@ -15,13 +17,15 @@ #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -template <typename ELEM_T> +template <typename ELEM_T, typename Allocator = SHM_Allocator> class ArrayLockFreeQueue { // ArrayLockFreeQueue will be using this' private members template < typename ELEM_T_, - template <typename T> class Q_TYPE > + typename Allocator_, + template <typename T, typename AT> class Q_TYPE + > friend class LockFreeQueue; private: @@ -52,10 +56,10 @@ ELEM_T *m_theQueue; /// @brief where a new element will be inserted - std::atomic<uint32_t> m_writeIndex; + uint32_t m_writeIndex; /// @brief where the next element where be extracted from - std::atomic<uint32_t> m_readIndex; + uint32_t m_readIndex; /// @brief maximum read index for multiple producer queues /// If it's not the same as m_writeIndex it means @@ -65,23 +69,23 @@ /// to wait for those other threads to save the data into the queue /// /// note this is only used for multiple producers - std::atomic<uint32_t> m_maximumReadIndex; + uint32_t m_maximumReadIndex; #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE /// @brief number of elements in the queue - std::atomic<uint32_t> m_count; + uint32_t m_count; #endif private: /// @brief disable copy constructor declaring it private - ArrayLockFreeQueue<ELEM_T>(const ArrayLockFreeQueue<ELEM_T> &a_src); + ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src); }; -template <typename ELEM_T> -ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize): +template <typename ELEM_T, typename Allocator> +ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize): Q_SIZE(qsize), m_writeIndex(0), // initialisation is not atomic m_readIndex(0), // @@ -90,38 +94,38 @@ ,m_count(0) // #endif { - m_theQueue = (ELEM_T*)mm_malloc(Q_SIZE * sizeof(ELEM_T)); + m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T)); } -template <typename ELEM_T> -ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue() +template <typename ELEM_T, typename Allocator> +ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() { // std::cout << "destroy ArrayLockFreeQueue\n"; - mm_free(m_theQueue); + Allocator::deallocate(m_theQueue); } -template <typename ELEM_T> +template <typename ELEM_T, typename Allocator> inline -uint32_t ArrayLockFreeQueue<ELEM_T>::countToIndex(uint32_t a_count) +uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) { // if Q_SIZE is a power of 2 this statement could be also written as // return (a_count & (Q_SIZE - 1)); return (a_count % Q_SIZE); } -template <typename ELEM_T> +template <typename ELEM_T, typename Allocator> inline -uint32_t ArrayLockFreeQueue<ELEM_T>::size() +uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - return m_count.load(); + return m_count; #else - uint32_t currentWriteIndex = m_maximumReadIndex.load(); - uint32_t currentReadIndex = m_readIndex.load(); + uint32_t currentWriteIndex = m_maximumReadIndex; + uint32_t currentReadIndex = m_readIndex; // let's think of a scenario where this function returns bogus data // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run @@ -146,13 +150,13 @@ #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE } -template <typename ELEM_T> +template <typename ELEM_T, typename Allocator> inline -bool ArrayLockFreeQueue<ELEM_T>::full() +bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - return (m_count.load() == (Q_SIZE)); + return (m_count == (Q_SIZE)); #else uint32_t currentWriteIndex = m_writeIndex; @@ -171,16 +175,16 @@ #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE } -template <typename ELEM_T> +template <typename ELEM_T, typename Allocator> inline -bool ArrayLockFreeQueue<ELEM_T>::empty() +bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - return (m_count.load() == 0); + return (m_count == 0); #else - if (countToIndex( m_readIndex.load()) == countToIndex(m_maximumReadIndex.load())) + if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex)) { // the queue is full return true; @@ -194,54 +198,44 @@ } -template <typename ELEM_T> -bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data) + + + + +template <typename ELEM_T, typename Allocator> +bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) { uint32_t currentReadIndex; uint32_t currentWriteIndex; - + do { - currentWriteIndex = m_writeIndex.load(); - currentReadIndex = m_readIndex.load(); -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - if (m_count.load() == Q_SIZE) { + currentWriteIndex = m_writeIndex; + currentReadIndex = m_readIndex; + #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + if (m_count == Q_SIZE) { return false; } -#else + #else if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) { // the queue is full return false; } -#endif - - // There is more than one producer. Keep looping till this thread is able - // to allocate space for current piece of data - // - // using compare_exchange_strong because it isn't allowed to fail spuriously - // When the compare_exchange operation is in a loop the weak version - // will yield better performance on some platforms, but here we'd have to - // load m_writeIndex all over again - } while (!m_writeIndex.compare_exchange_strong( - currentWriteIndex, (currentWriteIndex + 1))); - - // Just made sure this index is reserved for this thread. + #endif + + } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); + + // We know now that this index is reserved for us. Use it to save the data m_theQueue[countToIndex(currentWriteIndex)] = a_data; - //memcpy((void *)(&m_theQueue[countToIndex(currentWriteIndex)]), (void *)(&a_data), sizeof(ELEM_T) ); - - // update the maximum read index after saving the piece of data. It can't - // fail if there is only one thread inserting in the queue. It might fail - // if there is more than 1 producer thread because this operation has to - // be done in the same order as the previous CAS - // - // using compare_exchange_weak because they are allowed to fail spuriously - // (act as if *this != expected, even if they are equal), but when the - // compare_exchange operation is in a loop the weak version will yield - // better performance on some platforms. - while (!m_maximumReadIndex.compare_exchange_weak( - currentWriteIndex, (currentWriteIndex + 1))) + + // update the maximum read index after saving the data. It wouldn't fail if there is only one thread + // inserting in the queue. It might fail if there are more than 1 producer threads because this + // operation has to be done in the same order as the previous CAS + + while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) { // this is a good place to yield the thread in case there are more // software threads than hardware processors and you have more @@ -250,37 +244,35 @@ sched_yield(); } - // The value was successfully inserted into the queue #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - m_count.fetch_add(1); + AtomicAdd(&m_count, 1); #endif - return true; } -template <typename ELEM_T> -bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data) + +template <typename ELEM_T, typename Allocator> +bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) { uint32_t currentMaximumReadIndex; uint32_t currentReadIndex; do { - currentReadIndex = m_readIndex.load(); - currentMaximumReadIndex = m_maximumReadIndex.load(); + // to ensure thread-safety when there is more than 1 producer thread + // a second index is defined (m_maximumReadIndex) + currentReadIndex = m_readIndex; + currentMaximumReadIndex = m_maximumReadIndex; + #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - if (m_count.load() == 0) { + if (m_count == 0) { return false; } #else - // to ensure thread-safety when there is more than 1 producer - // thread a second index is defined (m_maximumReadIndex) if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) { // the queue is empty or - // a producer thread has allocate space in the queue but is + // a producer thread has allocate space in the queue but is // waiting to commit the data into it return false; } @@ -288,23 +280,22 @@ // retrieve the data from the queue a_data = m_theQueue[countToIndex(currentReadIndex)]; - //memcpy((void*) (&a_data), (void *)(&m_theQueue[countToIndex(currentReadIndex)]),sizeof(ELEM_T) ); + // try to perfrom now the CAS operation on the read index. If we succeed - // a_data already contains what m_readIndex pointed to before we + // a_data already contains what m_readIndex pointed to before we // increased it - if (m_readIndex.compare_exchange_strong(currentReadIndex, (currentReadIndex + 1))) + if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) { - // got here. The value was retrieved from the queue. Note that the - // data inside the m_queue array is not deleted nor reseted -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - m_count.fetch_sub(1); -#endif + #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + // m_count.fetch_sub(1); + AtomicSub(&m_count, 1); + #endif return true; } - + // it failed retrieving the element off the queue. Someone else must // have read the element stored at countToIndex(currentReadIndex) - // before we could perform the CAS operation + // before we could perform the CAS operation } while(1); // keep looping to try again! @@ -312,18 +303,17 @@ assert(0); // Add this return statement to avoid compiler warnings - return false; + return false; } - -template <typename ELEM_T> -ELEM_T& ArrayLockFreeQueue<ELEM_T>::operator[](unsigned int i) +template <typename ELEM_T, typename Allocator> +ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) { - int currentCount = m_count.load(); - uint32_t currentReadIndex = m_readIndex.load(); - if (i < 0 || i >= currentCount) + int currentCount = m_count; + uint32_t currentReadIndex = m_readIndex; + if (i >= currentCount) { - std::cerr << "Error in array limits: " << i << " is out of range\n"; + std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; std::exit(EXIT_FAILURE); } return m_theQueue[countToIndex(currentReadIndex+i)]; -- Gitblit v1.8.0