From 14c345b38d57fd814f217eb8465963a08ca79f7e Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期五, 05 二月 2021 17:41:09 +0800
Subject: [PATCH] update

---
 src/socket/shm_mod_socket.h   |    1 
 src/shm/shm_mm_wrapper.h      |   13 +++
 src/queue/shm_queue.h         |   54 ++++++------
 src/socket/shm_socket.h       |    1 
 src/bus_error.h               |    1 
 src/shm/hashtable.h           |    9 ++
 src/shm/shm_mm_wrapper.cpp    |   53 +++++++++++++
 src/socket/shm_mod_socket.cpp |    5 +
 src/bus_error.cpp             |    3 
 src/shm/hashtable.cpp         |   33 ++++---
 src/socket/shm_socket.cpp     |   44 ++++++++++
 11 files changed, 173 insertions(+), 44 deletions(-)

diff --git a/src/bus_error.cpp b/src/bus_error.cpp
index a9348e0..5752cfd 100644
--- a/src/bus_error.cpp
+++ b/src/bus_error.cpp
@@ -19,7 +19,8 @@
   "Network fault",
   "Send to self error",
   "Receive from wrong end",
-  "Service stoped"
+  "Service stoped",
+  "Exceed resource limit"
 
 };
 
diff --git a/src/bus_error.h b/src/bus_error.h
index fa9f352..769aa36 100644
--- a/src/bus_error.h
+++ b/src/bus_error.h
@@ -12,6 +12,7 @@
 #define EBUS_SENDTO_SELF 505
 #define EBUS_RECVFROM_WRONG_END 506
 #define EBUS_STOPED 507
+#define EBUS_EXCEED_LIMIT 508
 
 extern int bus_errno;
 
diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h
index 3a7750e..24a4dfc 100644
--- a/src/queue/shm_queue.h
+++ b/src/queue/shm_queue.h
@@ -61,33 +61,33 @@
 };
 
 // @deprecate
-template <typename ELEM_T>
-size_t SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) {
-  hashtable_t *hashtable = mm_get_hashtable();
-  std::set<int> *keyset = hashtable_keyset(hashtable);
-  std::set<int>::iterator keyItr;
-  LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
-  bool found;
-  size_t count = 0;
-  for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
-    found = false;
-    for (size_t i = 0; i < length; i++) {
-      if (*keyItr == keys[i]) {
-        found = true;
-        break;
-      }
-    }
-    if (!found) {
-      // 閿�姣佸叡浜唴瀛樼殑queue
-      mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
-      delete mqueue;
-      hashtable_remove(hashtable, *keyItr);
-      count++;
-    }
-  }
-  delete keyset;
-  return count;
-}
+// template <typename ELEM_T>
+// size_t SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) {
+//   hashtable_t *hashtable = mm_get_hashtable();
+//   std::set<int> *keyset = hashtable_keyset(hashtable);
+//   std::set<int>::iterator keyItr;
+//   LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
+//   bool found;
+//   size_t count = 0;
+//   for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
+//     found = false;
+//     for (size_t i = 0; i < length; i++) {
+//       if (*keyItr == keys[i]) {
+//         found = true;
+//         break;
+//       }
+//     }
+//     if (!found && *keyItr > 100) {
+//       // 閿�姣佸叡浜唴瀛樼殑queue
+//       mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
+//       delete mqueue;
+//       hashtable_remove(hashtable, *keyItr);
+//       count++;
+//     }
+//   }
+//   delete keyset;
+//   return count;
+// }
 
 
 
diff --git a/src/shm/hashtable.cpp b/src/shm/hashtable.cpp
index 7a5faf4..e435172 100755
--- a/src/shm/hashtable.cpp
+++ b/src/shm/hashtable.cpp
@@ -5,6 +5,9 @@
 #include "logger_factory.h"
 #include <set>
 #include <functional>
+#include <limits.h>
+
+
 
 typedef struct tailq_entry_t
 {
@@ -29,22 +32,12 @@
 
   memset(hashtable, 0, sizeof(hashtable_t));
   hashtable->mutex = svsem_get(IPC_PRIVATE, 1);
-  // hashtable->wlock = svsem_get(IPC_PRIVATE, 1);
-  // hashtable->cond = svsem_get(IPC_PRIVATE, 1);
-  // hashtable->readcnt = 0;
-
-  // FILE * semfile = fopen("./sem.txt", "w+");
-  // if(semfile == NULL) {
-  //   err_exit(errno, "fopen");
-  // }
-  // fprintf(semfile, "hashtable->mutex=%d\n", hashtable->mutex);
-  // fclose(semfile);
+  hashtable->queueCount = 0;
+  hashtable->currentKey = START_KEY;
 }
 
 void hashtable_destroy(hashtable_t *hashtable) {
   svsem_remove( hashtable->mutex);
-  // svsem_remove( hashtable->wlock);
-  // svsem_remove( hashtable->cond);
 }
 
 
