From 4fd62552d8277f3d0ed20e66663cd219c36796df Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 21 一月 2021 11:20:22 +0800
Subject: [PATCH] update

---
 src/queue/array_lock_free_queue2.h |  176 +++++++++++++++++++++++++++-------------------------------
 1 files changed, 83 insertions(+), 93 deletions(-)

diff --git a/src/queue/array_lock_free_queue2.h b/src/queue/array_lock_free_queue2.h
index 3b79b7f..233bc6a 100644
--- a/src/queue/array_lock_free_queue2.h
+++ b/src/queue/array_lock_free_queue2.h
@@ -1,9 +1,11 @@
 #ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
 #define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
-
+#include "atomic_ops.h"
 #include <assert.h> // assert()
 #include <sched.h>  // sched_yield()
 #include "logger_factory.h"
+#include "mem_pool.h"
+#include "shm_allocator.h"
 
 /// @brief implementation of an array based lock free queue with support for 
 ///        multiple producers
@@ -15,13 +17,15 @@
 
 #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
-template <typename ELEM_T>
+template <typename ELEM_T, typename Allocator = SHM_Allocator>
 class ArrayLockFreeQueue
 {
     // ArrayLockFreeQueue will be using this' private members
     template <
         typename ELEM_T_, 
-        template <typename T> class Q_TYPE >
+        typename Allocator_,
+        template <typename T, typename AT> class Q_TYPE
+        >
     friend class LockFreeQueue;
 
 private:
@@ -52,10 +56,10 @@
     ELEM_T *m_theQueue;
 
     /// @brief where a new element will be inserted
-    std::atomic<uint32_t> m_writeIndex;
+    uint32_t m_writeIndex;
 
     /// @brief where the next element where be extracted from
-    std::atomic<uint32_t> m_readIndex;
+    uint32_t m_readIndex;
     
     /// @brief maximum read index for multiple producer queues
     /// If it's not the same as m_writeIndex it means
@@ -65,23 +69,23 @@
     /// to wait for those other threads to save the data into the queue
     ///
     /// note this is only used for multiple producers
-    std::atomic<uint32_t> m_maximumReadIndex;
+    uint32_t m_maximumReadIndex;
 
 #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
     /// @brief number of elements in the queue
-    std::atomic<uint32_t> m_count;
+    uint32_t m_count;
 #endif
    
     
 private:
     /// @brief disable copy constructor declaring it private
-    ArrayLockFreeQueue<ELEM_T>(const ArrayLockFreeQueue<ELEM_T> &a_src);
+    ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
 
 };
 
 
-template <typename ELEM_T>
-ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize):
+template <typename ELEM_T, typename Allocator>
+ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize):
     Q_SIZE(qsize),
     m_writeIndex(0),      // initialisation is not atomic
     m_readIndex(0),       //
@@ -90,38 +94,38 @@
     ,m_count(0)           //
 #endif
 {
-    m_theQueue = (ELEM_T*)mm_malloc(Q_SIZE * sizeof(ELEM_T));
+    m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
 
 }
 
-template <typename ELEM_T>
-ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue()
+template <typename ELEM_T, typename Allocator>
+ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue()
 {
     // std::cout << "destroy ArrayLockFreeQueue\n";
-    mm_free(m_theQueue);
+    Allocator::deallocate(m_theQueue);
     
 }
 
-template <typename ELEM_T>
+template <typename ELEM_T, typename Allocator>
 inline 
-uint32_t ArrayLockFreeQueue<ELEM_T>::countToIndex(uint32_t a_count)
+uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
 {
     // if Q_SIZE is a power of 2 this statement could be also written as 
     // return (a_count & (Q_SIZE - 1));
     return (a_count % Q_SIZE);
 }
 
-template <typename ELEM_T>
+template <typename ELEM_T, typename Allocator>
 inline 
