From 2c65db46500207f8445aa4baa53bfbb6602e0e18 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 21 一月 2021 16:37:03 +0800
Subject: [PATCH] restructure

---
 src/queue/shm_queue.h |  123 +++++++++++++++++++++++++----------------
 1 files changed, 75 insertions(+), 48 deletions(-)

diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h
index 7d98eaa..5d2d9b6 100644
--- a/src/queue/shm_queue.h
+++ b/src/queue/shm_queue.h
@@ -6,12 +6,13 @@
 #define __SHM_QUEUE_H__
 
 #include "hashtable.h"
-#include "lock_free_queue.h"
+ 
 #include "logger_factory.h"
 #include "sem_util.h"
 #include "shm_allocator.h"
 #include "usg_common.h"
-
+#include "array_lock_free_sem_queue.h"
+#include "bus_error.h"
 
 template <typename ELEM_T> class SHMQueue {
 
@@ -20,7 +21,7 @@
 
 public:
   /// @brief constructor of the class
-  SHMQueue(int key = 0, size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
+  SHMQueue(int key = 0, size_t qsize = 16);
 
   ~SHMQueue();
 
@@ -49,7 +50,8 @@
 protected:
   /// @brief the actual queue-> methods are forwarded into the real
   ///        implementation
-  LockFreeQueue<ELEM_T, SHM_Allocator> *queue;
+
+  ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *queue;
 
 private:
   /// @brief disable copy constructor declaring it private
@@ -62,7 +64,7 @@
   hashtable_t *hashtable = mm_get_hashtable();
   std::set<int> *keyset = hashtable_keyset(hashtable);
   std::set<int>::iterator keyItr;
-  LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
+  ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue;
   bool found;
   size_t count = 0;
   for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
@@ -75,7 +77,7 @@
     }
     if (!found) {
       // 閿�姣佸叡浜唴瀛樼殑queue
-      mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
+      mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
       delete mqueue;
       hashtable_remove(hashtable, *keyItr);
       count++;
@@ -89,11 +91,11 @@
 template <typename ELEM_T>
 size_t SHMQueue<ELEM_T>::remove_queues(int keys[], size_t length) {
   hashtable_t *hashtable = mm_get_hashtable();
-  LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
+  ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue;
   size_t count = 0;
   for(int i = 0; i< length; i++) {
     // 閿�姣佸叡浜唴瀛樼殑queue
-    mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]);
+    mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]);
     delete mqueue;
     hashtable_remove(hashtable, keys[i]);
     count++;
@@ -111,49 +113,22 @@
 SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize) : KEY(key) {
 
   hashtable_t *hashtable = mm_get_hashtable();
-  queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
+  queue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
   if (queue == NULL || (void *)queue == (void *)1) {
-    queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize);
+    queue = new ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator>(qsize);
     hashtable_put(hashtable, key, (void *)queue);
   }
-  queue->reference++;
+  // queue->reference++;
   // LoggerFactory::getLogger()->debug("SHMQueue constructor reference===%d", queue->reference.load());
 }
 
 template <typename ELEM_T> SHMQueue<ELEM_T>::~SHMQueue() {
-  if(queue == NULL) {
-     // queue宸茬粡閿�姣�
-    return;
-  }
-
-  sem_wait(&(queue->mutex));
-  queue->reference--;
-  // LoggerFactory::getLogger()->debug("SHMQueue destructor  reference===%d",
-  if (queue->reference.load() == 0) {
-      delete queue;
-      queue = NULL;
-      hashtable_t *hashtable = mm_get_hashtable();
-      hashtable_remove(hashtable, KEY);
-      // 姝ゆ椂queue宸茬粡閿�姣侊紝鏃犻渶  sem_post(&(queue->mutex))
-      // printf("SHMQueue destructor delete queue\n");
-  } else {
-      sem_post(&(queue->mutex));
-  }
-  
-}
-
-template <typename ELEM_T> void SHMQueue<ELEM_T>::force_destroy() {
-  if(queue == NULL) {
-    // queue宸茬粡閿�姣�
-    return;
-  }
-
-  SemUtil::dec(queue->mutex);
+  LoggerFactory::getLogger()->debug("SHMQueue destroy");
   delete queue;
   queue = NULL;
   hashtable_t *hashtable = mm_get_hashtable();
   hashtable_remove(hashtable, KEY);
-  // 姝ゆ椂queue宸茬粡閿�姣侊紝鏃犻渶 SemUtil::inc(queue->mutex)
+  
 }
 
 template <typename ELEM_T> inline uint32_t SHMQueue<ELEM_T>::size() {
@@ -170,36 +145,85 @@
 
 template <typename ELEM_T>
 inline int SHMQueue<ELEM_T>::push(const ELEM_T &a_data) {
-  return queue->push(a_data);
+  int rv = queue->push(a_data);
+  if(rv == -1) {
+    return errno;
+  } else {
+    return 0;
+  }
 }
 
 template <typename ELEM_T>
 inline int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) {
-  return queue->push_nowait(a_data);
+  int rv =  queue->push(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT);
+  if(rv == -1) {
+    if (errno == EAGAIN)
+      return EAGAIN;
+    else {
+        err_msg(errno, "LockFreeQueue push_nowait");
+        return errno;
+    }
+  }
+  return 0;
 }
 
 template <typename ELEM_T>
-inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data,
-                                           const struct timespec *timeout) {
+inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec *timeout) {
 
-  return queue->push_timeout(a_data, timeout);
+  int rv = queue->push(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT);
+  if(rv == -1) {
+    if(errno == ETIMEDOUT)
+        return EBUS_TIMEOUT;
+    else {
+       LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
+       return errno;
+    }
+  }
+  return 0;
 }
 
 template <typename ELEM_T> inline int SHMQueue<ELEM_T>::pop(ELEM_T &a_data) {
   // printf("SHMQueue pop before\n");
   int rv = queue->pop(a_data);
   // printf("SHMQueue after before\n");
-  return rv;
+  if(rv == -1) {
+    return errno;
+  } else {
+    return 0;
+  }
 }
 
 template <typename ELEM_T>
 inline int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) {
-  return queue->pop_nowait(a_data);
+  int rv = queue->pop(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT);
+
+  if(rv == -1) {
+    if (errno == EAGAIN)
+      return errno;
+    else {
+        LoggerFactory::getLogger()->error(errno, " SHMQueue pop_nowait");
+        return errno;
+    }
+  }
+  return 0;
+  
 }
 
 template <typename ELEM_T>
 inline int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) {
-  return queue->pop_timeout(a_data, timeout);
+
+  int rv;
+  rv = queue->pop(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT);
+  if(rv == -1) {
+    if (errno == ETIMEDOUT) {
+      return EBUS_TIMEOUT;
+    } else {
+      LoggerFactory::getLogger()->error(errno, " SHMQueue pop_timeout");
+      return errno;
+    }
+  }
+  return 0;
+  
 }
 
 template <typename ELEM_T>
@@ -207,4 +231,7 @@
   return queue->operator[](i);
 }
 
+
+
+
 #endif

--
Gitblit v1.8.0