@@ -130,6 +123,7 @@
 
         /* mm_free the item as we don't need it anymore. */
         mm_free(item);
+        hashtable->queueCount--;
         svsem_post(hashtable->mutex);
         return oldvalue;
       }
@@ -150,6 +144,7 @@
 
 void hashtable_put(hashtable_t *hashtable, int key, void *value) {
   _hashtable_put(hashtable, key, value); 
+  hashtable->queueCount++;
 }
 
 bool hashtable_check_put(hashtable_t *hashtable, int key, void *value, bool overwrite) {
@@ -183,11 +178,18 @@
   return false;
 }
 
-
+int hashtable_get_queue_count(hashtable_t *hashtable) {
+  return hashtable->queueCount;
+}
 
 int hashtable_alloc_key(hashtable_t *hashtable) {
   int rv;
-  int key = START_KEY;
+  int key = hashtable->currentKey;
+ 
+  if( key == INT_MAX || key < START_KEY) {
+    key = START_KEY;
+  }
+
   rv = svsem_wait(hashtable->mutex);
   if(rv != 0) {
     LoggerFactory::getLogger()->error(errno, "hashtable_alloc_key\n");
@@ -199,10 +201,12 @@
   // 鍗犵敤key
   _hashtable_put(hashtable, key, (void *)1);
 
+  hashtable->currentKey = key;
   rv = svsem_post(hashtable->mutex);
   if(rv != 0) {
     LoggerFactory::getLogger()->error(errno, "hashtable_alloc_key\n");
   }
+
   return key;
 }
 
@@ -279,6 +283,7 @@
     hashtable->array[i] = NULL;
   }
 
+  hashtable->queueCount = 0;
   if((rv = svsem_post(hashtable->mutex)) != 0) {
     LoggerFactory::getLogger()->error(errno, "hashtable_removeall\n");
   }
diff --git a/src/shm/hashtable.h b/src/shm/hashtable.h
index 47b715b..c19e899 100755
--- a/src/shm/hashtable.h
+++ b/src/shm/hashtable.h
@@ -7,13 +7,20 @@
 
 #define MAPSIZE 1024
 
+// 鍒涘缓Queue鏁伴噺鐨勪笂闄�
+#define QUEUE_COUNT_LIMIT 300
+
 typedef struct hashtable_t
 {
  struct tailq_header_t* array[MAPSIZE];
  int mutex;
+ int queueCount;
+ int currentKey; // 褰撳墠鍒嗛厤鐨刱ey
  // int wlock;
  // int cond;
  // size_t readcnt;
+
+ 
 
 } hashtable_t;
 typedef void (*hashtable_foreach_cb)(int key, void *value);
@@ -29,6 +36,8 @@
 
 int hashtable_lock(hashtable_t *hashtable);
 int hashtable_unlock(hashtable_t *hashtable);
+
+int hashtable_get_queue_count(hashtable_t *hashtable) ;
 /** 
  * 閬嶅巻hash_table
  * @demo 
diff --git a/src/shm/shm_mm_wrapper.cpp b/src/shm/shm_mm_wrapper.cpp
index d62602f..92350c8 100644
--- a/src/shm/shm_mm_wrapper.cpp
+++ b/src/shm/shm_mm_wrapper.cpp
@@ -1,6 +1,8 @@
 #include "shm_mm_wrapper.h"
 #include "mem_pool.h"
 #include "hashtable.h"
+#include "lock_free_queue.h"
+#include "shm_socket.h"
 
 void shm_mm_wrapper_init(int size) {
 	mem_pool_init(size);
@@ -13,3 +15,54 @@
 int shm_mm_wrapper_alloc_key() {
 	return mm_alloc_key();
 }
+
+
+
+//鍒犻櫎鍖呭惈鍦╧eys鍐呯殑queue
+int shm_mm_wrapper_remove_keys(int keys[], int length) {
+  hashtable_t *hashtable = mm_get_hashtable();
+  LockFreeQueue<shm_packet_t> *mqueue;
+  int count = 0;
+  for(int i = 0; i< length; i++) {
+    // 閿�姣佸叡浜唴瀛樼殑queue
+    mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]);
+    if(mqueue == NULL) {
+    	continue;
+    }
+    delete mqueue;
+    hashtable_remove(hashtable, keys[i]);
+    count++;
+  }
+  return count;
+}
+
+ 
+// 鍒犻櫎涓嶅湪keys鍐呯殑queue 
+int shm_mm_wrapper_remove_keys_exclude(int keys[], int length) {
+  hashtable_t *hashtable = mm_get_hashtable();
+  std::set<int> *keyset = hashtable_keyset(hashtable);
+  std::set<int>::iterator keyItr;
+  LockFreeQueue<shm_packet_t> *mqueue;
+  bool found;
+  int count = 0;
+  for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
+    found = false;
+    for (size_t i = 0; i < length; i++) {
+      if (*keyItr == keys[i]) {
+        found = true;
+        break;
+      }
+    }
+    // 100鍐呯殑鏄痓us鍐呴儴鑷繁鐢ㄧ殑
+    if (!found && *keyItr > 100) {
+      // 閿�姣佸叡浜唴瀛樼殑queue
+      mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr);
+
+      delete mqueue;
+      hashtable_remove(hashtable, *keyItr);
+      count++;
+    }
+  }
+  delete keyset;
+  return count;
+}
diff --git a/src/shm/shm_mm_wrapper.h b/src/shm/shm_mm_wrapper.h
index 8ad3cf5..82e1b55 100644
--- a/src/shm/shm_mm_wrapper.h
+++ b/src/shm/shm_mm_wrapper.h
@@ -33,6 +33,19 @@
 int shm_mm_wrapper_alloc_key();
 
 
+/**
+ * @brief 鍒犻櫎鍖呭惈鍦╧eys鍐呯殑queue
+ * @return 鍒犻櫎鐨勪釜鏁�
+ */
+int shm_mm_wrapper_remove_keys(int keys[], int length);
+
+/**
+ * @brief 鍒犻櫎涓嶅湪keys鍐呯殑queue 
+ * @return 鍒犻櫎鐨勪釜鏁�
+ */
+int shm_mm_wrapper_remove_keys_exclude(int keys[], int length);
+
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index dc071cb..4898c81 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -8,6 +8,11 @@
 	return shm_socket_remove_keys(keys, length);
 }
 
