From 47fa6069e616d4fa5b6b1317356987d219b3ad3f Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期一, 20 七月 2020 11:20:26 +0800
Subject: [PATCH] update
---
queue/include/array_lock_free_queue.h | 189 +++++++++++++++++++++++------------------------
1 files changed, 93 insertions(+), 96 deletions(-)
diff --git a/queue/include/array_lock_free_queue.h b/queue/include/array_lock_free_queue.h
index 931e839..24a4ec6 100644
--- a/queue/include/array_lock_free_queue.h
+++ b/queue/include/array_lock_free_queue.h
@@ -1,9 +1,11 @@
#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
-
+#include "atomic_ops.h"
#include <assert.h> // assert()
#include <sched.h> // sched_yield()
#include "logger_factory.h"
+#include "mem_pool.h"
+#include "shm_allocator.h"
/// @brief implementation of an array based lock free queue with support for
/// multiple producers
@@ -11,13 +13,19 @@
/// methods are private). To instantiate a multiple producers lock free queue
/// you must use the ArrayLockFreeQueue fachade:
/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q;
-template <typename ELEM_T>
+
+
+#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+template <typename ELEM_T, typename Allocator = SHM_Allocator>
class ArrayLockFreeQueue
{
// ArrayLockFreeQueue will be using this' private members
template <
typename ELEM_T_,
- template <typename T> class Q_TYPE >
+ typename Allocator_,
+ template <typename T, typename AT> class Q_TYPE
+ >
friend class LockFreeQueue;
private:
@@ -48,10 +56,10 @@
ELEM_T *m_theQueue;
/// @brief where a new element will be inserted
- std::atomic<uint32_t> m_writeIndex;
+ uint32_t m_writeIndex;
/// @brief where the next element where be extracted from
- std::atomic<uint32_t> m_readIndex;
+ uint32_t m_readIndex;
/// @brief maximum read index for multiple producer queues
/// If it's not the same as m_writeIndex it means
@@ -61,23 +69,23 @@
/// to wait for those other threads to save the data into the queue
///
/// note this is only used for multiple producers
- std::atomic<uint32_t> m_maximumReadIndex;
+ uint32_t m_maximumReadIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
/// @brief number of elements in the queue
- std::atomic<uint32_t> m_count;
+ uint32_t m_count;
#endif
private:
/// @brief disable copy constructor declaring it private
- ArrayLockFreeQueue<ELEM_T>(const ArrayLockFreeQueue<ELEM_T> &a_src);
+ ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
};
-template <typename ELEM_T>
-ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize):
+template <typename ELEM_T, typename Allocator>
+ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize):
Q_SIZE(qsize),
m_writeIndex(0), // initialisation is not atomic
m_readIndex(0), //
@@ -86,38 +94,38 @@
,m_count(0) //
#endif
{
- m_theQueue = (ELEM_T*)mm_malloc(Q_SIZE * sizeof(ELEM_T));
+ m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
}
-template <typename ELEM_T>
-ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue()
+template <typename ELEM_T, typename Allocator>
+ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue()
{
- std::cout << "destroy ArrayLockFreeQueue\n";
- mm_free(m_theQueue);
+ // std::cout << "destroy ArrayLockFreeQueue\n";
+ Allocator::deallocate(m_theQueue);
}
-template <typename ELEM_T>
+template <typename ELEM_T, typename Allocator>
inline
-uint32_t ArrayLockFreeQueue<ELEM_T>::countToIndex(uint32_t a_count)
+uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
{
// if Q_SIZE is a power of 2 this statement could be also written as
// return (a_count & (Q_SIZE - 1));
return (a_count % Q_SIZE);
}
-template <typename ELEM_T>
+template <typename ELEM_T, typename Allocator>
inline
-uint32_t ArrayLockFreeQueue<ELEM_T>::size()
+uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- return m_count.load();
+ return m_count;
#else
- uint32_t currentWriteIndex = m_maximumReadIndex.load();
- uint32_t currentReadIndex = m_readIndex.load();
+ uint32_t currentWriteIndex = m_maximumReadIndex;
+ uint32_t currentReadIndex = m_readIndex;
// let's think of a scenario where this function returns bogus data
// 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
@@ -142,13 +150,13 @@
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
-template <typename ELEM_T>
+template <typename ELEM_T, typename Allocator>
inline
-bool ArrayLockFreeQueue<ELEM_T>::full()
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::full()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- return (m_count.load() == (Q_SIZE));
+ return (m_count == (Q_SIZE));
#else
uint32_t currentWriteIndex = m_writeIndex;
@@ -167,16 +175,16 @@
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
-template <typename ELEM_T>
+template <typename ELEM_T, typename Allocator>
inline
-bool ArrayLockFreeQueue<ELEM_T>::empty()
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- return (m_count.load() == 0);
+ return (m_count == 0);
#else
- if (countToIndex( m_readIndex.load()) == countToIndex(m_maximumReadIndex.load()))
+ if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
{
// the queue is full
return true;
@@ -190,53 +198,44 @@
}
-template <typename ELEM_T>
-bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data)
+
+
+
+
+template <typename ELEM_T, typename Allocator>
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data)
{
+ uint32_t currentReadIndex;
uint32_t currentWriteIndex;
-
+
do
{
- currentWriteIndex = m_writeIndex.load();
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- if (m_count.load() == Q_SIZE) {
+ currentWriteIndex = m_writeIndex;
+ currentReadIndex = m_readIndex;
+ #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+ if (m_count == Q_SIZE) {
return false;
}
-#else
- if (countToIndex(currentWriteIndex + 1) == countToIndex(m_readIndex.load()))
+ #else
+ if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
{
// the queue is full
return false;
}
-#endif
-
- // There is more than one producer. Keep looping till this thread is able
- // to allocate space for current piece of data
- //
- // using compare_exchange_strong because it isn't allowed to fail spuriously
- // When the compare_exchange operation is in a loop the weak version
- // will yield better performance on some platforms, but here we'd have to
- // load m_writeIndex all over again
- } while (!m_writeIndex.compare_exchange_strong(
- currentWriteIndex, (currentWriteIndex + 1)));
-
- // Just made sure this index is reserved for this thread.
- // m_theQueue[countToIndex(currentWriteIndex)] = a_data;
- // printf("===sizeof(ELEM_T) = %d\n", sizeof(ELEM_T));
- memcpy((void *)(&m_theQueue[countToIndex(currentWriteIndex)]), (void *)(&a_data), sizeof(ELEM_T) );
-
- // update the maximum read index after saving the piece of data. It can't
- // fail if there is only one thread inserting in the queue. It might fail
- // if there is more than 1 producer thread because this operation has to
- // be done in the same order as the previous CAS
- //
- // using compare_exchange_weak because they are allowed to fail spuriously
- // (act as if *this != expected, even if they are equal), but when the
- // compare_exchange operation is in a loop the weak version will yield
- // better performance on some platforms.
- while (!m_maximumReadIndex.compare_exchange_weak(
- currentWriteIndex, (currentWriteIndex + 1)))
+ #endif
+
+ } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
+
+ // We know now that this index is reserved for us. Use it to save the data
+ m_theQueue[countToIndex(currentWriteIndex)] = a_data;
+
+ // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
+ // inserting in the queue. It might fail if there are more than 1 producer threads because this
+ // operation has to be done in the same order as the previous CAS
+
+ while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
{
// this is a good place to yield the thread in case there are more
// software threads than hardware processors and you have more
@@ -245,59 +244,58 @@
sched_yield();
}
- // The value was successfully inserted into the queue
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- m_count.fetch_add(1);
+ AtomicAdd(&m_count, 1);
#endif
-
return true;
}
-template <typename ELEM_T>
-bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data)
+
+template <typename ELEM_T, typename Allocator>
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data)
{
+ uint32_t currentMaximumReadIndex;
uint32_t currentReadIndex;
do
{
- currentReadIndex = m_readIndex.load();
+ // to ensure thread-safety when there is more than 1 producer thread
+ // a second index is defined (m_maximumReadIndex)
+ currentReadIndex = m_readIndex;
+ currentMaximumReadIndex = m_maximumReadIndex;
+ #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
- if (m_count.load() == 0) {
+ if (m_count == 0) {
return false;
}
#else
- // to ensure thread-safety when there is more than 1 producer
- // thread a second index is defined (m_maximumReadIndex)
- if (countToIndex(currentReadIndex) == countToIndex(m_maximumReadIndex.load()))
+ if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
{
// the queue is empty or
- // a producer thread has allocate space in the queue but is
+ // a producer thread has allocate space in the queue but is
// waiting to commit the data into it
return false;
}
#endif
// retrieve the data from the queue
- //a_data = m_theQueue[countToIndex(currentReadIndex)];
- memcpy((void*) (&a_data), (void *)(&m_theQueue[countToIndex(currentReadIndex)]),sizeof(ELEM_T) );
+ a_data = m_theQueue[countToIndex(currentReadIndex)];
+
// try to perfrom now the CAS operation on the read index. If we succeed
- // a_data already contains what m_readIndex pointed to before we
+ // a_data already contains what m_readIndex pointed to before we
// increased it
- if (m_readIndex.compare_exchange_strong(currentReadIndex, (currentReadIndex + 1)))
+ if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
{
- // got here. The value was retrieved from the queue. Note that the
- // data inside the m_queue array is not deleted nor reseted
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- m_count.fetch_sub(1);
-#endif
+ #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+ // m_count.fetch_sub(1);
+ AtomicSub(&m_count, 1);
+ #endif
return true;
}
-
+
// it failed retrieving the element off the queue. Someone else must
// have read the element stored at countToIndex(currentReadIndex)
- // before we could perform the CAS operation
+ // before we could perform the CAS operation
} while(1); // keep looping to try again!
@@ -305,18 +303,17 @@
assert(0);
// Add this return statement to avoid compiler warnings
- return false;
+ return false;
}
-
-template <typename ELEM_T>
-ELEM_T& ArrayLockFreeQueue<ELEM_T>::operator[](unsigned int i)
+template <typename ELEM_T, typename Allocator>
+ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i)
{
- int currentCount = m_count.load();
- uint32_t currentReadIndex = m_readIndex.load();
+ int currentCount = m_count;
+ uint32_t currentReadIndex = m_readIndex;
if (i < 0 || i >= currentCount)
{
- std::cerr << "Error in array limits: " << i << " is out of range\n";
+ std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
std::exit(EXIT_FAILURE);
}
return m_theQueue[countToIndex(currentReadIndex+i)];
--
Gitblit v1.8.0