From 09a82c2ece4caadad0baa0d1f3b84f1506363fdd Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期五, 22 一月 2021 11:58:33 +0800
Subject: [PATCH] update

---
 src/queue/array_lock_free_sem_queue.h |  104 +++++++++++++++++++++++++++++++++++++---------------
 1 files changed, 74 insertions(+), 30 deletions(-)

diff --git a/src/queue/array_lock_free_sem_queue.h b/src/queue/array_lock_free_sem_queue.h
index bb213e8..69630d9 100644
--- a/src/queue/array_lock_free_sem_queue.h
+++ b/src/queue/array_lock_free_sem_queue.h
@@ -75,7 +75,7 @@
 
 #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
   /// @brief number of elements in the queue
-  int m_count;
+  uint32_t m_count;
 #endif
 
 
@@ -200,10 +200,6 @@
 }
 
 
-
-
-
-
   template <typename ELEM_T, typename Allocator>
 int ArrayLockFreeSemQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data,  const struct timespec *timeout, int flag)
 {
@@ -215,28 +211,50 @@
   {
     currentWriteIndex = m_writeIndex;
     currentReadIndex  = m_readIndex;
-
+  #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
     if (m_count == Q_SIZE) {
       if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
         return -1;
       else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
         const struct timespec ts = TimeUtil::trim_time(timeout);
-        s = futex(&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
+        s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
         if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
           // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
           return -1;
         }
             
       } else {
-        s = futex(&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0);
+        s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0);
         if (s == -1 && errno != EAGAIN && errno != EINTR) {
           return -1;
         }
       }
 
     }
+  #else
+    if (currentReadIndex == currentWriteIndex - Q_SIZE  + 1   )
+    {
+        // the queue is full
+      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
+        return -1;
+      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
+        const struct timespec ts = TimeUtil::trim_time(timeout);
+        s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, &ts, NULL, 0);
+        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
+          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
+          return -1;
+        }
+            
+      } else {
+        s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, NULL, NULL, 0);
+        if (s == -1 && errno != EAGAIN && errno != EINTR) {
+          return -1;
+        }
+      }
+    }
+  #endif
 
-
+    //淇濈暀鍐欏叆浣�
   } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
 
   // We know now that this index is reserved for us. Use it to save the data
@@ -255,10 +273,16 @@
     sched_yield();
   }
 
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
   AtomicAdd(&m_count, 1);
-  s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
-  if (s  == -1)
+
+  if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1)
       err_exit(errno, "futex-FUTEX_WAKE");
+#else
+  if ( (s = futex((int *)&m_maximumReadIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1)
+      err_exit(errno, "futex-FUTEX_WAKE");  
+#endif
+
   return 0;
 }
 
@@ -268,15 +292,16 @@
 {
   uint32_t currentMaximumReadIndex;
   uint32_t currentReadIndex;
-
   int s;
+
   do
   {
     // to ensure thread-safety when there is more than 1 producer thread
     // a second index is defined (m_maximumReadIndex)
     currentReadIndex        = m_readIndex;
     currentMaximumReadIndex = m_maximumReadIndex;
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+  #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
     if (m_count == 0) {
 
@@ -284,28 +309,45 @@
         return -1;
       else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
         const struct timespec ts = TimeUtil::trim_time(timeout);
-        s = futex(&m_count, FUTEX_WAIT, 0, &ts, NULL, 0);
+        s = futex((int *)&m_count, FUTEX_WAIT, 0, &ts, NULL, 0);
         if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
           // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
           return -1;
         }
             
       } else {
-        s = futex(&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
+        s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
         if (s == -1 && errno != EAGAIN && errno != EINTR) {
           return -1;
         }
       }
     }
-#else
-    if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
+    
+  #else
+
+    if (currentReadIndex == currentMaximumReadIndex)
     {
       // the queue is empty or
       // a producer thread has allocate space in the queue but is
       // waiting to commit the data into it
-      return -1;
+      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
+        return -1;
+      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
+        const struct timespec ts = TimeUtil::trim_time(timeout);
+        s = futex((int *)&currentMaximumReadIndex, FUTEX_WAIT, currentReadIndex, &ts, NULL, 0);
+        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
+          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
+          return -1;
+        }
+            
+      } else {
+        s = futex((int *)&currentMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0);
+        if (s == -1 && errno != EAGAIN && errno != EINTR) {
+          return -1;
+        }
+      }
     }
-#endif
+  #endif
 
     // retrieve the data from the queue
     a_data = m_theQueue[countToIndex(currentReadIndex)];
@@ -315,14 +357,16 @@
     // increased it
     if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
     {
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
       // m_count.fetch_sub(1);
       AtomicSub(&m_count, 1);
-#endif
-
-      s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
-      if (s  == -1)
+      if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1)
         err_exit(errno, "futex-FUTEX_WAKE");
+    #else
+      if ( (s = futex((int *)&m_readIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1)
+        err_exit(errno, "futex-FUTEX_WAKE");
+    #endif
+     
       return 0;
     }
 
@@ -342,13 +386,13 @@
   template <typename ELEM_T, typename Allocator>
 ELEM_T& ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[](unsigned int i)
 {
-  int currentCount = m_count;
+  // int currentCount = m_count;
   uint32_t currentReadIndex = m_readIndex;
-  if (i >= currentCount)
-  {
-    std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
-    std::exit(EXIT_FAILURE);
-  }
+  // if (i >= currentCount)
+  // {
+  //   std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
+  //   std::exit(EXIT_FAILURE);
+  // }
   return m_theQueue[countToIndex(currentReadIndex+i)];
 }
 

--
Gitblit v1.8.0