From 14c345b38d57fd814f217eb8465963a08ca79f7e Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期五, 05 二月 2021 17:41:09 +0800
Subject: [PATCH] update
---
src/socket/shm_mod_socket.h | 1
src/shm/shm_mm_wrapper.h | 13 +++
src/queue/shm_queue.h | 54 ++++++------
src/socket/shm_socket.h | 1
src/bus_error.h | 1
src/shm/hashtable.h | 9 ++
src/shm/shm_mm_wrapper.cpp | 53 +++++++++++++
src/socket/shm_mod_socket.cpp | 5 +
src/bus_error.cpp | 3
src/shm/hashtable.cpp | 33 ++++---
src/socket/shm_socket.cpp | 44 ++++++++++
11 files changed, 173 insertions(+), 44 deletions(-)
diff --git a/src/bus_error.cpp b/src/bus_error.cpp
index a9348e0..5752cfd 100644
--- a/src/bus_error.cpp
+++ b/src/bus_error.cpp
@@ -19,7 +19,8 @@
"Network fault",
"Send to self error",
"Receive from wrong end",
- "Service stoped"
+ "Service stoped",
+ "Exceed resource limit"
};
diff --git a/src/bus_error.h b/src/bus_error.h
index fa9f352..769aa36 100644
--- a/src/bus_error.h
+++ b/src/bus_error.h
@@ -12,6 +12,7 @@
#define EBUS_SENDTO_SELF 505
#define EBUS_RECVFROM_WRONG_END 506
#define EBUS_STOPED 507
+#define EBUS_EXCEED_LIMIT 508
extern int bus_errno;
diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h
index 3a7750e..24a4dfc 100644
--- a/src/queue/shm_queue.h
+++ b/src/queue/shm_queue.h
@@ -61,33 +61,33 @@
};
// @deprecate
-template <typename ELEM_T>
-size_t SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) {
- hashtable_t *hashtable = mm_get_hashtable();
- std::set<int> *keyset = hashtable_keyset(hashtable);
- std::set<int>::iterator keyItr;
- LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
- bool found;
- size_t count = 0;
- for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
- found = false;
- for (size_t i = 0; i < length; i++) {
- if (*keyItr == keys[i]) {
- found = true;
- break;
- }
- }
- if (!found) {
- // 閿�姣佸叡浜唴瀛樼殑queue
- mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
- delete mqueue;
- hashtable_remove(hashtable, *keyItr);
- count++;
- }
- }
- delete keyset;
- return count;
-}
+// template <typename ELEM_T>
+// size_t SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) {
+// hashtable_t *hashtable = mm_get_hashtable();
+// std::set<int> *keyset = hashtable_keyset(hashtable);
+// std::set<int>::iterator keyItr;
+// LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
+// bool found;
+// size_t count = 0;
+// for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
+// found = false;
+// for (size_t i = 0; i < length; i++) {
+// if (*keyItr == keys[i]) {
+// found = true;
+// break;
+// }
+// }
+// if (!found && *keyItr > 100) {
+// // 閿�姣佸叡浜唴瀛樼殑queue
+// mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
+// delete mqueue;
+// hashtable_remove(hashtable, *keyItr);
+// count++;
+// }
+// }
+// delete keyset;
+// return count;
+// }
diff --git a/src/shm/hashtable.cpp b/src/shm/hashtable.cpp
index 7a5faf4..e435172 100755
--- a/src/shm/hashtable.cpp
+++ b/src/shm/hashtable.cpp
@@ -5,6 +5,9 @@
#include "logger_factory.h"
#include <set>
#include <functional>
+#include <limits.h>
+
+
typedef struct tailq_entry_t
{
@@ -29,22 +32,12 @@
memset(hashtable, 0, sizeof(hashtable_t));
hashtable->mutex = svsem_get(IPC_PRIVATE, 1);
- // hashtable->wlock = svsem_get(IPC_PRIVATE, 1);
- // hashtable->cond = svsem_get(IPC_PRIVATE, 1);
- // hashtable->readcnt = 0;
-
- // FILE * semfile = fopen("./sem.txt", "w+");
- // if(semfile == NULL) {
- // err_exit(errno, "fopen");
- // }
- // fprintf(semfile, "hashtable->mutex=%d\n", hashtable->mutex);
- // fclose(semfile);
+ hashtable->queueCount = 0;
+ hashtable->currentKey = START_KEY;
}
void hashtable_destroy(hashtable_t *hashtable) {
svsem_remove( hashtable->mutex);
- // svsem_remove( hashtable->wlock);
- // svsem_remove( hashtable->cond);
}
@@ -130,6 +123,7 @@
/* mm_free the item as we don't need it anymore. */
mm_free(item);
+ hashtable->queueCount--;
svsem_post(hashtable->mutex);
return oldvalue;
}
@@ -150,6 +144,7 @@
void hashtable_put(hashtable_t *hashtable, int key, void *value) {
_hashtable_put(hashtable, key, value);
+ hashtable->queueCount++;
}
bool hashtable_check_put(hashtable_t *hashtable, int key, void *value, bool overwrite) {
@@ -183,11 +178,18 @@
return false;
}
-
+int hashtable_get_queue_count(hashtable_t *hashtable) {
+ return hashtable->queueCount;
+}
int hashtable_alloc_key(hashtable_t *hashtable) {
int rv;
- int key = START_KEY;
+ int key = hashtable->currentKey;
+
+ if( key == INT_MAX || key < START_KEY) {
+ key = START_KEY;
+ }
+
rv = svsem_wait(hashtable->mutex);
if(rv != 0) {
LoggerFactory::getLogger()->error(errno, "hashtable_alloc_key\n");
@@ -199,10 +201,12 @@
// 鍗犵敤key
_hashtable_put(hashtable, key, (void *)1);
+ hashtable->currentKey = key;
rv = svsem_post(hashtable->mutex);
if(rv != 0) {
LoggerFactory::getLogger()->error(errno, "hashtable_alloc_key\n");
}
+
return key;
}
@@ -279,6 +283,7 @@
hashtable->array[i] = NULL;
}
+ hashtable->queueCount = 0;
if((rv = svsem_post(hashtable->mutex)) != 0) {
LoggerFactory::getLogger()->error(errno, "hashtable_removeall\n");
}
diff --git a/src/shm/hashtable.h b/src/shm/hashtable.h
index 47b715b..c19e899 100755
--- a/src/shm/hashtable.h
+++ b/src/shm/hashtable.h
@@ -7,13 +7,20 @@
#define MAPSIZE 1024
+// 鍒涘缓Queue鏁伴噺鐨勪笂闄�
+#define QUEUE_COUNT_LIMIT 300
+
typedef struct hashtable_t
{
struct tailq_header_t* array[MAPSIZE];
int mutex;
+ int queueCount;
+ int currentKey; // 褰撳墠鍒嗛厤鐨刱ey
// int wlock;
// int cond;
// size_t readcnt;
+
+
} hashtable_t;
typedef void (*hashtable_foreach_cb)(int key, void *value);
@@ -29,6 +36,8 @@
int hashtable_lock(hashtable_t *hashtable);
int hashtable_unlock(hashtable_t *hashtable);
+
+int hashtable_get_queue_count(hashtable_t *hashtable) ;
/**
* 閬嶅巻hash_table
* @demo
diff --git a/src/shm/shm_mm_wrapper.cpp b/src/shm/shm_mm_wrapper.cpp
index d62602f..92350c8 100644
--- a/src/shm/shm_mm_wrapper.cpp
+++ b/src/shm/shm_mm_wrapper.cpp
@@ -1,6 +1,8 @@
#include "shm_mm_wrapper.h"
#include "mem_pool.h"
#include "hashtable.h"
+#include "lock_free_queue.h"
+#include "shm_socket.h"
void shm_mm_wrapper_init(int size) {
mem_pool_init(size);
@@ -13,3 +15,54 @@
int shm_mm_wrapper_alloc_key() {
return mm_alloc_key();
}
+
+
+
+//鍒犻櫎鍖呭惈鍦╧eys鍐呯殑queue
+int shm_mm_wrapper_remove_keys(int keys[], int length) {
+ hashtable_t *hashtable = mm_get_hashtable();
+ LockFreeQueue<shm_packet_t> *mqueue;
+ int count = 0;
+ for(int i = 0; i< length; i++) {
+ // 閿�姣佸叡浜唴瀛樼殑queue
+ mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]);
+ if(mqueue == NULL) {
+ continue;
+ }
+ delete mqueue;
+ hashtable_remove(hashtable, keys[i]);
+ count++;
+ }
+ return count;
+}
+
+
+// 鍒犻櫎涓嶅湪keys鍐呯殑queue
+int shm_mm_wrapper_remove_keys_exclude(int keys[], int length) {
+ hashtable_t *hashtable = mm_get_hashtable();
+ std::set<int> *keyset = hashtable_keyset(hashtable);
+ std::set<int>::iterator keyItr;
+ LockFreeQueue<shm_packet_t> *mqueue;
+ bool found;
+ int count = 0;
+ for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
+ found = false;
+ for (size_t i = 0; i < length; i++) {
+ if (*keyItr == keys[i]) {
+ found = true;
+ break;
+ }
+ }
+ // 100鍐呯殑鏄痓us鍐呴儴鑷繁鐢ㄧ殑
+ if (!found && *keyItr > 100) {
+ // 閿�姣佸叡浜唴瀛樼殑queue
+ mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr);
+
+ delete mqueue;
+ hashtable_remove(hashtable, *keyItr);
+ count++;
+ }
+ }
+ delete keyset;
+ return count;
+}
diff --git a/src/shm/shm_mm_wrapper.h b/src/shm/shm_mm_wrapper.h
index 8ad3cf5..82e1b55 100644
--- a/src/shm/shm_mm_wrapper.h
+++ b/src/shm/shm_mm_wrapper.h
@@ -33,6 +33,19 @@
int shm_mm_wrapper_alloc_key();
+/**
+ * @brief 鍒犻櫎鍖呭惈鍦╧eys鍐呯殑queue
+ * @return 鍒犻櫎鐨勪釜鏁�
+ */
+int shm_mm_wrapper_remove_keys(int keys[], int length);
+
+/**
+ * @brief 鍒犻櫎涓嶅湪keys鍐呯殑queue
+ * @return 鍒犻櫎鐨勪釜鏁�
+ */
+int shm_mm_wrapper_remove_keys_exclude(int keys[], int length);
+
+
#ifdef __cplusplus
}
#endif
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index dc071cb..4898c81 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -8,6 +8,11 @@
return shm_socket_remove_keys(keys, length);
}
+size_t ShmModSocket::remove_keys_exclude(int keys[], size_t length) {
+ BusServerSocket::remove_subscripters(keys, length);
+ return shm_socket_remove_keys_exclude(keys, length);
+}
+
ShmModSocket::ShmModSocket() {
shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
bus_set = new std::set<int>;
diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h
index 44a714b..3795cfb 100644
--- a/src/socket/shm_mod_socket.h
+++ b/src/socket/shm_mod_socket.h
@@ -37,6 +37,7 @@
public:
static size_t remove_keys(int keys[], size_t length);
+ static size_t remove_keys_exclude(int keys[], size_t length);
// bus header 缂栫爜涓虹綉缁滀紶杈撶殑瀛楄妭
static void * encode_bus_head(bus_head_t & bushead);
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index fc410bf..14274ca 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -65,7 +65,7 @@
return queue;
}
-
+//鍒犻櫎鍖呭惈鍦╧eys鍐呯殑queue
size_t shm_socket_remove_keys(int keys[], size_t length) {
hashtable_t *hashtable = mm_get_hashtable();
LockFreeQueue<shm_packet_t> *mqueue;
@@ -79,6 +79,38 @@
}
return count;
}
+
+
+// 鍒犻櫎涓嶅湪keys鍐呯殑queue
+size_t shm_socket_remove_keys_exclude(int keys[], size_t length) {
+ hashtable_t *hashtable = mm_get_hashtable();
+ std::set<int> *keyset = hashtable_keyset(hashtable);
+ std::set<int>::iterator keyItr;
+ LockFreeQueue<shm_packet_t> *mqueue;
+ bool found;
+ size_t count = 0;
+ for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
+ found = false;
+ for (size_t i = 0; i < length; i++) {
+ if (*keyItr == keys[i]) {
+ found = true;
+ break;
+ }
+ }
+ // 100鍐呯殑鏄痓us鍐呴儴鑷繁鐢ㄧ殑
+ if (!found && *keyItr > 100) {
+ // 閿�姣佸叡浜唴瀛樼殑queue
+ mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr);
+ delete mqueue;
+ hashtable_remove(hashtable, *keyItr);
+ count++;
+ }
+ }
+ delete keyset;
+ return count;
+}
+
+
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
int s, type;
@@ -493,11 +525,15 @@
if( sockt->queue != NULL)
goto LABEL_PUSH;
+
+ if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) {
+ return EBUS_EXCEED_LIMIT;
+ }
{
if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
err_exit(rv, "shm_sendto : pthread_mutex_lock");
-
+
if (sockt->queue == NULL) {
if (sockt->key == 0) {
sockt->key = hashtable_alloc_key(hashtable);
@@ -545,6 +581,10 @@
if( sockt->queue != NULL)
goto LABEL_POP;
+ if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) {
+ return EBUS_EXCEED_LIMIT;
+ }
+
{
if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
err_exit(rv, "shm_recvfrom : pthread_mutex_lock");
diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h
index e7c0dda..516a343 100644
--- a/src/socket/shm_socket.h
+++ b/src/socket/shm_socket.h
@@ -46,6 +46,7 @@
typedef std::function<void(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void *user_data)> recvandsend_callback_fn;
size_t shm_socket_remove_keys(int keys[], size_t length);
+size_t shm_socket_remove_keys_exclude(int keys[], size_t length);
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type);
--
Gitblit v1.8.0