From 68d23225a38a35f1325eb39fa4ed5a005d5de473 Mon Sep 17 00:00:00 2001
From: fujuntang <fujuntang@aiot.com>
Date: 星期三, 11 八月 2021 09:50:20 +0800
Subject: [PATCH] fix from 3.1 first commit

---
 src/queue/array_lock_free_queue.h |  489 ++++++++++++++++++++++++++----------------------------
 1 files changed, 237 insertions(+), 252 deletions(-)

diff --git a/src/queue/array_lock_free_queue.h b/src/queue/array_lock_free_queue.h
index ae1506d..5ff8daf 100644
--- a/src/queue/array_lock_free_queue.h
+++ b/src/queue/array_lock_free_queue.h
@@ -1,10 +1,11 @@
 #ifndef __ARRAY_LOCK_FREE_QUEUE_H__
 #define __ARRAY_LOCK_FREE_QUEUE_H__
+
 #include "atomic_ops.h"
 #include <assert.h> // assert()
 #include <sched.h>  // sched_yield()
 #include "logger_factory.h"
-#include "mem_pool.h"
+#include "shm_mm.h"
 #include "shm_allocator.h"
 
 /// @brief implementation of an array based lock free queue with support for 
@@ -17,306 +18,290 @@
 
 #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
-template <typename ELEM_T, typename Allocator = SHM_Allocator>
-class ArrayLockFreeQueue
-{
-    // ArrayLockFreeQueue will be using this' private members
-    template <
-        typename ELEM_T_, 
-        typename Allocator_,
-        template <typename T, typename AT> class Q_TYPE
-        >
-    friend class LockFreeQueue;
+template<typename ELEM_T, typename Allocator = SHM_Allocator>
+class ArrayLockFreeQueue {
+  // ArrayLockFreeQueue will be using this' private members
+  template<
+    typename ELEM_T_,
+    typename Allocator_,
+    template<typename T, typename AT> class Q_TYPE
+  >
+  friend
+  class LockFreeQueue;
 
 private:
-    /// @brief constructor of the class
-    ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
-    
-    virtual ~ArrayLockFreeQueue();
-    
-    inline uint32_t size();
-    
-    inline bool full();
+  /// @brief constructor of the class
+  ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
 
-    inline bool empty();
-    
-    bool push(const ELEM_T &a_data);   
-    
-    bool pop(ELEM_T &a_data);
-    
-    /// @brief calculate the index in the circular array that corresponds
-    /// to a particular "count" value
-    inline uint32_t countToIndex(uint32_t a_count);
+  virtual ~ArrayLockFreeQueue();
 
-    ELEM_T& operator[](unsigned i);
-    
-private:    
-    size_t Q_SIZE;
-    /// @brief array to keep the elements
-    ELEM_T *m_theQueue;
+  inline uint32_t size();
 
-    /// @brief where a new element will be inserted
-    uint32_t m_writeIndex;
+  inline bool full();
 
-    /// @brief where the next element where be extracted from
-    uint32_t m_readIndex;
-    
-    /// @brief maximum read index for multiple producer queues
-    /// If it's not the same as m_writeIndex it means
-    /// there are writes pending to be "committed" to the queue, that means,
-    /// the place for the data was reserved (the index in the array) but  
-    /// data is still not in the queue, so the thread trying to read will have 
-    /// to wait for those other threads to save the data into the queue
-    ///
-    /// note this is only used for multiple producers
-    uint32_t m_maximumReadIndex;
+  inline bool empty();
+
+  bool push(const ELEM_T &a_data);
+
+  bool pop(ELEM_T &a_data);
+
+  /// @brief calculate the index in the circular array that corresponds
+  /// to a particular "count" value
+  inline uint32_t countToIndex(uint32_t a_count);
+
+  ELEM_T &operator[](unsigned i);
+
+private:
+  size_t Q_SIZE;
+  /// @brief array to keep the elements
+  ELEM_T *m_theQueue;
+
+  /// @brief where a new element will be inserted
+  uint32_t m_writeIndex;
+
+  /// @brief where the next element where be extracted from
+  uint32_t m_readIndex;
+
+  /// @brief maximum read index for multiple producer queues
+  /// If it's not the same as m_writeIndex it means
+  /// there are writes pending to be "committed" to the queue, that means,
+  /// the place for the data was reserved (the index in the array) but
+  /// data is still not in the queue, so the thread trying to read will have
+  /// to wait for those other threads to save the data into the queue
+  ///
+  /// note this is only used for multiple producers
+  uint32_t m_maximumReadIndex;
 
 #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-    /// @brief number of elements in the queue
-    uint32_t m_count;
+  /// @brief number of elements in the queue
+  uint32_t m_count;
 #endif
-   
-    
+
+
 private:
-    /// @brief disable copy constructor declaring it private
-    ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
+  /// @brief disable copy constructor declaring it private
+  ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
 
 };
 
 
-template <typename ELEM_T, typename Allocator>
+template<typename ELEM_T, typename Allocator>
 ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize):
