From 14be935a4f8231233487d510c8db0b544bcf0f69 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期一, 25 一月 2021 17:40:29 +0800
Subject: [PATCH] fix conflict

---
 src/queue/array_lock_free_sem_queue.h |  429 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 429 insertions(+), 0 deletions(-)

diff --git a/src/queue/array_lock_free_sem_queue.h b/src/queue/array_lock_free_sem_queue.h
new file mode 100644
index 0000000..28f4d81
--- /dev/null
+++ b/src/queue/array_lock_free_sem_queue.h
@@ -0,0 +1,429 @@
+#ifndef __ARRAY_LOCK_FREE_SEM_QUEUE_H__
+#define __ARRAY_LOCK_FREE_SEM_QUEUE_H__
+#include "atomic_ops.h"
+#include <assert.h> // assert()
+#include <sched.h>  // sched_yield()
+#include "logger_factory.h"
+#include "mem_pool.h"
+#include "shm_allocator.h"
+#include "futex_sem.h"
+#include "time_util.h"
+#include "bus_def.h"
+
+/// @brief implementation of an array based lock free queue with support for
+///        multiple producers
+/// This class is prevented from being instantiated directly (all members and
+/// methods are private). To instantiate a multiple producers lock free queue
+/// you must use the ArrayLockFreeSemQueue fachade:
+///   ArrayLockFreeSemQueue<int, 100, ArrayLockFreeSemQueue> q;
+
+ 
+
+#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+
+template <typename ELEM_T, typename Allocator = SHM_Allocator>
+class ArrayLockFreeSemQueue
+{
+public:
+  /// @brief constructor of the class
+  ArrayLockFreeSemQueue(size_t qsize = 16);
+
+  virtual ~ArrayLockFreeSemQueue();
+
+  inline uint32_t size();
+
+  inline bool full();
+
+  inline bool empty();
+
+  int push(const ELEM_T &a_data,  const struct timespec *timeout = NULL, int flag = 0);
+
+  int pop(ELEM_T &a_data,  const struct timespec *timeout = NULL, int flag = 0);
+
+  /// @brief calculate the index in the circular array that corresponds
+  /// to a particular "count" value
+  inline uint32_t countToIndex(uint32_t a_count);
+
+  ELEM_T& operator[](unsigned i);
+
+public:
+  void *operator new(size_t size);
+  void operator delete(void *p);
+  
+private:
+  size_t Q_SIZE;
+  /// @brief array to keep the elements
+  ELEM_T *m_theQueue;
+
+  /// @brief where a new element will be inserted
+  uint32_t m_writeIndex;
+
+  /// @brief where the next element where be extracted from
+  uint32_t m_readIndex;
+
+  /// @brief maximum read index for multiple producer queues
+  /// If it's not the same as m_writeIndex it means
+  /// there are writes pending to be "committed" to the queue, that means,
+  /// the place for the data was reserved (the index in the array) but
+  /// data is still not in the queue, so the thread trying to read will have
+  /// to wait for those other threads to save the data into the queue
+  ///
+  /// note this is only used for multiple producers
+  uint32_t m_maximumReadIndex;
+
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+  /// @brief number of elements in the queue
+  uint32_t m_count;
+#endif
+
+
+  private:
+  /// @brief disable copy constructor declaring it private
+  ArrayLockFreeSemQueue<ELEM_T, Allocator>(const ArrayLockFreeSemQueue<ELEM_T> &a_src);
+
+};
+
+
+template <typename ELEM_T, typename Allocator>
+ArrayLockFreeSemQueue<ELEM_T, Allocator>::ArrayLockFreeSemQueue(size_t qsize):
+  Q_SIZE(qsize),
+  m_writeIndex(0),      // initialisation is not atomic
+  m_readIndex(0),       //
+  m_maximumReadIndex(0) //
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+  ,m_count(0)           //
+#endif
+{
+  m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
+
+}
+
+  template <typename ELEM_T, typename Allocator>
+ArrayLockFreeSemQueue<ELEM_T, Allocator>::~ArrayLockFreeSemQueue()
+{
+  // std::cout << "destroy ArrayLockFreeSemQueue\n";
+  Allocator::deallocate(m_theQueue);
+
+}
+
+template <typename ELEM_T, typename Allocator>
+  inline
+uint32_t ArrayLockFreeSemQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
+{
+  // if Q_SIZE is a power of 2 this statement could be also written as
+  // return (a_count & (Q_SIZE - 1));
+  return (a_count % Q_SIZE);
+}
+
+template <typename ELEM_T, typename Allocator>
+  inline
+uint32_t ArrayLockFreeSemQueue<ELEM_T, Allocator>::size()
+{
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+  return m_count;
+#else
+
+  uint32_t currentWriteIndex = m_maximumReadIndex;
+  uint32_t currentReadIndex  = m_readIndex;
+
+  // let's think of a scenario where this function returns bogus data
+  // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
+  // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
+  // 2. afterwards this thread is preemted. While this thread is inactive 2
+  // elements are inserted and removed from the queue, so m_maximumReadIndex
+  // is 5 and m_readIndex 4. Real size is still 1
+  // 3. Now the current thread comes back from preemption and reads m_readIndex.
+  // currentReadIndex is 4
+  // 4. currentReadIndex is bigger than currentWriteIndex, so
+  // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
+  // it returns that the queue is almost full, when it is almost empty
+  //
+  if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
+  {
+    return (currentWriteIndex - currentReadIndex);
+  }
+  else
+  {
+    return (Q_SIZE + currentWriteIndex - currentReadIndex);
+  }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template <typename ELEM_T, typename Allocator>
+  inline
+bool ArrayLockFreeSemQueue<ELEM_T, Allocator>::full()
+{
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+  return (m_count == (Q_SIZE));
+#else
+
+  uint32_t currentWriteIndex = m_writeIndex;
+  uint32_t currentReadIndex  = m_readIndex;
+
+  if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
+  {
+    // the queue is full
+    return true;
+  }
+  else
+  {
+    // not full!
+    return false;
+  }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template <typename ELEM_T, typename Allocator>
+  inline
+bool ArrayLockFreeSemQueue<ELEM_T, Allocator>::empty()
+{
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+  return (m_count == 0);
+#else
+
+  if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
+  {
+    // the queue is full
+    return true;
+  }
+  else
+  {
+    // not full!
+    return false;
+  }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+
+  template <typename ELEM_T, typename Allocator>
+int ArrayLockFreeSemQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data,  const struct timespec *timeout, int flag)
+{
+  uint32_t currentReadIndex;
+  uint32_t currentWriteIndex;
+  uint32_t tmpIndex;
+  int s;
+
+  // sigset_t mask_all, pre;
+  // sigfillset(&mask_all);
+  do
+  {
+    currentWriteIndex = m_writeIndex;
+    currentReadIndex  = m_readIndex;
+  #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+    if (m_count == Q_SIZE) {
+      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG)
+        return errno;
+      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
+        const struct timespec ts = TimeUtil::trim_time(timeout);
+        s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
+        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
+          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
+          return errno;
+        }
+            
+      } else {
+        s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0);
+        if (s == -1 && errno != EAGAIN && errno != EINTR) {
+          return errno;
+        }
+      }
+
+    }
+  #else
+    tmpIndex = (uint32_t)(currentWriteIndex - Q_SIZE  + 1);
+    if (currentReadIndex ==   tmpIndex )
+    {
+        // the queue is full
+      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG)
+        return errno;
+      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
+        
+        s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, timeout, NULL, 0);
+        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
+          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
+          return errno;
+        }
+            
+      } else {
+        s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, NULL, NULL, 0);
+        if (s == -1 && errno != EAGAIN && errno != EINTR) {
+          return errno;
+        }
+      }
+    }
+  #endif
+
+    //淇濈暀鍐欏叆浣�
+  } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
+
+  // We know now that this index is reserved for us. Use it to save the data
+  m_theQueue[countToIndex(currentWriteIndex)] = a_data;
+
+  // sigprocmask(SIG_BLOCK, &mask_all, &pre);
+
+  // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
+  // inserting in the queue. It might fail if there are more than 1 producer threads because this
+  // operation has to be done in the same order as the previous CAS
+  while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
+  {
+    // this is a good place to yield the thread in case there are more
+    // software threads than hardware processors and you have more
+    // than 1 producer thread
+    // have a look at sched_yield (POSIX.1b)
+    sched_yield();
+  }
+
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+  AtomicAdd(&m_count, 1);
+
+  if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1)
+      err_exit(errno, "futex-FUTEX_WAKE");
+#else
+  if ( (s = futex((int *)&m_maximumReadIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1)
+      err_exit(errno, "futex-FUTEX_WAKE");  
+#endif
+
+  // sigprocmask(SIG_SETMASK, &pre, NULL);
+  return 0;
+}
+
+
+  template <typename ELEM_T, typename Allocator>
+int ArrayLockFreeSemQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag)
+{
+  uint32_t currentMaximumReadIndex;
+  uint32_t currentReadIndex;
+  int s;
+  
+  // sigset_t mask_all, pre;
+  // sigfillset(&mask_all);
+
+  // sigprocmask(SIG_BLOCK, &mask_all, &pre);
+
+  do
+  {
+    // to ensure thread-safety when there is more than 1 producer thread
+    // a second index is defined (m_maximumReadIndex)
+    currentReadIndex        = m_readIndex;
+    currentMaximumReadIndex = m_maximumReadIndex;
+
+  #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+    if (m_count == 0) {
+
+      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
+        // sigprocmask(SIG_SETMASK, &pre, NULL);
+        return errno;
+      }
+      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
+        s = futex((int *)&m_count, FUTEX_WAIT, 0, timeout, NULL, 0);
+        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
+          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
+          // sigprocmask(SIG_SETMASK, &pre, NULL);
+          return errno;
+        }
+            
+      } else {
+        s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
+        if (s == -1 && errno != EAGAIN && errno != EINTR) {
+          // sigprocmask(SIG_SETMASK, &pre, NULL);
+          return errno;
+        }
+      }
+    }
+    
+  #else
+
+    if (currentReadIndex == currentMaximumReadIndex)
+    {
+      // the queue is empty or
+      // a producer thread has allocate space in the queue but is
+      // waiting to commit the data into it
+      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
+        // sigprocmask(SIG_SETMASK, &pre, NULL);
+        return errno;
+      }
+      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
+        const struct timespec ts = TimeUtil::trim_time(timeout);
+        s = futex((int *)&currentMaximumReadIndex, FUTEX_WAIT, currentReadIndex, &ts, NULL, 0);
+        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
+          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
+          // sigprocmask(SIG_SETMASK, &pre, NULL);
+          return errno;
+        }
+            
+      } else {
+        s = futex((int *)&currentMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0);
+        if (s == -1 && errno != EAGAIN && errno != EINTR) {
+         // sigprocmask(SIG_SETMASK, &pre, NULL);
+          return errno;
+        }
+      }
+    }
+  #endif
+
+    // retrieve the data from the queue
+    a_data = m_theQueue[countToIndex(currentReadIndex)];
+
+    // try to perfrom now the CAS operation on the read index. If we succeed
+    // a_data already contains what m_readIndex pointed to before we
+    // increased it
+    if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
+    {
+    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+      // m_count.fetch_sub(1);
+      AtomicSub(&m_count, 1);
+      if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1)
+        err_exit(errno, "futex-FUTEX_WAKE");
+    #else
+      if ( (s = futex((int *)&m_readIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1)
+        err_exit(errno, "futex-FUTEX_WAKE");
+    #endif
+     
+      // sigprocmask(SIG_SETMASK, &pre, NULL);
+      return 0;
+    }
+
+    // it failed retrieving the element off the queue. Someone else must
+    // have read the element stored at countToIndex(currentReadIndex)
+    // before we could perform the CAS operation
+
+  } while(1); // keep looping to try again!
+
+  // Something went wrong. it shouldn't be possible to reach here
+  assert(0);
+
+  // Add this return statement to avoid compiler warnings
+  return -1;
+}
+
+  template <typename ELEM_T, typename Allocator>
+ELEM_T& ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[](unsigned int i)
+{
+  // int currentCount = m_count;
+  uint32_t currentReadIndex = m_readIndex;
+  // if (i >= currentCount)
+  // {
+  //   std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
+  //   std::exit(EXIT_FAILURE);
+  // }
+  return m_theQueue[countToIndex(currentReadIndex+i)];
+}
+
+
+
+template <typename ELEM_T, typename Allocator>
+void * ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator new(size_t size){
+        return Allocator::allocate(size);
+}
+
+template <typename ELEM_T, typename Allocator>
+void ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator delete(void *p) {
+    return Allocator::deallocate(p);
+}
+
+#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__

--
Gitblit v1.8.0