From 5c912c70e9333298ff48f7ea15424f72ca977b99 Mon Sep 17 00:00:00 2001
From: Fu Juntang <StrongTiger_001@163.com>
Date: 星期五, 17 九月 2021 09:43:55 +0800
Subject: [PATCH] Add the heartbeat logic feature.

---
 src/queue/array_lock_free_sem_queue.h |   80 ++++++++++++++++++++++++---------------
 1 files changed, 49 insertions(+), 31 deletions(-)

diff --git a/src/queue/array_lock_free_sem_queue.h b/src/queue/array_lock_free_sem_queue.h
index 69630d9..cf1266d 100644
--- a/src/queue/array_lock_free_sem_queue.h
+++ b/src/queue/array_lock_free_sem_queue.h
@@ -4,11 +4,11 @@
 #include <assert.h> // assert()
 #include <sched.h>  // sched_yield()
 #include "logger_factory.h"
-#include "mem_pool.h"
+#include "shm_mm.h"
 #include "shm_allocator.h"
 #include "futex_sem.h"
 #include "time_util.h"
-
+#include "bus_def.h"
 
 /// @brief implementation of an array based lock free queue with support for
 ///        multiple producers
@@ -17,8 +17,7 @@
 /// you must use the ArrayLockFreeSemQueue fachade:
 ///   ArrayLockFreeSemQueue<int, 100, ArrayLockFreeSemQueue> q;
 
-#define LOCK_FREE_QUEUE_TIMEOUT  1
-#define LOCK_FREE_QUEUE_NOWAIT  1 << 1
+ 
 
 #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
@@ -205,50 +204,54 @@
 {
   uint32_t currentReadIndex;
   uint32_t currentWriteIndex;
+  uint32_t tmpIndex;
   int s;
 
+  // sigset_t mask_all, pre;
+  // sigfillset(&mask_all);
   do
   {
     currentWriteIndex = m_writeIndex;
     currentReadIndex  = m_readIndex;
   #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
     if (m_count == Q_SIZE) {
-      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
-        return -1;
-      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
+      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG)
+        return errno;
+      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
         const struct timespec ts = TimeUtil::trim_time(timeout);
         s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
         if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
           // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
-          return -1;
+          return errno;
         }
             
       } else {
         s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0);
         if (s == -1 && errno != EAGAIN && errno != EINTR) {
-          return -1;
+          return errno;
         }
       }
 
     }
   #else
-    if (currentReadIndex == currentWriteIndex - Q_SIZE  + 1   )
+    tmpIndex = (uint32_t)(currentWriteIndex - Q_SIZE  + 1);
+    if (currentReadIndex ==   tmpIndex )
     {
         // the queue is full
-      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
-        return -1;
-      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
-        const struct timespec ts = TimeUtil::trim_time(timeout);
-        s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, &ts, NULL, 0);
+      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG)
+        return errno;
+      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
+        
+        s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, timeout, NULL, 0);
         if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
           // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
-          return -1;
+          return errno;
         }
             
       } else {
-        s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, NULL, NULL, 0);
+        s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, NULL, NULL, 0);
         if (s == -1 && errno != EAGAIN && errno != EINTR) {
-          return -1;
+          return errno;
         }
       }
     }
@@ -260,10 +263,11 @@
   // We know now that this index is reserved for us. Use it to save the data
   m_theQueue[countToIndex(currentWriteIndex)] = a_data;
 
+  // sigprocmask(SIG_BLOCK, &mask_all, &pre);
+
   // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
   // inserting in the queue. It might fail if there are more than 1 producer threads because this
   // operation has to be done in the same order as the previous CAS
-
   while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
   {
     // this is a good place to yield the thread in case there are more
@@ -283,6 +287,7 @@
       err_exit(errno, "futex-FUTEX_WAKE");  
 #endif
 
+  // sigprocmask(SIG_SETMASK, &pre, NULL);
   return 0;
 }
 
@@ -293,6 +298,11 @@
   uint32_t currentMaximumReadIndex;
   uint32_t currentReadIndex;
   int s;
+  
+  // sigset_t mask_all, pre;
+  // sigfillset(&mask_all);
+
+  // sigprocmask(SIG_BLOCK, &mask_all, &pre);
 
   do
   {
@@ -305,20 +315,23 @@
 
     if (m_count == 0) {
 
-      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
-        return -1;
-      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
-        const struct timespec ts = TimeUtil::trim_time(timeout);
-        s = futex((int *)&m_count, FUTEX_WAIT, 0, &ts, NULL, 0);
+      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
+        // sigprocmask(SIG_SETMASK, &pre, NULL);
+        return errno;
+      }
+      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
+        s = futex((int *)&m_count, FUTEX_WAIT, 0, timeout, NULL, 0);
         if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
           // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
-          return -1;
+          // sigprocmask(SIG_SETMASK, &pre, NULL);
+          return errno;
         }
             
       } else {
         s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
         if (s == -1 && errno != EAGAIN && errno != EINTR) {
-          return -1;
+          // sigprocmask(SIG_SETMASK, &pre, NULL);
+          return errno;
         }
       }
     }
@@ -330,20 +343,24 @@
       // the queue is empty or
       // a producer thread has allocate space in the queue but is
       // waiting to commit the data into it
-      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
-        return -1;
-      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
+      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
+        // sigprocmask(SIG_SETMASK, &pre, NULL);
+        return errno;
+      }
+      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
         const struct timespec ts = TimeUtil::trim_time(timeout);
         s = futex((int *)&currentMaximumReadIndex, FUTEX_WAIT, currentReadIndex, &ts, NULL, 0);
         if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
           // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
-          return -1;
+          // sigprocmask(SIG_SETMASK, &pre, NULL);
+          return errno;
         }
             
       } else {
         s = futex((int *)&currentMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0);
         if (s == -1 && errno != EAGAIN && errno != EINTR) {
-          return -1;
+         // sigprocmask(SIG_SETMASK, &pre, NULL);
+          return errno;
         }
       }
     }
@@ -367,6 +384,7 @@
         err_exit(errno, "futex-FUTEX_WAKE");
     #endif
      
+      // sigprocmask(SIG_SETMASK, &pre, NULL);
       return 0;
     }
 

--
Gitblit v1.8.0