-    Q_SIZE(qsize),
-    m_writeIndex(0),      // initialisation is not atomic
-    m_readIndex(0),       //
-    m_maximumReadIndex(0) //
+  Q_SIZE(qsize),
+  m_writeIndex(0),      // initialisation is not atomic
+  m_readIndex(0),       //
+  m_maximumReadIndex(0) //
 #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-    ,m_count(0)           //
+  , m_count(0)           //
 #endif
 {
-    m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
+  m_theQueue = (ELEM_T *) Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
 
 }
 
-template <typename ELEM_T, typename Allocator>
-ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue()
-{
-    // std::cout << "destroy ArrayLockFreeQueue\n";
-    Allocator::deallocate(m_theQueue);
-    
+template<typename ELEM_T, typename Allocator>
+ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() {
+  // std::cout << "destroy ArrayLockFreeQueue\n";
+  Allocator::deallocate(m_theQueue);
+
 }
 
-template <typename ELEM_T, typename Allocator>
-inline 
-uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
-{
-    // if Q_SIZE is a power of 2 this statement could be also written as 
-    // return (a_count & (Q_SIZE - 1));
-    return (a_count % Q_SIZE);
+template<typename ELEM_T, typename Allocator>
+inline
+uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) {
+  // if Q_SIZE is a power of 2 this statement could be also written as
+  // return (a_count & (Q_SIZE - 1));
+  return (a_count % Q_SIZE);
 }
 
-template <typename ELEM_T, typename Allocator>
-inline 
-uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size()
-{
+template<typename ELEM_T, typename Allocator>
+inline
+uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() {
 #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
-    return m_count;
+  return m_count;
 #else
 
-    uint32_t currentWriteIndex = m_maximumReadIndex;
-    uint32_t currentReadIndex  = m_readIndex;
+  uint32_t currentWriteIndex = m_maximumReadIndex;
+  uint32_t currentReadIndex  = m_readIndex;
 
-    // let's think of a scenario where this function returns bogus data
-    // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
-    // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
-    // 2. afterwards this thread is preemted. While this thread is inactive 2 
-    // elements are inserted and removed from the queue, so m_maximumReadIndex 
-    // is 5 and m_readIndex 4. Real size is still 1
-    // 3. Now the current thread comes back from preemption and reads m_readIndex.
-    // currentReadIndex is 4
-    // 4. currentReadIndex is bigger than currentWriteIndex, so
-    // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
-    // it returns that the queue is almost full, when it is almost empty
-    //
-    if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
-    {
-        return (currentWriteIndex - currentReadIndex);
-    }
-    else
-    {
-        return (Q_SIZE + currentWriteIndex - currentReadIndex);
-    }
+  // let's think of a scenario where this function returns bogus data
+  // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
+  // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
+  // 2. afterwards this thread is preemted. While this thread is inactive 2
+  // elements are inserted and removed from the queue, so m_maximumReadIndex
+  // is 5 and m_readIndex 4. Real size is still 1
+  // 3. Now the current thread comes back from preemption and reads m_readIndex.
+  // currentReadIndex is 4
+  // 4. currentReadIndex is bigger than currentWriteIndex, so
+  // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
+  // it returns that the queue is almost full, when it is almost empty
+  //
+  if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
+  {
+      return (currentWriteIndex - currentReadIndex);
+  }
+  else
+  {
+      return (Q_SIZE + currentWriteIndex - currentReadIndex);
+  }
 #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 }
 
-template <typename ELEM_T, typename Allocator>
-inline 
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::full()
-{
+template<typename ELEM_T, typename Allocator>
+inline
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() {
 #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
-    return (m_count == (Q_SIZE));
+  return (m_count == (Q_SIZE));
 #else
 
-    uint32_t currentWriteIndex = m_writeIndex;
-    uint32_t currentReadIndex  = m_readIndex;
-    
+  uint32_t currentWriteIndex = m_writeIndex;
+  uint32_t currentReadIndex  = m_readIndex;
+
+  if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
+  {
+      // the queue is full
+      return true;
+  }
+  else
+  {
+      // not full!
+      return false;
+  }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template<typename ELEM_T, typename Allocator>
+inline
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() {
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+  return (m_count == 0);
+#else
+
+  if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
+  {
+      // the queue is full
+      return true;
+  }
+  else
+  {
+      // not full!
+      return false;
+  }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+
+template<typename ELEM_T, typename Allocator>
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) {
+  uint32_t currentReadIndex;
+  uint32_t currentWriteIndex;
+
+  do {
+
+    currentWriteIndex = m_writeIndex;
+    currentReadIndex = m_readIndex;
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+    if (m_count == Q_SIZE) {
+      return false;
+    }
+#else
     if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
     {
         // the queue is full
-        return true;
-    }
-    else
-    {
-        // not full!
         return false;
     }
-#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-}
-
-template <typename ELEM_T, typename Allocator>
-inline 
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty()
-{
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
-    return (m_count == 0);
-#else
-
-    if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
-    {
-        // the queue is full
-        return true;
-    }
-    else
-    {
-        // not full!
-        return false;
-    }
-#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-}
-
-
-
-
-
-
-template <typename ELEM_T, typename Allocator>
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data)
-{
-    uint32_t currentReadIndex;
-    uint32_t currentWriteIndex;
-
-    do
-    {
-
-        currentWriteIndex = m_writeIndex;
-        currentReadIndex  = m_readIndex;
-    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
-        if (m_count == Q_SIZE) {
-            return false;
-        }
-    #else
-        if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
-        {
-            // the queue is full
-            return false;
-        }
-    #endif
-
-    } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
-
-    // We know now that this index is reserved for us. Use it to save the data
-    m_theQueue[countToIndex(currentWriteIndex)] = a_data;
-
-    // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
-    // inserting in the queue. It might fail if there are more than 1 producer threads because this
-    // operation has to be done in the same order as the previous CAS
-
-    while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
-    {
-        // this is a good place to yield the thread in case there are more
-        // software threads than hardware processors and you have more
-        // than 1 producer thread
-        // have a look at sched_yield (POSIX.1b)
-        sched_yield();
-    }
-
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-    AtomicAdd(&m_count, 1);
 #endif
-    return true;
+
+  } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
+
+  // We know now that this index is reserved for us. Use it to save the data
+  m_theQueue[countToIndex(currentWriteIndex)] = a_data;
+
+  // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
+  // inserting in the queue. It might fail if there are more than 1 producer threads because this
+  // operation has to be done in the same order as the previous CAS
+
+  while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) {
+    // this is a good place to yield the thread in case there are more
+    // software threads than hardware processors and you have more
+    // than 1 producer thread
+    // have a look at sched_yield (POSIX.1b)
+    sched_yield();
+  }
+
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+  AtomicAdd(&m_count, 1);
+#endif
+  return true;
 }
 
 
-template <typename ELEM_T, typename Allocator>
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data)
-{
-    uint32_t currentMaximumReadIndex;
-    uint32_t currentReadIndex;
+template<typename ELEM_T, typename Allocator>
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) {
+  uint32_t currentMaximumReadIndex;
+  uint32_t currentReadIndex;
 
-    do
-    {
-        // to ensure thread-safety when there is more than 1 producer thread
-        // a second index is defined (m_maximumReadIndex)
-        currentReadIndex        = m_readIndex;
-        currentMaximumReadIndex = m_maximumReadIndex;
-    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+  do {
+    // to ensure thread-safety when there is more than 1 producer thread
+    // a second index is defined (m_maximumReadIndex)
+    currentReadIndex = m_readIndex;
+    currentMaximumReadIndex = m_maximumReadIndex;
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
-        if (m_count == 0) {
-            return false;
-        }
-    #else
-        if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
-        {
-            // the queue is empty or
-            // a producer thread has allocate space in the queue but is
-            // waiting to commit the data into it
-            return false;
-        }
-    #endif
-
-        // retrieve the data from the queue
-        a_data = m_theQueue[countToIndex(currentReadIndex)];
-
-        // try to perfrom now the CAS operation on the read index. If we succeed
-        // a_data already contains what m_readIndex pointed to before we
-        // increased it
-        if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
-        {
-        #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-            // m_count.fetch_sub(1);
-            AtomicSub(&m_count, 1);
-        #endif
-            return true;
-        }
-
-        // it failed retrieving the element off the queue. Someone else must
-        // have read the element stored at countToIndex(currentReadIndex)
-        // before we could perform the CAS operation
-
-    } while(1); // keep looping to try again!
-
-    // Something went wrong. it shouldn't be possible to reach here
-    assert(0);
-
-    // Add this return statement to avoid compiler warnings
-    return false;
-}
-
-template <typename ELEM_T, typename Allocator>
-ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i)
-{
-    int currentCount = m_count;
-    uint32_t currentReadIndex = m_readIndex;
-    if (i >= currentCount)
-    {
-        std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
-        std::exit(EXIT_FAILURE);
+    if (m_count == 0) {
+      return false;
     }
-    return m_theQueue[countToIndex(currentReadIndex+i)];
+#else
+    if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
+    {
+        // the queue is empty or
+        // a producer thread has allocate space in the queue but is
+        // waiting to commit the data into it
+        return false;
+    }
+#endif
+
+    // retrieve the data from the queue
+    a_data = m_theQueue[countToIndex(currentReadIndex)];
+
+    // try to perfrom now the CAS operation on the read index. If we succeed
+    // a_data already contains what m_readIndex pointed to before we
+    // increased it
+    if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) {
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+      // m_count.fetch_sub(1);
+      AtomicSub(&m_count, 1);
+#endif
+      return true;
+    }
+
+    // it failed retrieving the element off the queue. Someone else must
+    // have read the element stored at countToIndex(currentReadIndex)
+    // before we could perform the CAS operation
+
+  } while (1); // keep looping to try again!
+
+  // Something went wrong. it shouldn't be possible to reach here
+  assert(0);
+
+  // Add this return statement to avoid compiler warnings
+  return false;
+}
+
+template<typename ELEM_T, typename Allocator>
+ELEM_T &ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) {
+  int currentCount = m_count;
+  uint32_t currentReadIndex = m_readIndex;
+  if (i >= currentCount) {
+    std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i
+              << " is out of range\n";
+    std::exit(EXIT_FAILURE);
+  }
+  return m_theQueue[countToIndex(currentReadIndex + i)];
 }
 
 #endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__

--
Gitblit v1.8.0