+size_t ShmModSocket::remove_keys_exclude(int keys[], size_t length) {
+	BusServerSocket::remove_subscripters(keys, length);
+	return shm_socket_remove_keys_exclude(keys, length);
+}
+
 ShmModSocket::ShmModSocket() {
 	shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
 	bus_set = new std::set<int>;
diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h
index 44a714b..3795cfb 100644
--- a/src/socket/shm_mod_socket.h
+++ b/src/socket/shm_mod_socket.h
@@ -37,6 +37,7 @@
 
 public:
 	static size_t remove_keys(int keys[], size_t length);
+	static size_t remove_keys_exclude(int keys[], size_t length);
 
   // bus header 缂栫爜涓虹綉缁滀紶杈撶殑瀛楄妭
   static void * encode_bus_head(bus_head_t & bushead);
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index fc410bf..14274ca 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -65,7 +65,7 @@
   return queue;
 }
 
-
+//鍒犻櫎鍖呭惈鍦╧eys鍐呯殑queue
 size_t shm_socket_remove_keys(int keys[], size_t length) {
   hashtable_t *hashtable = mm_get_hashtable();
   LockFreeQueue<shm_packet_t> *mqueue;
@@ -79,6 +79,38 @@
   }
   return count;
 }
+
+ 
+// 鍒犻櫎涓嶅湪keys鍐呯殑queue 
+size_t shm_socket_remove_keys_exclude(int keys[], size_t length) {
+  hashtable_t *hashtable = mm_get_hashtable();
+  std::set<int> *keyset = hashtable_keyset(hashtable);
+  std::set<int>::iterator keyItr;
+  LockFreeQueue<shm_packet_t> *mqueue;
+  bool found;
+  size_t count = 0;
+  for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
+    found = false;
+    for (size_t i = 0; i < length; i++) {
+      if (*keyItr == keys[i]) {
+        found = true;
+        break;
+      }
+    }
+    // 100鍐呯殑鏄痓us鍐呴儴鑷繁鐢ㄧ殑
+    if (!found && *keyItr > 100) {
+      // 閿�姣佸叡浜唴瀛樼殑queue
+      mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr);
+      delete mqueue;
+      hashtable_remove(hashtable, *keyItr);
+      count++;
+    }
+  }
+  delete keyset;
+  return count;
+}
+
+
 
 shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
   int s, type;
@@ -493,11 +525,15 @@
 
   if( sockt->queue != NULL) 
     goto LABEL_PUSH;
+
+  if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) {
+    return EBUS_EXCEED_LIMIT;
+  }
  
   {
     if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
     err_exit(rv, "shm_sendto : pthread_mutex_lock");
-
+    
     if (sockt->queue == NULL) {
       if (sockt->key == 0) {
         sockt->key = hashtable_alloc_key(hashtable);
@@ -545,6 +581,10 @@
   if( sockt->queue != NULL) 
     goto LABEL_POP;
 
+  if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) {
+    return EBUS_EXCEED_LIMIT;
+  }
+
   {
     if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
       err_exit(rv, "shm_recvfrom : pthread_mutex_lock");
diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h
index e7c0dda..516a343 100644
--- a/src/socket/shm_socket.h
+++ b/src/socket/shm_socket.h
@@ -46,6 +46,7 @@
 typedef std::function<void(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void *user_data)> recvandsend_callback_fn;
 
 size_t shm_socket_remove_keys(int keys[], size_t length);
+size_t shm_socket_remove_keys_exclude(int keys[], size_t length);
 
 shm_socket_t *shm_open_socket(shm_socket_type_t socket_type);
 

--
Gitblit v1.8.0