-uint32_t ArrayLockFreeQueue<ELEM_T>::size()
+uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size()
 {
 #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
-    return m_count.load();
+    return m_count;
 #else
 
-    uint32_t currentWriteIndex = m_maximumReadIndex.load();
-    uint32_t currentReadIndex  = m_readIndex.load();
+    uint32_t currentWriteIndex = m_maximumReadIndex;
+    uint32_t currentReadIndex  = m_readIndex;
 
     // let's think of a scenario where this function returns bogus data
     // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
@@ -146,13 +150,13 @@
 #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 }
 
-template <typename ELEM_T>
+template <typename ELEM_T, typename Allocator>
 inline 
-bool ArrayLockFreeQueue<ELEM_T>::full()
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::full()
 {
 #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
-    return (m_count.load() == (Q_SIZE));
+    return (m_count == (Q_SIZE));
 #else
 
     uint32_t currentWriteIndex = m_writeIndex;
@@ -171,16 +175,16 @@
 #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 }
 
-template <typename ELEM_T>
+template <typename ELEM_T, typename Allocator>
 inline 
-bool ArrayLockFreeQueue<ELEM_T>::empty()
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty()
 {
 #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
-    return (m_count.load() == 0);
+    return (m_count == 0);
 #else
 
-    if (countToIndex( m_readIndex.load()) == countToIndex(m_maximumReadIndex.load()))
+    if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
     {
         // the queue is full
         return true;
@@ -194,54 +198,44 @@
 }
 
 
-template <typename ELEM_T>
-bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data)
+
+
+
+
+template <typename ELEM_T, typename Allocator>
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data)
 {
     uint32_t currentReadIndex;
     uint32_t currentWriteIndex;
-    
+
     do
     {
-        currentWriteIndex = m_writeIndex.load();
-        currentReadIndex = m_readIndex.load();
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
-        if (m_count.load() == Q_SIZE) {
+        currentWriteIndex = m_writeIndex;
+        currentReadIndex  = m_readIndex;
+    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+        if (m_count == Q_SIZE) {
             return false;
         }
-#else
+    #else
         if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
         {
             // the queue is full
             return false;
         }
-#endif
-        
-    // There is more than one producer. Keep looping till this thread is able 
-    // to allocate space for current piece of data
-    //
-    // using compare_exchange_strong because it isn't allowed to fail spuriously
-    // When the compare_exchange operation is in a loop the weak version
-    // will yield better performance on some platforms, but here we'd have to
-    // load m_writeIndex all over again
-    } while (!m_writeIndex.compare_exchange_strong(
-                currentWriteIndex, (currentWriteIndex + 1)));
-    
-    // Just made sure this index is reserved for this thread.
+    #endif
+
+    } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
+
+    // We know now that this index is reserved for us. Use it to save the data
     m_theQueue[countToIndex(currentWriteIndex)] = a_data;
-    //memcpy((void *)(&m_theQueue[countToIndex(currentWriteIndex)]), (void *)(&a_data), sizeof(ELEM_T) );
-    
-    // update the maximum read index after saving the piece of data. It can't
-    // fail if there is only one thread inserting in the queue. It might fail 
-    // if there is more than 1 producer thread because this operation has to
-    // be done in the same order as the previous CAS
-    //
-    // using compare_exchange_weak because they are allowed to fail spuriously
-    // (act as if *this != expected, even if they are equal), but when the
-    // compare_exchange operation is in a loop the weak version will yield
-    // better performance on some platforms.
-    while (!m_maximumReadIndex.compare_exchange_weak(
-                currentWriteIndex, (currentWriteIndex + 1)))
+
+    // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
+    // inserting in the queue. It might fail if there are more than 1 producer threads because this
+    // operation has to be done in the same order as the previous CAS
+
+    while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
     {
         // this is a good place to yield the thread in case there are more
         // software threads than hardware processors and you have more
@@ -250,37 +244,35 @@
         sched_yield();
     }
 
-    // The value was successfully inserted into the queue
 #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-    m_count.fetch_add(1);
+    AtomicAdd(&m_count, 1);
 #endif
-
     return true;
 }
 
-template <typename ELEM_T>
-bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data)
+
+template <typename ELEM_T, typename Allocator>
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data)
 {
     uint32_t currentMaximumReadIndex;
     uint32_t currentReadIndex;
 
     do
     {
-        currentReadIndex = m_readIndex.load();
-        currentMaximumReadIndex = m_maximumReadIndex.load();
+        // to ensure thread-safety when there is more than 1 producer thread
+        // a second index is defined (m_maximumReadIndex)
+        currentReadIndex        = m_readIndex;
+        currentMaximumReadIndex = m_maximumReadIndex;
+    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
-     #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
-        if (m_count.load() == 0) {
+        if (m_count == 0) {
             return false;
         }
     #else
-        // to ensure thread-safety when there is more than 1 producer 
-        // thread a second index is defined (m_maximumReadIndex)
         if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
         {
             // the queue is empty or
-            // a producer thread has allocate space in the queue but is 
+            // a producer thread has allocate space in the queue but is
             // waiting to commit the data into it
             return false;
         }
@@ -288,23 +280,22 @@
 
         // retrieve the data from the queue
         a_data = m_theQueue[countToIndex(currentReadIndex)];
-        //memcpy((void*) (&a_data), (void *)(&m_theQueue[countToIndex(currentReadIndex)]),sizeof(ELEM_T) );
+
         // try to perfrom now the CAS operation on the read index. If we succeed
-        // a_data already contains what m_readIndex pointed to before we 
+        // a_data already contains what m_readIndex pointed to before we
         // increased it
-        if (m_readIndex.compare_exchange_strong(currentReadIndex, (currentReadIndex + 1)))
+        if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
         {
-            // got here. The value was retrieved from the queue. Note that the
-            // data inside the m_queue array is not deleted nor reseted
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-            m_count.fetch_sub(1);
-#endif
+        #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+            // m_count.fetch_sub(1);
+            AtomicSub(&m_count, 1);
+        #endif
             return true;
         }
-        
+
         // it failed retrieving the element off the queue. Someone else must
         // have read the element stored at countToIndex(currentReadIndex)
-        // before we could perform the CAS operation        
+        // before we could perform the CAS operation
 
     } while(1); // keep looping to try again!
 
@@ -312,18 +303,17 @@
     assert(0);
 
     // Add this return statement to avoid compiler warnings
-    return false;    
+    return false;
 }
 
-
-template <typename ELEM_T>
-ELEM_T& ArrayLockFreeQueue<ELEM_T>::operator[](unsigned int i)
+template <typename ELEM_T, typename Allocator>
+ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i)
 {
-    int currentCount = m_count.load();
-    uint32_t currentReadIndex = m_readIndex.load();
-    if (i < 0 || i >= currentCount)
+    int currentCount = m_count;
+    uint32_t currentReadIndex = m_readIndex;
+    if (i >= currentCount)
     {
-        std::cerr << "Error in array limits: " << i << " is out of range\n";
+        std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
         std::exit(EXIT_FAILURE);
     }
     return m_theQueue[countToIndex(currentReadIndex+i)];

--
Gitblit v1.8.0