From 489be34b462204071d81ba86644269e88e440bf5 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期四, 30 九月 2021 13:31:15 +0800
Subject: [PATCH] fix SendReply free
---
src/queue/lock_free_queue.h | 79 +++++++++++++++++++++++++++------------
1 files changed, 54 insertions(+), 25 deletions(-)
diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index 425d9f8..6363ccb 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -6,7 +6,7 @@
#include <usg_common.h>
#include <assert.h> // assert()
-#include "mem_pool.h"
+#include "shm_mm.h"
#include "sem_util.h"
#include "logger_factory.h"
#include "shm_allocator.h"
@@ -15,7 +15,12 @@
#include "bus_def.h"
// default Queue size
-#define LOCK_FREE_Q_DEFAULT_SIZE 16
+#define LOCK_FREE_Q_DEFAULT_SIZE 320
+
+
+#define LOCK_FREE_Q_ST_OPENED 0
+
+#define LOCK_FREE_Q_ST_CLOSED 1
// static Logger *logger = LoggerFactory::getLogger();
// define this macro if calls to "size" must return the real size of the
@@ -84,10 +89,10 @@
sem_t items;
time_t createTime;
+ time_t closeTime;
+ int status;
public:
- // sem_t mutex;
-
LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
@@ -95,6 +100,9 @@
/// Note it is not virtual since it is not expected to inherit from this
/// template
~LockFreeQueue();
+
+ inline void close();
+ inline bool isClosed();
// std::atomic_uint reference;
/// @brief constructor of the class
@@ -120,8 +128,18 @@
inline ELEM_T &operator[](unsigned i);
+
+
time_t getCreateTime() {
return createTime;
+ }
+
+ time_t getCloseTime() {
+ return closeTime;
+ }
+
+ int getStatus() {
+ return status;
}
/// @brief push an element at the tail of the queue
@@ -159,13 +177,14 @@
typename Allocator,
template<typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): m_qImpl(qsize) {
- // std::cout << "LockFreeQueue init reference=" << reference << std::endl;
+ //std::cout << "LockFreeQueue init reference=" << reference << std::endl;
if (sem_init(&slots, 1, qsize) == -1)
err_exit(errno, "LockFreeQueue sem_init");
if (sem_init(&items, 1, 0) == -1)
err_exit(errno, "LockFreeQueue sem_init");
createTime = time(NULL);
+ status = LOCK_FREE_Q_ST_OPENED;
}
@@ -174,17 +193,32 @@
typename ELEM_T,
typename Allocator,
template<typename T, typename AT> class Q_TYPE>
+inline void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::close() {
+ status = LOCK_FREE_Q_ST_CLOSED;
+ closeTime = time(NULL);
+}
+
+template<
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::isClosed() {
+ return status == LOCK_FREE_Q_ST_CLOSED;
+}
+
+
+template<
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() {
- // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy");
if (sem_destroy(&slots) == -1) {
err_exit(errno, "LockFreeQueue sem_destroy");
}
if (sem_destroy(&items) == -1) {
err_exit(errno, "LockFreeQueue sem_destroy");
}
- // if (sem_destroy(&mutex) == -1) {
- // err_exit(errno, "LockFreeQueue sem_destroy");
- // }
+
}
template<
@@ -216,11 +250,10 @@
typename Allocator,
template<typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) {
- // LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
- // sigset_t mask_all, pre;
- // sigfillset(&mask_all);
+ sigset_t mask_all, pre;
+ sigfillset(&mask_all);
- // sigprocmask(SIG_BLOCK, &mask_all, &pre);
+ sigprocmask(SIG_BLOCK, &mask_all, &pre);
if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
if (psem_trywait(&slots) == -1) {
@@ -239,13 +272,12 @@
if (m_qImpl.push(a_data)) {
psem_post(&items);
- // sigprocmask(SIG_SETMASK, &pre, NULL);
- // LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
+ sigprocmask(SIG_SETMASK, &pre, NULL);
return 0;
}
LABEL_FAILTURE:
- // sigprocmask(SIG_SETMASK, &pre, NULL);
+ sigprocmask(SIG_SETMASK, &pre, NULL);
return errno;
}
@@ -253,20 +285,18 @@
typename Allocator,
template<typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) {
- // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before....");
- // sigset_t mask_all, pre;
- // sigfillset(&mask_all);
+ sigset_t mask_all, pre;
+ sigfillset(&mask_all);
- // sigprocmask(SIG_BLOCK, &mask_all, &pre);
+ sigprocmask(SIG_BLOCK, &mask_all, &pre);
if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
if (psem_trywait(&items) == -1) {
goto LABEL_FAILTURE;
}
} else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
- // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before. flag=%d , %d\n", flag, timeout->tv_sec);
- if (psem_timedwait(&items, timeout) == -1) {
+ if (psem_timedwait(&items, timeout) == -1) {
goto LABEL_FAILTURE;
}
} else {
@@ -277,14 +307,13 @@
if (m_qImpl.pop(a_data)) {
psem_post(&slots);
- // sigprocmask(SIG_SETMASK, &pre, NULL);
- // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
+ sigprocmask(SIG_SETMASK, &pre, NULL);
return 0;
}
LABEL_FAILTURE:
- // sigprocmask(SIG_SETMASK, &pre, NULL);
+ sigprocmask(SIG_SETMASK, &pre, NULL);
return errno;
}
--
Gitblit v1.8.0