From e0aea3742aed09a0a9ed384ccd7db203b6efc650 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期六, 20 二月 2021 14:43:52 +0800 Subject: [PATCH] update --- src/key_def.h | 3 src/shm/shm_mm_wrapper.h | 4 src/queue/lock_free_queue.h | 36 ++++ test_net_socket/test_net_mod_socket.cpp | 8 + src/shm/shm_mm_wrapper.cpp | 34 ++++ src/socket/shm_socket.cpp | 76 +++++++--- /dev/null | 63 --------- src/shm/shm_mm.cpp | 60 ++++++++ src/queue/shm_queue.h | 31 ---- src/socket/bus_server_socket.cpp | 10 src/shm/shm_mm.h | 37 +++++ src/shm/shm_allocator.h | 4 src/socket/shm_mod_socket.cpp | 9 - 13 files changed, 235 insertions(+), 140 deletions(-) diff --git a/src/key_def.h b/src/key_def.h index 4f0e1c0..904b78f 100644 --- a/src/key_def.h +++ b/src/key_def.h @@ -2,6 +2,9 @@ #define _KEY_DEF_H_ #define SHM_BUS_MAP_KEY 1 + + +#define SHM_QUEUE_ST_KEY 3 // BUS key #define SHM_BUS_KEY 8 // 缃戠粶浠g悊key diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h index 425d9f8..d66ee8c 100644 --- a/src/queue/lock_free_queue.h +++ b/src/queue/lock_free_queue.h @@ -17,6 +17,11 @@ // default Queue size #define LOCK_FREE_Q_DEFAULT_SIZE 16 + +#define LOCK_FREE_Q_ST_OPENED 0 + +#define LOCK_FREE_Q_ST_CLOSED 1 + // static Logger *logger = LoggerFactory::getLogger(); // define this macro if calls to "size" must return the real size of the // queue. If it is undefined that function will try to take a snapshot of @@ -84,10 +89,10 @@ sem_t items; time_t createTime; + time_t closeTime; + int status; public: - // sem_t mutex; - LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); @@ -95,6 +100,8 @@ /// Note it is not virtual since it is not expected to inherit from this /// template ~LockFreeQueue(); + + inline void close(); // std::atomic_uint reference; /// @brief constructor of the class @@ -120,8 +127,18 @@ inline ELEM_T &operator[](unsigned i); + + time_t getCreateTime() { return createTime; + } + + time_t getCloseTime() { + return closeTime; + } + + int getStatus() { + return status; } /// @brief push an element at the tail of the queue @@ -166,7 +183,18 @@ err_exit(errno, "LockFreeQueue sem_init"); createTime = time(NULL); + status = LOCK_FREE_Q_ST_OPENED; +} + + +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +inline void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::close() { + status = LOCK_FREE_Q_ST_CLOSED; + closeTime = time(NULL); } @@ -182,9 +210,7 @@ if (sem_destroy(&items) == -1) { err_exit(errno, "LockFreeQueue sem_destroy"); } - // if (sem_destroy(&mutex) == -1) { - // err_exit(errno, "LockFreeQueue sem_destroy"); - // } + } template< diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h index 24a4dfc..7893485 100644 --- a/src/queue/shm_queue.h +++ b/src/queue/shm_queue.h @@ -45,8 +45,6 @@ ELEM_T &operator[](unsigned i); - // @deprecate - static size_t remove_queues_exclude(int keys[], size_t length); private: protected: @@ -60,34 +58,7 @@ SHMQueue<ELEM_T>(const SHMQueue<ELEM_T> &a_src); }; -// @deprecate -// template <typename ELEM_T> -// size_t SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) { -// hashtable_t *hashtable = mm_get_hashtable(); -// std::set<int> *keyset = hashtable_keyset(hashtable); -// std::set<int>::iterator keyItr; -// LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue; -// bool found; -// size_t count = 0; -// for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { -// found = false; -// for (size_t i = 0; i < length; i++) { -// if (*keyItr == keys[i]) { -// found = true; -// break; -// } -// } -// if (!found && *keyItr > 100) { -// // 閿�姣佸叡浜唴瀛樼殑queue -// mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr); -// delete mqueue; -// hashtable_remove(hashtable, *keyItr); -// count++; -// } -// } -// delete keyset; -// return count; -// } + diff --git a/src/shm/mem_pool.h b/src/shm/mem_pool.h deleted file mode 100644 index 5a698ec..0000000 --- a/src/shm/mem_pool.h +++ /dev/null @@ -1,63 +0,0 @@ -#ifndef _MEM_POOL_H_ -#define _MEM_POOL_H_ -#include "mm.h" -#include "sem_util.h" -#define MEM_POOL_COND_KEY 0x8801 - - -// static int mem_pool_mutex = SemUtil::get(MEM_POOL_COND_KEY, 1); - -static inline void mem_pool_init(size_t heap_size) { - mm_init(heap_size); -} - -static inline void mem_pool_destroy(void) { - mm_destroy(); - -} - -static inline void *mem_pool_malloc (size_t size) { - return mm_malloc(size); -} - - -static inline void mem_pool_free (void *ptr) { - mm_free(ptr); -} - - -template <typename T> -static inline T* mem_pool_attach(int key) { - void *ptr; - // T* tptr; - hashtable_t *hashtable = mm_get_hashtable(); - ptr = hashtable_get(hashtable, key); -// printf("mem_pool_malloc_by_key malloc before %d, %p\n", key, ptr); - if(ptr == NULL || ptr == (void *)1 ) { - ptr = mm_malloc(sizeof(T)); - hashtable_put(hashtable, key, ptr); - new(ptr) T; -// printf("mem_pool_malloc_by_key use new %d, %p\n", key, ptr); - } - return (T*)ptr; -} - -static inline void mem_pool_free_by_key(int key) { - return mm_free_by_key(key); -} - - -static inline void *mem_pool_realloc (void *ptr, size_t size) { - return mm_realloc(ptr, size); -} - -static inline int mem_pool_alloc_key() { - - return mm_alloc_key(); -} - - -// extern int mm_checkheap(int verbose); - - -#endif \ No newline at end of file diff --git a/src/shm/shm_allocator.h b/src/shm/shm_allocator.h index 084a678..d14708f 100644 --- a/src/shm/shm_allocator.h +++ b/src/shm/shm_allocator.h @@ -67,12 +67,12 @@ public: static void *allocate (size_t size) { return mm_malloc(size); - // return mem_pool_malloc(size); + // return shm_mm_malloc(size); } static void deallocate (void *ptr) { return mm_free(ptr); - // return mem_pool_free(ptr); + // return shm_mm_free(ptr); } }; diff --git a/src/shm/shm_mm.cpp b/src/shm/shm_mm.cpp new file mode 100644 index 0000000..6341086 --- /dev/null +++ b/src/shm/shm_mm.cpp @@ -0,0 +1,60 @@ +#include "shm_mm.h" +#include "mm.h" +#include "sem_util.h" + + +void shm_mm_init(size_t heap_size) { + mm_init(heap_size); + shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY); +} + +void shm_mm_destroy(void) { + mm_destroy(); + +} + +void *shm_mm_malloc (size_t size) { + return mm_malloc(size); +} + + +void shm_mm_free (void *ptr) { + mm_free(ptr); +} + + +template <typename T> + T* shm_mm_attach(int key) { + void *ptr; + // T* tptr; + hashtable_t *hashtable = mm_get_hashtable(); + ptr = hashtable_get(hashtable, key); +// printf("shm_mm_malloc_by_key malloc before %d, %p\n", key, ptr); + if(ptr == NULL || ptr == (void *)1 ) { + ptr = mm_malloc(sizeof(T)); + hashtable_put(hashtable, key, ptr); + new(ptr) T; +// printf("shm_mm_malloc_by_key use new %d, %p\n", key, ptr); + } + return (T*)ptr; +} + +void shm_mm_free_by_key(int key) { + return mm_free_by_key(key); +} + + +void *shm_mm_realloc (void *ptr, size_t size) { + return mm_realloc(ptr, size); +} + +int shm_mm_alloc_key() { + + return mm_alloc_key(); +} + + +// extern int mm_checkheap(int verbose); + + +#endif \ No newline at end of file diff --git a/src/shm/shm_mm.h b/src/shm/shm_mm.h new file mode 100644 index 0000000..db1bea9 --- /dev/null +++ b/src/shm/shm_mm.h @@ -0,0 +1,37 @@ +#ifndef __SHM_MM_H__ +#define __SHM_MM_H__ + +#define SHM_QUEUE_ST_OPENED 0 + +#define SHM_QUEUE_ST_CLOSED 1 + +struct shm_queue_status_t { + + int status; + time_t createTime; + time_t closeTime; +}; + +typedef std::map<int, shm_queue_status_t, std::less<int>, SHM_STL_Allocator<std::pair<const int, shm_queue_status_t> > > ShmQueueStMap; + + +void shm_mm_init(size_t heap_size) ; + +void shm_mm_destroy(void) ; + +void *shm_mm_malloc (size_t size); + +void shm_mm_free (void *ptr); + + +template <typename T> +T* shm_mm_attach(int key) ; + +void shm_mm_free_by_key(int key) ; + + +void *shm_mm_realloc (void *ptr, size_t size); + +int shm_mm_alloc_key(); + +#endif \ No newline at end of file diff --git a/src/shm/shm_mm_wrapper.cpp b/src/shm/shm_mm_wrapper.cpp index 59487c6..f726f8a 100644 --- a/src/shm/shm_mm_wrapper.cpp +++ b/src/shm/shm_mm_wrapper.cpp @@ -5,18 +5,48 @@ #include "shm_socket.h" #define BUFFER_TIME 10 + + void shm_mm_wrapper_init(int size) { - mem_pool_init(size); + shm_mm_init(size); + } void shm_mm_wrapper_destroy() { - mem_pool_destroy(); + shm_mm_destroy(); } int shm_mm_wrapper_alloc_key() { return mm_alloc_key(); } + +/** + * 鍥炴敹鍋囧垹闄ょ殑key + */ +int shm_mm_wrapper_start_resycle() { + ShmQueueStMap * shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY); + hashtable_t *hashtable = mm_get_hashtable(); + LockFreeQueue<shm_packet_t> *mqueue; + while(true) { + for(auto it = shmQueueStMap->begin(); it != shmQueueStMap->end(); ++it ) { + if(it->second.status = SHM_QUEUE_ST_CLOSED && difftime(time(NULL), it->second.closeTime) > 2 ) { + mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]); + if(mqueue != NULL) { + delete mqueue; + hashtable_remove(hashtable, it->first); + printf("reove queue %d\n", it->first); + // 涓嶈兘 erase ,鍚﹀垯浼氬嚭鐜板杩涚▼涔嬮棿鐨勫悓姝ラ棶棰橈紝 鑰岃繖姝f槸杩欓噷瑕佽В鍐崇殑闂 + // it = shmQueueStMap->erase(it); + // continue; + } + } + } + + sleep(1); + } +} + //鍒犻櫎鍖呭惈鍦╧eys鍐呯殑queue int shm_mm_wrapper_remove_keys(int keys[], int length) { hashtable_t *hashtable = mm_get_hashtable(); diff --git a/src/shm/shm_mm_wrapper.h b/src/shm/shm_mm_wrapper.h index 82e1b55..b39fdc3 100644 --- a/src/shm/shm_mm_wrapper.h +++ b/src/shm/shm_mm_wrapper.h @@ -5,8 +5,8 @@ * */ -#ifndef __SHM_MM_H__ -#define __SHM_MM_H__ +#ifndef __SHM_MM_WRAPPER_H__ +#define __SHM_MM_WRAPPER_H__ #ifdef __cplusplus extern "C" { diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp index 6aa6a95..657941b 100644 --- a/src/socket/bus_server_socket.cpp +++ b/src/socket/bus_server_socket.cpp @@ -6,7 +6,7 @@ static Logger *logger = LoggerFactory::getLogger(); void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) { - SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); + SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); SHMKeySet *subscripter_set; SHMKeySet::iterator set_iter; SHMTopicSubMap::iterator map_iter; @@ -29,7 +29,7 @@ int key; for(int i = 0; i < length; i++) { key = keys[i]; - SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); + SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); SHMKeySet *subscripter_set; SHMKeySet::iterator set_iter; SHMTopicSubMap::iterator map_iter; @@ -79,9 +79,9 @@ * 鍚姩bus * * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ + */ int BusServerSocket::start(){ - topic_sub_map = mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); + topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); _run_proxy_(); return 0; @@ -124,7 +124,7 @@ } topic_sub_map->clear(); - mem_pool_free_by_key(SHM_BUS_MAP_KEY); + shm_mm_free_by_key(SHM_BUS_MAP_KEY); } shm_socket_close(shm_socket); logger->debug("BusServerSocket destory 3"); diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp index 15d4072..466d0b5 100644 --- a/src/socket/shm_mod_socket.cpp +++ b/src/socket/shm_mod_socket.cpp @@ -3,15 +3,6 @@ static Logger *logger = LoggerFactory::getLogger(); -// size_t ShmModSocket::remove_keys(int keys[], size_t length) { -// BusServerSocket::remove_subscripters(keys, length); -// return shm_socket_remove_keys(keys, length); -// } - -// size_t ShmModSocket::remove_keys_exclude(int keys[], size_t length) { -// BusServerSocket::remove_subscripters(keys, length); -// return shm_socket_remove_keys_exclude(keys, length); -// } ShmModSocket::ShmModSocket() { shm_socket = shm_socket_open(SHM_SOCKET_DGRAM); diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp index 0d82be0..3366491 100644 --- a/src/socket/shm_socket.cpp +++ b/src/socket/shm_socket.cpp @@ -5,10 +5,11 @@ #include <cassert> #include "bus_error.h" #include "sole.h" +#include "shm_mm.h" static Logger *logger = LoggerFactory::getLogger(); - +ShmQueueStMap * shmQueueStMap ; static void print_msg(char *head, shm_packet_t &msg) { // err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type); @@ -101,6 +102,9 @@ if (s != 0) err_exit(s, "pthread_mutexattr_destroy"); + + shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY); + return sockt; } @@ -109,10 +113,10 @@ int rv; logger->debug("shm_socket_close\n"); - if(sockt->queue != NULL) { - delete sockt->queue; - sockt->queue = NULL; - } + // if(sockt->queue != NULL) { + // delete sockt->queue; + // sockt->queue = NULL; + // } rv = pthread_mutex_destroy(&(sockt->mutex) ); if(rv != 0) { @@ -120,6 +124,12 @@ } free(sockt); + + auto it = shmQueueStMap.find(key); + if(it != shmQueueStMap.end()) { + it->second.status = SHM_QUEUE_ST_CLOSED + it->second.closeTime = time(NULL); + } return 0; } @@ -523,6 +533,7 @@ const int key, const struct timespec *timeout, const int flag) { int rv; + shm_queue_status_t stRecord; hashtable_t *hashtable = mm_get_hashtable(); if( sockt->queue != NULL) @@ -545,6 +556,12 @@ logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); return EBUS_KEY_INUSED; } + + // 鏍囪key瀵瑰簲鐨勭姸鎬� 锛屼负opened + stRecord.status = SHM_QUEUE_ST_OPENED; + stRecord.createTime = time(NULL); + shmQueueStMap.insert({sockt->key, stRecord}); + } if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) @@ -559,24 +576,34 @@ return EBUS_SENDTO_SELF; } - LockFreeQueue<shm_packet_t> *remoteQueue; - if ((remoteQueue = shm_socket_attach_queue(key)) == NULL) { - bus_errno = EBUS_CLOSED; - logger->error("sendto key %d failed, %s", key, bus_strerror(bus_errno)); - return EBUS_CLOSED; + // 妫�鏌ey鏍囪鐨勭姸鎬� + auto it = shmQueueStMap.find(key); + if(it != shmQueueStMap.end()) { + if(it->second.status == SHM_QUEUE_ST_CLOSED) { + // key瀵瑰簲鐨勭姸鎬佹槸鍏抽棴鐨� + goto ERR_CLOSED; + } } - + LockFreeQueue<shm_packet_t> *remoteQueue = shm_socket_attach_queue(key); + + if (remoteQueue == NULL ) { + goto ERR_CLOSED; + } rv = remoteQueue->push(*sendpak, timeout, flag); - return rv; + +ERR_CLOSED: + logger->error("sendto key %d failed, %s", key, bus_strerror(EBUS_CLOSED)); + return EBUS_CLOSED; + } // 鐭繛鎺ユ柟寮忔帴鍙� static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak , const struct timespec *timeout, int flag) { int rv; - + shm_queue_status_t stRecord; hashtable_t *hashtable = mm_get_hashtable(); shm_packet_t recvpak; @@ -601,6 +628,10 @@ return EBUS_KEY_INUSED; } + // 鏍囪key瀵瑰簲鐨勭姸鎬� 锛屼负opened + stRecord.status = SHM_QUEUE_ST_OPENED; + stRecord.createTime = time(NULL); + shmQueueStMap.insert({sockt->key, stRecord}); if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); @@ -609,8 +640,15 @@ LABEL_POP: - // - // printf("%p start recv.....\n", sockt); + // 妫�鏌ey鏍囪鐨勭姸鎬� + // auto shmQueueMapIter = shmQueueStMap.find(sockt->key); + // if(shmQueueMapIter != shmQueueStMap.end()) { + // stRecord = shmQueueMapIter->second; + // if(stRecord.status = SHM_QUEUE_ST_CLOSED) { + // // key瀵瑰簲鐨勭姸鎬佹槸鍏抽棴鐨� + // goto ERR_CLOSED; + // } + // } rv = sockt->queue->pop(recvpak, timeout, flag); if(rv != 0) @@ -623,10 +661,4 @@ *_recvpak = recvpak; return rv; } -// int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf, -// const int send_size, const int send_key, void **recv_buf, -// int *recv_size, const struct timespec *timeout, int flags) { - -// struct timespec tm = {10, 0}; -// return _shm_sendandrecv_thread_local(sockt, send_buf, send_size, send_key,recv_buf, recv_size, &tm, flags); -// } + diff --git a/test_net_socket/test_net_mod_socket.cpp b/test_net_socket/test_net_mod_socket.cpp index 05827a1..56115f8 100644 --- a/test_net_socket/test_net_mod_socket.cpp +++ b/test_net_socket/test_net_mod_socket.cpp @@ -75,6 +75,11 @@ } } +void start_resycle() { + shm_mm_wrapper_start_resycle(); +} + + // 鎵撳嵃鎺ュ彈鍒扮殑璁㈤槄娑堟伅 void *print_sub_msg(void *sockt) { pthread_detach(pthread_self()); @@ -602,6 +607,9 @@ test_net_pub(opt.publist); } + else if (strcmp("start_resycle", opt.fun) == 0) { + start_resycle(); + } else { usage(argv[0]); -- Gitblit v1.8.0