From 14c345b38d57fd814f217eb8465963a08ca79f7e Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期五, 05 二月 2021 17:41:09 +0800 Subject: [PATCH] update --- src/socket/shm_mod_socket.h | 1 src/shm/shm_mm_wrapper.h | 13 +++ src/queue/shm_queue.h | 54 ++++++------ src/socket/shm_socket.h | 1 src/bus_error.h | 1 src/shm/hashtable.h | 9 ++ src/shm/shm_mm_wrapper.cpp | 53 +++++++++++++ src/socket/shm_mod_socket.cpp | 5 + src/bus_error.cpp | 3 src/shm/hashtable.cpp | 33 ++++--- src/socket/shm_socket.cpp | 44 ++++++++++ 11 files changed, 173 insertions(+), 44 deletions(-) diff --git a/src/bus_error.cpp b/src/bus_error.cpp index a9348e0..5752cfd 100644 --- a/src/bus_error.cpp +++ b/src/bus_error.cpp @@ -19,7 +19,8 @@ "Network fault", "Send to self error", "Receive from wrong end", - "Service stoped" + "Service stoped", + "Exceed resource limit" }; diff --git a/src/bus_error.h b/src/bus_error.h index fa9f352..769aa36 100644 --- a/src/bus_error.h +++ b/src/bus_error.h @@ -12,6 +12,7 @@ #define EBUS_SENDTO_SELF 505 #define EBUS_RECVFROM_WRONG_END 506 #define EBUS_STOPED 507 +#define EBUS_EXCEED_LIMIT 508 extern int bus_errno; diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h index 3a7750e..24a4dfc 100644 --- a/src/queue/shm_queue.h +++ b/src/queue/shm_queue.h @@ -61,33 +61,33 @@ }; // @deprecate -template <typename ELEM_T> -size_t SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) { - hashtable_t *hashtable = mm_get_hashtable(); - std::set<int> *keyset = hashtable_keyset(hashtable); - std::set<int>::iterator keyItr; - LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue; - bool found; - size_t count = 0; - for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { - found = false; - for (size_t i = 0; i < length; i++) { - if (*keyItr == keys[i]) { - found = true; - break; - } - } - if (!found) { - // 閿�姣佸叡浜唴瀛樼殑queue - mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr); - delete mqueue; - hashtable_remove(hashtable, *keyItr); - count++; - } - } - delete keyset; - return count; -} +// template <typename ELEM_T> +// size_t SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) { +// hashtable_t *hashtable = mm_get_hashtable(); +// std::set<int> *keyset = hashtable_keyset(hashtable); +// std::set<int>::iterator keyItr; +// LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue; +// bool found; +// size_t count = 0; +// for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { +// found = false; +// for (size_t i = 0; i < length; i++) { +// if (*keyItr == keys[i]) { +// found = true; +// break; +// } +// } +// if (!found && *keyItr > 100) { +// // 閿�姣佸叡浜唴瀛樼殑queue +// mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr); +// delete mqueue; +// hashtable_remove(hashtable, *keyItr); +// count++; +// } +// } +// delete keyset; +// return count; +// } diff --git a/src/shm/hashtable.cpp b/src/shm/hashtable.cpp index 7a5faf4..e435172 100755 --- a/src/shm/hashtable.cpp +++ b/src/shm/hashtable.cpp @@ -5,6 +5,9 @@ #include "logger_factory.h" #include <set> #include <functional> +#include <limits.h> + + typedef struct tailq_entry_t { @@ -29,22 +32,12 @@ memset(hashtable, 0, sizeof(hashtable_t)); hashtable->mutex = svsem_get(IPC_PRIVATE, 1); - // hashtable->wlock = svsem_get(IPC_PRIVATE, 1); - // hashtable->cond = svsem_get(IPC_PRIVATE, 1); - // hashtable->readcnt = 0; - - // FILE * semfile = fopen("./sem.txt", "w+"); - // if(semfile == NULL) { - // err_exit(errno, "fopen"); - // } - // fprintf(semfile, "hashtable->mutex=%d\n", hashtable->mutex); - // fclose(semfile); + hashtable->queueCount = 0; + hashtable->currentKey = START_KEY; } void hashtable_destroy(hashtable_t *hashtable) { svsem_remove( hashtable->mutex); - // svsem_remove( hashtable->wlock); - // svsem_remove( hashtable->cond); } @@ -130,6 +123,7 @@ /* mm_free the item as we don't need it anymore. */ mm_free(item); + hashtable->queueCount--; svsem_post(hashtable->mutex); return oldvalue; } @@ -150,6 +144,7 @@ void hashtable_put(hashtable_t *hashtable, int key, void *value) { _hashtable_put(hashtable, key, value); + hashtable->queueCount++; } bool hashtable_check_put(hashtable_t *hashtable, int key, void *value, bool overwrite) { @@ -183,11 +178,18 @@ return false; } - +int hashtable_get_queue_count(hashtable_t *hashtable) { + return hashtable->queueCount; +} int hashtable_alloc_key(hashtable_t *hashtable) { int rv; - int key = START_KEY; + int key = hashtable->currentKey; + + if( key == INT_MAX || key < START_KEY) { + key = START_KEY; + } + rv = svsem_wait(hashtable->mutex); if(rv != 0) { LoggerFactory::getLogger()->error(errno, "hashtable_alloc_key\n"); @@ -199,10 +201,12 @@ // 鍗犵敤key _hashtable_put(hashtable, key, (void *)1); + hashtable->currentKey = key; rv = svsem_post(hashtable->mutex); if(rv != 0) { LoggerFactory::getLogger()->error(errno, "hashtable_alloc_key\n"); } + return key; } @@ -279,6 +283,7 @@ hashtable->array[i] = NULL; } + hashtable->queueCount = 0; if((rv = svsem_post(hashtable->mutex)) != 0) { LoggerFactory::getLogger()->error(errno, "hashtable_removeall\n"); } diff --git a/src/shm/hashtable.h b/src/shm/hashtable.h index 47b715b..c19e899 100755 --- a/src/shm/hashtable.h +++ b/src/shm/hashtable.h @@ -7,13 +7,20 @@ #define MAPSIZE 1024 +// 鍒涘缓Queue鏁伴噺鐨勪笂闄� +#define QUEUE_COUNT_LIMIT 300 + typedef struct hashtable_t { struct tailq_header_t* array[MAPSIZE]; int mutex; + int queueCount; + int currentKey; // 褰撳墠鍒嗛厤鐨刱ey // int wlock; // int cond; // size_t readcnt; + + } hashtable_t; typedef void (*hashtable_foreach_cb)(int key, void *value); @@ -29,6 +36,8 @@ int hashtable_lock(hashtable_t *hashtable); int hashtable_unlock(hashtable_t *hashtable); + +int hashtable_get_queue_count(hashtable_t *hashtable) ; /** * 閬嶅巻hash_table * @demo diff --git a/src/shm/shm_mm_wrapper.cpp b/src/shm/shm_mm_wrapper.cpp index d62602f..92350c8 100644 --- a/src/shm/shm_mm_wrapper.cpp +++ b/src/shm/shm_mm_wrapper.cpp @@ -1,6 +1,8 @@ #include "shm_mm_wrapper.h" #include "mem_pool.h" #include "hashtable.h" +#include "lock_free_queue.h" +#include "shm_socket.h" void shm_mm_wrapper_init(int size) { mem_pool_init(size); @@ -13,3 +15,54 @@ int shm_mm_wrapper_alloc_key() { return mm_alloc_key(); } + + + +//鍒犻櫎鍖呭惈鍦╧eys鍐呯殑queue +int shm_mm_wrapper_remove_keys(int keys[], int length) { + hashtable_t *hashtable = mm_get_hashtable(); + LockFreeQueue<shm_packet_t> *mqueue; + int count = 0; + for(int i = 0; i< length; i++) { + // 閿�姣佸叡浜唴瀛樼殑queue + mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]); + if(mqueue == NULL) { + continue; + } + delete mqueue; + hashtable_remove(hashtable, keys[i]); + count++; + } + return count; +} + + +// 鍒犻櫎涓嶅湪keys鍐呯殑queue +int shm_mm_wrapper_remove_keys_exclude(int keys[], int length) { + hashtable_t *hashtable = mm_get_hashtable(); + std::set<int> *keyset = hashtable_keyset(hashtable); + std::set<int>::iterator keyItr; + LockFreeQueue<shm_packet_t> *mqueue; + bool found; + int count = 0; + for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { + found = false; + for (size_t i = 0; i < length; i++) { + if (*keyItr == keys[i]) { + found = true; + break; + } + } + // 100鍐呯殑鏄痓us鍐呴儴鑷繁鐢ㄧ殑 + if (!found && *keyItr > 100) { + // 閿�姣佸叡浜唴瀛樼殑queue + mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr); + + delete mqueue; + hashtable_remove(hashtable, *keyItr); + count++; + } + } + delete keyset; + return count; +} diff --git a/src/shm/shm_mm_wrapper.h b/src/shm/shm_mm_wrapper.h index 8ad3cf5..82e1b55 100644 --- a/src/shm/shm_mm_wrapper.h +++ b/src/shm/shm_mm_wrapper.h @@ -33,6 +33,19 @@ int shm_mm_wrapper_alloc_key(); +/** + * @brief 鍒犻櫎鍖呭惈鍦╧eys鍐呯殑queue + * @return 鍒犻櫎鐨勪釜鏁� + */ +int shm_mm_wrapper_remove_keys(int keys[], int length); + +/** + * @brief 鍒犻櫎涓嶅湪keys鍐呯殑queue + * @return 鍒犻櫎鐨勪釜鏁� + */ +int shm_mm_wrapper_remove_keys_exclude(int keys[], int length); + + #ifdef __cplusplus } #endif diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp index dc071cb..4898c81 100644 --- a/src/socket/shm_mod_socket.cpp +++ b/src/socket/shm_mod_socket.cpp @@ -8,6 +8,11 @@ return shm_socket_remove_keys(keys, length); } +size_t ShmModSocket::remove_keys_exclude(int keys[], size_t length) { + BusServerSocket::remove_subscripters(keys, length); + return shm_socket_remove_keys_exclude(keys, length); +} + ShmModSocket::ShmModSocket() { shm_socket = shm_open_socket(SHM_SOCKET_DGRAM); bus_set = new std::set<int>; diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h index 44a714b..3795cfb 100644 --- a/src/socket/shm_mod_socket.h +++ b/src/socket/shm_mod_socket.h @@ -37,6 +37,7 @@ public: static size_t remove_keys(int keys[], size_t length); + static size_t remove_keys_exclude(int keys[], size_t length); // bus header 缂栫爜涓虹綉缁滀紶杈撶殑瀛楄妭 static void * encode_bus_head(bus_head_t & bushead); diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp index fc410bf..14274ca 100644 --- a/src/socket/shm_socket.cpp +++ b/src/socket/shm_socket.cpp @@ -65,7 +65,7 @@ return queue; } - +//鍒犻櫎鍖呭惈鍦╧eys鍐呯殑queue size_t shm_socket_remove_keys(int keys[], size_t length) { hashtable_t *hashtable = mm_get_hashtable(); LockFreeQueue<shm_packet_t> *mqueue; @@ -79,6 +79,38 @@ } return count; } + + +// 鍒犻櫎涓嶅湪keys鍐呯殑queue +size_t shm_socket_remove_keys_exclude(int keys[], size_t length) { + hashtable_t *hashtable = mm_get_hashtable(); + std::set<int> *keyset = hashtable_keyset(hashtable); + std::set<int>::iterator keyItr; + LockFreeQueue<shm_packet_t> *mqueue; + bool found; + size_t count = 0; + for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { + found = false; + for (size_t i = 0; i < length; i++) { + if (*keyItr == keys[i]) { + found = true; + break; + } + } + // 100鍐呯殑鏄痓us鍐呴儴鑷繁鐢ㄧ殑 + if (!found && *keyItr > 100) { + // 閿�姣佸叡浜唴瀛樼殑queue + mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr); + delete mqueue; + hashtable_remove(hashtable, *keyItr); + count++; + } + } + delete keyset; + return count; +} + + shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) { int s, type; @@ -493,11 +525,15 @@ if( sockt->queue != NULL) goto LABEL_PUSH; + + if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) { + return EBUS_EXCEED_LIMIT; + } { if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0) err_exit(rv, "shm_sendto : pthread_mutex_lock"); - + if (sockt->queue == NULL) { if (sockt->key == 0) { sockt->key = hashtable_alloc_key(hashtable); @@ -545,6 +581,10 @@ if( sockt->queue != NULL) goto LABEL_POP; + if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) { + return EBUS_EXCEED_LIMIT; + } + { if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0) err_exit(rv, "shm_recvfrom : pthread_mutex_lock"); diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h index e7c0dda..516a343 100644 --- a/src/socket/shm_socket.h +++ b/src/socket/shm_socket.h @@ -46,6 +46,7 @@ typedef std::function<void(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void *user_data)> recvandsend_callback_fn; size_t shm_socket_remove_keys(int keys[], size_t length); +size_t shm_socket_remove_keys_exclude(int keys[], size_t length); shm_socket_t *shm_open_socket(shm_socket_type_t socket_type); -- Gitblit v1.8.0