From cc2b5d755f6a5bb515188f6f2db009ec674f83a2 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期四, 09 七月 2020 20:53:45 +0800 Subject: [PATCH] commit --- queue/include/array_lock_free_queue.h | 128 +++++++++++++++++++----------------------- 1 files changed, 59 insertions(+), 69 deletions(-) diff --git a/queue/include/array_lock_free_queue.h b/queue/include/array_lock_free_queue.h index 05af513..90c3bf1 100644 --- a/queue/include/array_lock_free_queue.h +++ b/queue/include/array_lock_free_queue.h @@ -1,6 +1,6 @@ #ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ #define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ - +#include "atomic_ops.h" #include <assert.h> // assert() #include <sched.h> // sched_yield() #include "logger_factory.h" @@ -52,10 +52,10 @@ ELEM_T *m_theQueue; /// @brief where a new element will be inserted - std::atomic<uint32_t> m_writeIndex; + uint32_t m_writeIndex; /// @brief where the next element where be extracted from - std::atomic<uint32_t> m_readIndex; + uint32_t m_readIndex; /// @brief maximum read index for multiple producer queues /// If it's not the same as m_writeIndex it means @@ -65,11 +65,11 @@ /// to wait for those other threads to save the data into the queue /// /// note this is only used for multiple producers - std::atomic<uint32_t> m_maximumReadIndex; + uint32_t m_maximumReadIndex; #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE /// @brief number of elements in the queue - std::atomic<uint32_t> m_count; + uint32_t m_count; #endif @@ -117,11 +117,11 @@ { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - return m_count.load(); + return m_count; #else - uint32_t currentWriteIndex = m_maximumReadIndex.load(); - uint32_t currentReadIndex = m_readIndex.load(); + uint32_t currentWriteIndex = m_maximumReadIndex; + uint32_t currentReadIndex = m_readIndex; // let's think of a scenario where this function returns bogus data // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run @@ -152,7 +152,7 @@ { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - return (m_count.load() == (Q_SIZE)); + return (m_count == (Q_SIZE)); #else uint32_t currentWriteIndex = m_writeIndex; @@ -177,10 +177,10 @@ { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - return (m_count.load() == 0); + return (m_count == 0); #else - if (countToIndex( m_readIndex.load()) == countToIndex(m_maximumReadIndex.load())) + if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex)) { // the queue is full return true; @@ -194,52 +194,44 @@ } + + + + template <typename ELEM_T> bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data) { + uint32_t currentReadIndex; uint32_t currentWriteIndex; - + do { - currentWriteIndex = m_writeIndex.load(); -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - if (m_count.load() == Q_SIZE) { + currentWriteIndex = m_writeIndex; + currentReadIndex = m_readIndex; + #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + if (m_count == Q_SIZE) { return false; } -#else - if (countToIndex(currentWriteIndex + 1) == countToIndex(m_readIndex.load())) + #else + if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) { // the queue is full return false; } -#endif - - // There is more than one producer. Keep looping till this thread is able - // to allocate space for current piece of data - // - // using compare_exchange_strong because it isn't allowed to fail spuriously - // When the compare_exchange operation is in a loop the weak version - // will yield better performance on some platforms, but here we'd have to - // load m_writeIndex all over again - } while (!m_writeIndex.compare_exchange_strong( - currentWriteIndex, (currentWriteIndex + 1))); - - // Just made sure this index is reserved for this thread. + #endif + + } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); + + // We know now that this index is reserved for us. Use it to save the data m_theQueue[countToIndex(currentWriteIndex)] = a_data; - //memcpy((void *)(&m_theQueue[countToIndex(currentWriteIndex)]), (void *)(&a_data), sizeof(ELEM_T) ); - - // update the maximum read index after saving the piece of data. It can't - // fail if there is only one thread inserting in the queue. It might fail - // if there is more than 1 producer thread because this operation has to - // be done in the same order as the previous CAS - // - // using compare_exchange_weak because they are allowed to fail spuriously - // (act as if *this != expected, even if they are equal), but when the - // compare_exchange operation is in a loop the weak version will yield - // better performance on some platforms. - while (!m_maximumReadIndex.compare_exchange_weak( - currentWriteIndex, (currentWriteIndex + 1))) + + // update the maximum read index after saving the data. It wouldn't fail if there is only one thread + // inserting in the queue. It might fail if there are more than 1 producer threads because this + // operation has to be done in the same order as the previous CAS + + while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) { // this is a good place to yield the thread in case there are more // software threads than hardware processors and you have more @@ -248,35 +240,35 @@ sched_yield(); } - // The value was successfully inserted into the queue #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - m_count.fetch_add(1); + AtomicAdd(&m_count, 1); #endif - return true; } + template <typename ELEM_T> bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data) { + uint32_t currentMaximumReadIndex; uint32_t currentReadIndex; do { - currentReadIndex = m_readIndex.load(); + // to ensure thread-safety when there is more than 1 producer thread + // a second index is defined (m_maximumReadIndex) + currentReadIndex = m_readIndex; + currentMaximumReadIndex = m_maximumReadIndex; + #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - if (m_count.load() == 0) { + if (m_count == 0) { return false; } #else - // to ensure thread-safety when there is more than 1 producer - // thread a second index is defined (m_maximumReadIndex) - if (countToIndex(currentReadIndex) == countToIndex(m_maximumReadIndex.load())) + if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) { // the queue is empty or - // a producer thread has allocate space in the queue but is + // a producer thread has allocate space in the queue but is // waiting to commit the data into it return false; } @@ -284,23 +276,22 @@ // retrieve the data from the queue a_data = m_theQueue[countToIndex(currentReadIndex)]; - //memcpy((void*) (&a_data), (void *)(&m_theQueue[countToIndex(currentReadIndex)]),sizeof(ELEM_T) ); + // try to perfrom now the CAS operation on the read index. If we succeed - // a_data already contains what m_readIndex pointed to before we + // a_data already contains what m_readIndex pointed to before we // increased it - if (m_readIndex.compare_exchange_strong(currentReadIndex, (currentReadIndex + 1))) + if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) { - // got here. The value was retrieved from the queue. Note that the - // data inside the m_queue array is not deleted nor reseted -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - m_count.fetch_sub(1); -#endif + #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + // m_count.fetch_sub(1); + AtomicSub(&m_count, 1); + #endif return true; } - + // it failed retrieving the element off the queue. Someone else must // have read the element stored at countToIndex(currentReadIndex) - // before we could perform the CAS operation + // before we could perform the CAS operation } while(1); // keep looping to try again! @@ -308,18 +299,17 @@ assert(0); // Add this return statement to avoid compiler warnings - return false; + return false; } - template <typename ELEM_T> ELEM_T& ArrayLockFreeQueue<ELEM_T>::operator[](unsigned int i) { - int currentCount = m_count.load(); - uint32_t currentReadIndex = m_readIndex.load(); + int currentCount = m_count; + uint32_t currentReadIndex = m_readIndex; if (i < 0 || i >= currentCount) { - std::cerr << "Error in array limits: " << i << " is out of range\n"; + std::cerr << "ArrayLockFreeQueue<ELEM_T>::operator[] , Error in array limits: " << i << " is out of range\n"; std::exit(EXIT_FAILURE); } return m_theQueue[countToIndex(currentReadIndex+i)]; -- Gitblit v1.8.0