From e0aea3742aed09a0a9ed384ccd7db203b6efc650 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期六, 20 二月 2021 14:43:52 +0800
Subject: [PATCH] update
---
src/key_def.h | 3
src/shm/shm_mm_wrapper.h | 4
src/queue/lock_free_queue.h | 36 ++++
test_net_socket/test_net_mod_socket.cpp | 8 +
src/shm/shm_mm_wrapper.cpp | 34 ++++
src/socket/shm_socket.cpp | 76 +++++++---
/dev/null | 63 ---------
src/shm/shm_mm.cpp | 60 ++++++++
src/queue/shm_queue.h | 31 ----
src/socket/bus_server_socket.cpp | 10
src/shm/shm_mm.h | 37 +++++
src/shm/shm_allocator.h | 4
src/socket/shm_mod_socket.cpp | 9 -
13 files changed, 235 insertions(+), 140 deletions(-)
diff --git a/src/key_def.h b/src/key_def.h
index 4f0e1c0..904b78f 100644
--- a/src/key_def.h
+++ b/src/key_def.h
@@ -2,6 +2,9 @@
#define _KEY_DEF_H_
#define SHM_BUS_MAP_KEY 1
+
+
+#define SHM_QUEUE_ST_KEY 3
// BUS key
#define SHM_BUS_KEY 8
// 缃戠粶浠g悊key
diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index 425d9f8..d66ee8c 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -17,6 +17,11 @@
// default Queue size
#define LOCK_FREE_Q_DEFAULT_SIZE 16
+
+#define LOCK_FREE_Q_ST_OPENED 0
+
+#define LOCK_FREE_Q_ST_CLOSED 1
+
// static Logger *logger = LoggerFactory::getLogger();
// define this macro if calls to "size" must return the real size of the
// queue. If it is undefined that function will try to take a snapshot of
@@ -84,10 +89,10 @@
sem_t items;
time_t createTime;
+ time_t closeTime;
+ int status;
public:
- // sem_t mutex;
-
LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
@@ -95,6 +100,8 @@
/// Note it is not virtual since it is not expected to inherit from this
/// template
~LockFreeQueue();
+
+ inline void close();
// std::atomic_uint reference;
/// @brief constructor of the class
@@ -120,8 +127,18 @@
inline ELEM_T &operator[](unsigned i);
+
+
time_t getCreateTime() {
return createTime;
+ }
+
+ time_t getCloseTime() {
+ return closeTime;
+ }
+
+ int getStatus() {
+ return status;
}
/// @brief push an element at the tail of the queue
@@ -166,7 +183,18 @@
err_exit(errno, "LockFreeQueue sem_init");
createTime = time(NULL);
+ status = LOCK_FREE_Q_ST_OPENED;
+}
+
+
+template<
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+inline void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::close() {
+ status = LOCK_FREE_Q_ST_CLOSED;
+ closeTime = time(NULL);
}
@@ -182,9 +210,7 @@
if (sem_destroy(&items) == -1) {
err_exit(errno, "LockFreeQueue sem_destroy");
}
- // if (sem_destroy(&mutex) == -1) {
- // err_exit(errno, "LockFreeQueue sem_destroy");
- // }
+
}
template<
diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h
index 24a4dfc..7893485 100644
--- a/src/queue/shm_queue.h
+++ b/src/queue/shm_queue.h
@@ -45,8 +45,6 @@
ELEM_T &operator[](unsigned i);
- // @deprecate
- static size_t remove_queues_exclude(int keys[], size_t length);
private:
protected:
@@ -60,34 +58,7 @@
SHMQueue<ELEM_T>(const SHMQueue<ELEM_T> &a_src);
};
-// @deprecate
-// template <typename ELEM_T>
-// size_t SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) {
-// hashtable_t *hashtable = mm_get_hashtable();
-// std::set<int> *keyset = hashtable_keyset(hashtable);
-// std::set<int>::iterator keyItr;
-// LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
-// bool found;
-// size_t count = 0;
-// for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
-// found = false;
-// for (size_t i = 0; i < length; i++) {
-// if (*keyItr == keys[i]) {
-// found = true;
-// break;
-// }
-// }
-// if (!found && *keyItr > 100) {
-// // 閿�姣佸叡浜唴瀛樼殑queue
-// mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
-// delete mqueue;
-// hashtable_remove(hashtable, *keyItr);
-// count++;
-// }
-// }
-// delete keyset;
-// return count;
-// }
+
diff --git a/src/shm/mem_pool.h b/src/shm/mem_pool.h
deleted file mode 100644
index 5a698ec..0000000
--- a/src/shm/mem_pool.h
+++ /dev/null
@@ -1,63 +0,0 @@
-#ifndef _MEM_POOL_H_
-#define _MEM_POOL_H_
-#include "mm.h"
-#include "sem_util.h"
-#define MEM_POOL_COND_KEY 0x8801
-
-
-// static int mem_pool_mutex = SemUtil::get(MEM_POOL_COND_KEY, 1);
-
-static inline void mem_pool_init(size_t heap_size) {
- mm_init(heap_size);
-}
-
-static inline void mem_pool_destroy(void) {
- mm_destroy();
-
-}
-
-static inline void *mem_pool_malloc (size_t size) {
- return mm_malloc(size);
-}
-
-
-static inline void mem_pool_free (void *ptr) {
- mm_free(ptr);
-}
-
-
-template <typename T>
-static inline T* mem_pool_attach(int key) {
- void *ptr;
- // T* tptr;
- hashtable_t *hashtable = mm_get_hashtable();
- ptr = hashtable_get(hashtable, key);
-// printf("mem_pool_malloc_by_key malloc before %d, %p\n", key, ptr);
- if(ptr == NULL || ptr == (void *)1 ) {
- ptr = mm_malloc(sizeof(T));
- hashtable_put(hashtable, key, ptr);
- new(ptr) T;
-// printf("mem_pool_malloc_by_key use new %d, %p\n", key, ptr);
- }
- return (T*)ptr;
-}
-
-static inline void mem_pool_free_by_key(int key) {
- return mm_free_by_key(key);
-}
-
-
-static inline void *mem_pool_realloc (void *ptr, size_t size) {
- return mm_realloc(ptr, size);
-}
-
-static inline int mem_pool_alloc_key() {
-
- return mm_alloc_key();
-}
-
-
-// extern int mm_checkheap(int verbose);
-
-
-#endif
\ No newline at end of file
diff --git a/src/shm/shm_allocator.h b/src/shm/shm_allocator.h
index 084a678..d14708f 100644
--- a/src/shm/shm_allocator.h
+++ b/src/shm/shm_allocator.h
@@ -67,12 +67,12 @@
public:
static void *allocate (size_t size) {
return mm_malloc(size);
- // return mem_pool_malloc(size);
+ // return shm_mm_malloc(size);
}
static void deallocate (void *ptr) {
return mm_free(ptr);
- // return mem_pool_free(ptr);
+ // return shm_mm_free(ptr);
}
};
diff --git a/src/shm/shm_mm.cpp b/src/shm/shm_mm.cpp
new file mode 100644
index 0000000..6341086
--- /dev/null
+++ b/src/shm/shm_mm.cpp
@@ -0,0 +1,60 @@
+#include "shm_mm.h"
+#include "mm.h"
+#include "sem_util.h"
+
+
+void shm_mm_init(size_t heap_size) {
+ mm_init(heap_size);
+ shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY);
+}
+
+void shm_mm_destroy(void) {
+ mm_destroy();
+
+}
+
+void *shm_mm_malloc (size_t size) {
+ return mm_malloc(size);
+}
+
+
+void shm_mm_free (void *ptr) {
+ mm_free(ptr);
+}
+
+
+template <typename T>
+ T* shm_mm_attach(int key) {
+ void *ptr;
+ // T* tptr;
+ hashtable_t *hashtable = mm_get_hashtable();
+ ptr = hashtable_get(hashtable, key);
+// printf("shm_mm_malloc_by_key malloc before %d, %p\n", key, ptr);
+ if(ptr == NULL || ptr == (void *)1 ) {
+ ptr = mm_malloc(sizeof(T));
+ hashtable_put(hashtable, key, ptr);
+ new(ptr) T;
+// printf("shm_mm_malloc_by_key use new %d, %p\n", key, ptr);
+ }
+ return (T*)ptr;
+}
+
+void shm_mm_free_by_key(int key) {
+ return mm_free_by_key(key);
+}
+
+
+void *shm_mm_realloc (void *ptr, size_t size) {
+ return mm_realloc(ptr, size);
+}
+
+int shm_mm_alloc_key() {
+
+ return mm_alloc_key();
+}
+
+
+// extern int mm_checkheap(int verbose);
+
+
+#endif
\ No newline at end of file
diff --git a/src/shm/shm_mm.h b/src/shm/shm_mm.h
new file mode 100644
index 0000000..db1bea9
--- /dev/null
+++ b/src/shm/shm_mm.h
@@ -0,0 +1,37 @@
+#ifndef __SHM_MM_H__
+#define __SHM_MM_H__
+
+#define SHM_QUEUE_ST_OPENED 0
+
+#define SHM_QUEUE_ST_CLOSED 1
+
+struct shm_queue_status_t {
+
+ int status;
+ time_t createTime;
+ time_t closeTime;
+};
+
+typedef std::map<int, shm_queue_status_t, std::less<int>, SHM_STL_Allocator<std::pair<const int, shm_queue_status_t> > > ShmQueueStMap;
+
+
+void shm_mm_init(size_t heap_size) ;
+
+void shm_mm_destroy(void) ;
+
+void *shm_mm_malloc (size_t size);
+
+void shm_mm_free (void *ptr);
+
+
+template <typename T>
+T* shm_mm_attach(int key) ;
+
+void shm_mm_free_by_key(int key) ;
+
+
+void *shm_mm_realloc (void *ptr, size_t size);
+
+int shm_mm_alloc_key();
+
+#endif
\ No newline at end of file
diff --git a/src/shm/shm_mm_wrapper.cpp b/src/shm/shm_mm_wrapper.cpp
index 59487c6..f726f8a 100644
--- a/src/shm/shm_mm_wrapper.cpp
+++ b/src/shm/shm_mm_wrapper.cpp
@@ -5,18 +5,48 @@
#include "shm_socket.h"
#define BUFFER_TIME 10
+
+
void shm_mm_wrapper_init(int size) {
- mem_pool_init(size);
+ shm_mm_init(size);
+
}
void shm_mm_wrapper_destroy() {
- mem_pool_destroy();
+ shm_mm_destroy();
}
int shm_mm_wrapper_alloc_key() {
return mm_alloc_key();
}
+
+/**
+ * 鍥炴敹鍋囧垹闄ょ殑key
+ */
+int shm_mm_wrapper_start_resycle() {
+ ShmQueueStMap * shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY);
+ hashtable_t *hashtable = mm_get_hashtable();
+ LockFreeQueue<shm_packet_t> *mqueue;
+ while(true) {
+ for(auto it = shmQueueStMap->begin(); it != shmQueueStMap->end(); ++it ) {
+ if(it->second.status = SHM_QUEUE_ST_CLOSED && difftime(time(NULL), it->second.closeTime) > 2 ) {
+ mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]);
+ if(mqueue != NULL) {
+ delete mqueue;
+ hashtable_remove(hashtable, it->first);
+ printf("reove queue %d\n", it->first);
+ // 涓嶈兘 erase ,鍚﹀垯浼氬嚭鐜板杩涚▼涔嬮棿鐨勫悓姝ラ棶棰橈紝 鑰岃繖姝f槸杩欓噷瑕佽В鍐崇殑闂
+ // it = shmQueueStMap->erase(it);
+ // continue;
+ }
+ }
+ }
+
+ sleep(1);
+ }
+}
+
//鍒犻櫎鍖呭惈鍦╧eys鍐呯殑queue
int shm_mm_wrapper_remove_keys(int keys[], int length) {
hashtable_t *hashtable = mm_get_hashtable();
diff --git a/src/shm/shm_mm_wrapper.h b/src/shm/shm_mm_wrapper.h
index 82e1b55..b39fdc3 100644
--- a/src/shm/shm_mm_wrapper.h
+++ b/src/shm/shm_mm_wrapper.h
@@ -5,8 +5,8 @@
*
*/
-#ifndef __SHM_MM_H__
-#define __SHM_MM_H__
+#ifndef __SHM_MM_WRAPPER_H__
+#define __SHM_MM_WRAPPER_H__
#ifdef __cplusplus
extern "C" {
diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index 6aa6a95..657941b 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -6,7 +6,7 @@
static Logger *logger = LoggerFactory::getLogger();
void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) {
- SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
+ SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
SHMKeySet *subscripter_set;
SHMKeySet::iterator set_iter;
SHMTopicSubMap::iterator map_iter;
@@ -29,7 +29,7 @@
int key;
for(int i = 0; i < length; i++) {
key = keys[i];
- SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
+ SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
SHMKeySet *subscripter_set;
SHMKeySet::iterator set_iter;
SHMTopicSubMap::iterator map_iter;
@@ -79,9 +79,9 @@
* 鍚姩bus
*
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
-*/
+ */
int BusServerSocket::start(){
- topic_sub_map = mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
+ topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
_run_proxy_();
return 0;
@@ -124,7 +124,7 @@
}
topic_sub_map->clear();
- mem_pool_free_by_key(SHM_BUS_MAP_KEY);
+ shm_mm_free_by_key(SHM_BUS_MAP_KEY);
}
shm_socket_close(shm_socket);
logger->debug("BusServerSocket destory 3");
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index 15d4072..466d0b5 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -3,15 +3,6 @@
static Logger *logger = LoggerFactory::getLogger();
-// size_t ShmModSocket::remove_keys(int keys[], size_t length) {
-// BusServerSocket::remove_subscripters(keys, length);
-// return shm_socket_remove_keys(keys, length);
-// }
-
-// size_t ShmModSocket::remove_keys_exclude(int keys[], size_t length) {
-// BusServerSocket::remove_subscripters(keys, length);
-// return shm_socket_remove_keys_exclude(keys, length);
-// }
ShmModSocket::ShmModSocket() {
shm_socket = shm_socket_open(SHM_SOCKET_DGRAM);
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 0d82be0..3366491 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -5,10 +5,11 @@
#include <cassert>
#include "bus_error.h"
#include "sole.h"
+#include "shm_mm.h"
static Logger *logger = LoggerFactory::getLogger();
-
+ShmQueueStMap * shmQueueStMap ;
static void print_msg(char *head, shm_packet_t &msg) {
// err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type);
@@ -101,6 +102,9 @@
if (s != 0)
err_exit(s, "pthread_mutexattr_destroy");
+
+ shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY);
+
return sockt;
}
@@ -109,10 +113,10 @@
int rv;
logger->debug("shm_socket_close\n");
- if(sockt->queue != NULL) {
- delete sockt->queue;
- sockt->queue = NULL;
- }
+ // if(sockt->queue != NULL) {
+ // delete sockt->queue;
+ // sockt->queue = NULL;
+ // }
rv = pthread_mutex_destroy(&(sockt->mutex) );
if(rv != 0) {
@@ -120,6 +124,12 @@
}
free(sockt);
+
+ auto it = shmQueueStMap.find(key);
+ if(it != shmQueueStMap.end()) {
+ it->second.status = SHM_QUEUE_ST_CLOSED
+ it->second.closeTime = time(NULL);
+ }
return 0;
}
@@ -523,6 +533,7 @@
const int key, const struct timespec *timeout, const int flag) {
int rv;
+ shm_queue_status_t stRecord;
hashtable_t *hashtable = mm_get_hashtable();
if( sockt->queue != NULL)
@@ -545,6 +556,12 @@
logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
return EBUS_KEY_INUSED;
}
+
+ // 鏍囪key瀵瑰簲鐨勭姸鎬� 锛屼负opened
+ stRecord.status = SHM_QUEUE_ST_OPENED;
+ stRecord.createTime = time(NULL);
+ shmQueueStMap.insert({sockt->key, stRecord});
+
}
if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
@@ -559,24 +576,34 @@
return EBUS_SENDTO_SELF;
}
- LockFreeQueue<shm_packet_t> *remoteQueue;
- if ((remoteQueue = shm_socket_attach_queue(key)) == NULL) {
- bus_errno = EBUS_CLOSED;
- logger->error("sendto key %d failed, %s", key, bus_strerror(bus_errno));
- return EBUS_CLOSED;
+ // 妫�鏌ey鏍囪鐨勭姸鎬�
+ auto it = shmQueueStMap.find(key);
+ if(it != shmQueueStMap.end()) {
+ if(it->second.status == SHM_QUEUE_ST_CLOSED) {
+ // key瀵瑰簲鐨勭姸鎬佹槸鍏抽棴鐨�
+ goto ERR_CLOSED;
+ }
}
-
+ LockFreeQueue<shm_packet_t> *remoteQueue = shm_socket_attach_queue(key);
+
+ if (remoteQueue == NULL ) {
+ goto ERR_CLOSED;
+ }
rv = remoteQueue->push(*sendpak, timeout, flag);
-
return rv;
+
+ERR_CLOSED:
+ logger->error("sendto key %d failed, %s", key, bus_strerror(EBUS_CLOSED));
+ return EBUS_CLOSED;
+
}
// 鐭繛鎺ユ柟寮忔帴鍙�
static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak , const struct timespec *timeout, int flag) {
int rv;
-
+ shm_queue_status_t stRecord;
hashtable_t *hashtable = mm_get_hashtable();
shm_packet_t recvpak;
@@ -601,6 +628,10 @@
return EBUS_KEY_INUSED;
}
+ // 鏍囪key瀵瑰簲鐨勭姸鎬� 锛屼负opened
+ stRecord.status = SHM_QUEUE_ST_OPENED;
+ stRecord.createTime = time(NULL);
+ shmQueueStMap.insert({sockt->key, stRecord});
if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
err_exit(rv, "shm_recvfrom : pthread_mutex_unlock");
@@ -609,8 +640,15 @@
LABEL_POP:
- //
- // printf("%p start recv.....\n", sockt);
+ // 妫�鏌ey鏍囪鐨勭姸鎬�
+ // auto shmQueueMapIter = shmQueueStMap.find(sockt->key);
+ // if(shmQueueMapIter != shmQueueStMap.end()) {
+ // stRecord = shmQueueMapIter->second;
+ // if(stRecord.status = SHM_QUEUE_ST_CLOSED) {
+ // // key瀵瑰簲鐨勭姸鎬佹槸鍏抽棴鐨�
+ // goto ERR_CLOSED;
+ // }
+ // }
rv = sockt->queue->pop(recvpak, timeout, flag);
if(rv != 0)
@@ -623,10 +661,4 @@
*_recvpak = recvpak;
return rv;
}
-// int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf,
-// const int send_size, const int send_key, void **recv_buf,
-// int *recv_size, const struct timespec *timeout, int flags) {
-
-// struct timespec tm = {10, 0};
-// return _shm_sendandrecv_thread_local(sockt, send_buf, send_size, send_key,recv_buf, recv_size, &tm, flags);
-// }
+
diff --git a/test_net_socket/test_net_mod_socket.cpp b/test_net_socket/test_net_mod_socket.cpp
index 05827a1..56115f8 100644
--- a/test_net_socket/test_net_mod_socket.cpp
+++ b/test_net_socket/test_net_mod_socket.cpp
@@ -75,6 +75,11 @@
}
}
+void start_resycle() {
+ shm_mm_wrapper_start_resycle();
+}
+
+
// 鎵撳嵃鎺ュ彈鍒扮殑璁㈤槄娑堟伅
void *print_sub_msg(void *sockt) {
pthread_detach(pthread_self());
@@ -602,6 +607,9 @@
test_net_pub(opt.publist);
}
+ else if (strcmp("start_resycle", opt.fun) == 0) {
+ start_resycle();
+ }
else {
usage(argv[0]);
--
Gitblit v1.8.0