From 21f03d8d05e5c04953e31d7e4360346031603511 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 04 二月 2021 11:47:50 +0800
Subject: [PATCH] 修改注解
---
src/queue/array_lock_free_queue.h | 491 ++++++++++++++++++++++++++----------------------------
1 files changed, 238 insertions(+), 253 deletions(-)
diff --git a/src/queue/array_lock_free_queue.h b/src/queue/array_lock_free_queue.h
index 233bc6a..a03b33e 100644
--- a/src/queue/array_lock_free_queue.h
+++ b/src/queue/array_lock_free_queue.h
@@ -1,5 +1,6 @@
-#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
-#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
+#ifndef __ARRAY_LOCK_FREE_QUEUE_H__
+#define __ARRAY_LOCK_FREE_QUEUE_H__
+
#include "atomic_ops.h"
#include <assert.h> // assert()
#include <sched.h> // sched_yield()
@@ -17,306 +18,290 @@
#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-template <typename ELEM_T, typename Allocator = SHM_Allocator>
-class ArrayLockFreeQueue
-{
- // ArrayLockFreeQueue will be using this' private members
- template <
- typename ELEM_T_,
- typename Allocator_,
- template <typename T, typename AT> class Q_TYPE
- >
- friend class LockFreeQueue;
+template<typename ELEM_T, typename Allocator = SHM_Allocator>
+class ArrayLockFreeQueue {
+ // ArrayLockFreeQueue will be using this' private members
+ template<
+ typename ELEM_T_,
+ typename Allocator_,
+ template<typename T, typename AT> class Q_TYPE
+ >
+ friend
+ class LockFreeQueue;
private:
- /// @brief constructor of the class
- ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
-
- virtual ~ArrayLockFreeQueue();
-
- inline uint32_t size();
-
- inline bool full();
+ /// @brief constructor of the class
+ ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
- inline bool empty();
-
- bool push(const ELEM_T &a_data);
-
- bool pop(ELEM_T &a_data);
-
- /// @brief calculate the index in the circular array that corresponds
- /// to a particular "count" value
- inline uint32_t countToIndex(uint32_t a_count);
+ virtual ~ArrayLockFreeQueue();
- ELEM_T& operator[](unsigned i);
-
-private:
- size_t Q_SIZE;
- /// @brief array to keep the elements
- ELEM_T *m_theQueue;
+ inline uint32_t size();
- /// @brief where a new element will be inserted
- uint32_t m_writeIndex;
+ inline bool full();
- /// @brief where the next element where be extracted from
- uint32_t m_readIndex;
-
- /// @brief maximum read index for multiple producer queues
- /// If it's not the same as m_writeIndex it means
- /// there are writes pending to be "committed" to the queue, that means,
- /// the place for the data was reserved (the index in the array) but
- /// data is still not in the queue, so the thread trying to read will have
- /// to wait for those other threads to save the data into the queue
- ///
- /// note this is only used for multiple producers
- uint32_t m_maximumReadIndex;
+ inline bool empty();
+
+ bool push(const ELEM_T &a_data);
+
+ bool pop(ELEM_T &a_data);
+
+ /// @brief calculate the index in the circular array that corresponds
+ /// to a particular "count" value
+ inline uint32_t countToIndex(uint32_t a_count);
+
+ ELEM_T &operator[](unsigned i);
+
+private:
+ size_t Q_SIZE;
+ /// @brief array to keep the elements
+ ELEM_T *m_theQueue;
+
+ /// @brief where a new element will be inserted
+ uint32_t m_writeIndex;
+
+ /// @brief where the next element where be extracted from
+ uint32_t m_readIndex;
+
+ /// @brief maximum read index for multiple producer queues
+ /// If it's not the same as m_writeIndex it means
+ /// there are writes pending to be "committed" to the queue, that means,
+ /// the place for the data was reserved (the index in the array) but
+ /// data is still not in the queue, so the thread trying to read will have
+ /// to wait for those other threads to save the data into the queue
+ ///
+ /// note this is only used for multiple producers
+ uint32_t m_maximumReadIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- /// @brief number of elements in the queue
- uint32_t m_count;
+ /// @brief number of elements in the queue
+ uint32_t m_count;
#endif
-
-
+
+
private:
- /// @brief disable copy constructor declaring it private
- ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
+ /// @brief disable copy constructor declaring it private
+ ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
};
-template <typename ELEM_T, typename Allocator>
+template<typename ELEM_T, typename Allocator>
ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize):
- Q_SIZE(qsize),
- m_writeIndex(0), // initialisation is not atomic
- m_readIndex(0), //
- m_maximumReadIndex(0) //
+ Q_SIZE(qsize),
+ m_writeIndex(0), // initialisation is not atomic
+ m_readIndex(0), //
+ m_maximumReadIndex(0) //
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- ,m_count(0) //
+ , m_count(0) //
#endif
{
- m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
+ m_theQueue = (ELEM_T *) Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
}
-template <typename ELEM_T, typename Allocator>
-ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue()
-{
- // std::cout << "destroy ArrayLockFreeQueue\n";
- Allocator::deallocate(m_theQueue);
-
+template<typename ELEM_T, typename Allocator>
+ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() {
+ // std::cout << "destroy ArrayLockFreeQueue\n";
+ Allocator::deallocate(m_theQueue);
+
}
-template <typename ELEM_T, typename Allocator>
-inline
-uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
-{
- // if Q_SIZE is a power of 2 this statement could be also written as
- // return (a_count & (Q_SIZE - 1));
- return (a_count % Q_SIZE);
+template<typename ELEM_T, typename Allocator>
+inline
+uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) {
+ // if Q_SIZE is a power of 2 this statement could be also written as
+ // return (a_count & (Q_SIZE - 1));
+ return (a_count % Q_SIZE);
}
-template <typename ELEM_T, typename Allocator>
-inline
-uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size()
-{
+template<typename ELEM_T, typename Allocator>
+inline
+uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- return m_count;
+ return m_count;
#else
- uint32_t currentWriteIndex = m_maximumReadIndex;
- uint32_t currentReadIndex = m_readIndex;
+ uint32_t currentWriteIndex = m_maximumReadIndex;
+ uint32_t currentReadIndex = m_readIndex;
- // let's think of a scenario where this function returns bogus data
- // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
- // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
- // 2. afterwards this thread is preemted. While this thread is inactive 2
- // elements are inserted and removed from the queue, so m_maximumReadIndex
- // is 5 and m_readIndex 4. Real size is still 1
- // 3. Now the current thread comes back from preemption and reads m_readIndex.
- // currentReadIndex is 4
- // 4. currentReadIndex is bigger than currentWriteIndex, so
- // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
- // it returns that the queue is almost full, when it is almost empty
- //
- if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
- {
- return (currentWriteIndex - currentReadIndex);
- }
- else
- {
- return (Q_SIZE + currentWriteIndex - currentReadIndex);
- }
+ // let's think of a scenario where this function returns bogus data
+ // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
+ // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
+ // 2. afterwards this thread is preemted. While this thread is inactive 2
+ // elements are inserted and removed from the queue, so m_maximumReadIndex
+ // is 5 and m_readIndex 4. Real size is still 1
+ // 3. Now the current thread comes back from preemption and reads m_readIndex.
+ // currentReadIndex is 4
+ // 4. currentReadIndex is bigger than currentWriteIndex, so
+ // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
+ // it returns that the queue is almost full, when it is almost empty
+ //
+ if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
+ {
+ return (currentWriteIndex - currentReadIndex);
+ }
+ else
+ {
+ return (Q_SIZE + currentWriteIndex - currentReadIndex);
+ }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
-template <typename ELEM_T, typename Allocator>
-inline
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::full()
-{
+template<typename ELEM_T, typename Allocator>
+inline
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- return (m_count == (Q_SIZE));
+ return (m_count == (Q_SIZE));
#else
- uint32_t currentWriteIndex = m_writeIndex;
- uint32_t currentReadIndex = m_readIndex;
-
+ uint32_t currentWriteIndex = m_writeIndex;
+ uint32_t currentReadIndex = m_readIndex;
+
+ if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
+ {
+ // the queue is full
+ return true;
+ }
+ else
+ {
+ // not full!
+ return false;
+ }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template<typename ELEM_T, typename Allocator>
+inline
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() {
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+ return (m_count == 0);
+#else
+
+ if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
+ {
+ // the queue is full
+ return true;
+ }
+ else
+ {
+ // not full!
+ return false;
+ }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+
+template<typename ELEM_T, typename Allocator>
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) {
+ uint32_t currentReadIndex;
+ uint32_t currentWriteIndex;
+
+ do {
+
+ currentWriteIndex = m_writeIndex;
+ currentReadIndex = m_readIndex;
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+ if (m_count == Q_SIZE) {
+ return false;
+ }
+#else
if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
{
// the queue is full
- return true;
- }
- else
- {
- // not full!
return false;
}
-#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-}
-
-template <typename ELEM_T, typename Allocator>
-inline
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty()
-{
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
- return (m_count == 0);
-#else
-
- if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
- {
- // the queue is full
- return true;
- }
- else
- {
- // not full!
- return false;
- }
-#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-}
-
-
-
-
-
-
-template <typename ELEM_T, typename Allocator>
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data)
-{
- uint32_t currentReadIndex;
- uint32_t currentWriteIndex;
-
- do
- {
-
- currentWriteIndex = m_writeIndex;
- currentReadIndex = m_readIndex;
- #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
- if (m_count == Q_SIZE) {
- return false;
- }
- #else
- if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
- {
- // the queue is full
- return false;
- }
- #endif
-
- } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
-
- // We know now that this index is reserved for us. Use it to save the data
- m_theQueue[countToIndex(currentWriteIndex)] = a_data;
-
- // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
- // inserting in the queue. It might fail if there are more than 1 producer threads because this
- // operation has to be done in the same order as the previous CAS
-
- while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
- {
- // this is a good place to yield the thread in case there are more
- // software threads than hardware processors and you have more
- // than 1 producer thread
- // have a look at sched_yield (POSIX.1b)
- sched_yield();
- }
-
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- AtomicAdd(&m_count, 1);
#endif
- return true;
+
+ } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
+
+ // We know now that this index is reserved for us. Use it to save the data
+ m_theQueue[countToIndex(currentWriteIndex)] = a_data;
+
+ // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
+ // inserting in the queue. It might fail if there are more than 1 producer threads because this
+ // operation has to be done in the same order as the previous CAS
+
+ while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) {
+ // this is a good place to yield the thread in case there are more
+ // software threads than hardware processors and you have more
+ // than 1 producer thread
+ // have a look at sched_yield (POSIX.1b)
+ sched_yield();
+ }
+
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+ AtomicAdd(&m_count, 1);
+#endif
+ return true;
}
-template <typename ELEM_T, typename Allocator>
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data)
-{
- uint32_t currentMaximumReadIndex;
- uint32_t currentReadIndex;
+template<typename ELEM_T, typename Allocator>
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) {
+ uint32_t currentMaximumReadIndex;
+ uint32_t currentReadIndex;
- do
- {
- // to ensure thread-safety when there is more than 1 producer thread
- // a second index is defined (m_maximumReadIndex)
- currentReadIndex = m_readIndex;
- currentMaximumReadIndex = m_maximumReadIndex;
- #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+ do {
+ // to ensure thread-safety when there is more than 1 producer thread
+ // a second index is defined (m_maximumReadIndex)
+ currentReadIndex = m_readIndex;
+ currentMaximumReadIndex = m_maximumReadIndex;
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- if (m_count == 0) {
- return false;
- }
- #else
- if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
- {
- // the queue is empty or
- // a producer thread has allocate space in the queue but is
- // waiting to commit the data into it
- return false;
- }
- #endif
-
- // retrieve the data from the queue
- a_data = m_theQueue[countToIndex(currentReadIndex)];
-
- // try to perfrom now the CAS operation on the read index. If we succeed
- // a_data already contains what m_readIndex pointed to before we
- // increased it
- if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
- {
- #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- // m_count.fetch_sub(1);
- AtomicSub(&m_count, 1);
- #endif
- return true;
- }
-
- // it failed retrieving the element off the queue. Someone else must
- // have read the element stored at countToIndex(currentReadIndex)
- // before we could perform the CAS operation
-
- } while(1); // keep looping to try again!
-
- // Something went wrong. it shouldn't be possible to reach here
- assert(0);
-
- // Add this return statement to avoid compiler warnings
- return false;
-}
-
-template <typename ELEM_T, typename Allocator>
-ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i)
-{
- int currentCount = m_count;
- uint32_t currentReadIndex = m_readIndex;
- if (i >= currentCount)
- {
- std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
- std::exit(EXIT_FAILURE);
+ if (m_count == 0) {
+ return false;
}
- return m_theQueue[countToIndex(currentReadIndex+i)];
+#else
+ if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
+ {
+ // the queue is empty or
+ // a producer thread has allocate space in the queue but is
+ // waiting to commit the data into it
+ return false;
+ }
+#endif
+
+ // retrieve the data from the queue
+ a_data = m_theQueue[countToIndex(currentReadIndex)];
+
+ // try to perfrom now the CAS operation on the read index. If we succeed
+ // a_data already contains what m_readIndex pointed to before we
+ // increased it
+ if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) {
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+ // m_count.fetch_sub(1);
+ AtomicSub(&m_count, 1);
+#endif
+ return true;
+ }
+
+ // it failed retrieving the element off the queue. Someone else must
+ // have read the element stored at countToIndex(currentReadIndex)
+ // before we could perform the CAS operation
+
+ } while (1); // keep looping to try again!
+
+ // Something went wrong. it shouldn't be possible to reach here
+ assert(0);
+
+ // Add this return statement to avoid compiler warnings
+ return false;
+}
+
+template<typename ELEM_T, typename Allocator>
+ELEM_T &ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) {
+ int currentCount = m_count;
+ uint32_t currentReadIndex = m_readIndex;
+ if (i >= currentCount) {
+ std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i
+ << " is out of range\n";
+ std::exit(EXIT_FAILURE);
+ }
+ return m_theQueue[countToIndex(currentReadIndex + i)];
}
#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
--
Gitblit v1.8.0