From e0aea3742aed09a0a9ed384ccd7db203b6efc650 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期六, 20 二月 2021 14:43:52 +0800
Subject: [PATCH] update

---
 src/key_def.h                           |    3 
 src/shm/shm_mm_wrapper.h                |    4 
 src/queue/lock_free_queue.h             |   36 ++++
 test_net_socket/test_net_mod_socket.cpp |    8 +
 src/shm/shm_mm_wrapper.cpp              |   34 ++++
 src/socket/shm_socket.cpp               |   76 +++++++---
 /dev/null                               |   63 ---------
 src/shm/shm_mm.cpp                      |   60 ++++++++
 src/queue/shm_queue.h                   |   31 ----
 src/socket/bus_server_socket.cpp        |   10 
 src/shm/shm_mm.h                        |   37 +++++
 src/shm/shm_allocator.h                 |    4 
 src/socket/shm_mod_socket.cpp           |    9 -
 13 files changed, 235 insertions(+), 140 deletions(-)

diff --git a/src/key_def.h b/src/key_def.h
index 4f0e1c0..904b78f 100644
--- a/src/key_def.h
+++ b/src/key_def.h
@@ -2,6 +2,9 @@
 #define _KEY_DEF_H_
 
 #define SHM_BUS_MAP_KEY 1  
+
+
+#define SHM_QUEUE_ST_KEY 3
 // BUS key
 #define SHM_BUS_KEY 8
 // 缃戠粶浠g悊key
diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index 425d9f8..d66ee8c 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -17,6 +17,11 @@
 // default Queue size
 #define LOCK_FREE_Q_DEFAULT_SIZE 16
 
+
+#define LOCK_FREE_Q_ST_OPENED 0
+
+#define LOCK_FREE_Q_ST_CLOSED 1
+
 // static Logger *logger = LoggerFactory::getLogger();
 // define this macro if calls to "size" must return the real size of the
 // queue. If it is undefined  that function will try to take a snapshot of
@@ -84,10 +89,10 @@
   sem_t items;
 
   time_t createTime;
+  time_t closeTime;
+  int status;
 
 public:
-  // sem_t mutex;
-
 
   LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
 
@@ -95,6 +100,8 @@
   /// Note it is not virtual since it is not expected to inherit from this
   /// template
   ~LockFreeQueue();
+
+  inline void  close();
 
   // std::atomic_uint reference;
   /// @brief constructor of the class
@@ -120,8 +127,18 @@
 
   inline ELEM_T &operator[](unsigned i);
 
+  
+
   time_t getCreateTime() {
     return createTime;
+  }
+
+  time_t getCloseTime() {
+    return closeTime;
+  }
+
+  int getStatus() {
+    return status;
   }
 
   /// @brief push an element at the tail of the queue
@@ -166,7 +183,18 @@
     err_exit(errno, "LockFreeQueue sem_init");
   
   createTime = time(NULL);
+  status = LOCK_FREE_Q_ST_OPENED;
 
+}
+
+
+template<
+  typename ELEM_T,
+  typename Allocator,
+  template<typename T, typename AT> class Q_TYPE>
+inline void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::close() {
+  status = LOCK_FREE_Q_ST_CLOSED;
+  closeTime = time(NULL); 
 }
 
 
@@ -182,9 +210,7 @@
   if (sem_destroy(&items) == -1) {
     err_exit(errno, "LockFreeQueue sem_destroy");
   }
-  // if (sem_destroy(&mutex) == -1) {
-  //   err_exit(errno, "LockFreeQueue sem_destroy");
-  // }
+  
 }
 
 template<
diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h
index 24a4dfc..7893485 100644
--- a/src/queue/shm_queue.h
+++ b/src/queue/shm_queue.h
@@ -45,8 +45,6 @@
 
   ELEM_T &operator[](unsigned i);
 
- // @deprecate
-  static size_t remove_queues_exclude(int keys[], size_t length);
 
 private:
 protected:
@@ -60,34 +58,7 @@
   SHMQueue<ELEM_T>(const SHMQueue<ELEM_T> &a_src);
 };
 
-// @deprecate
-// template <typename ELEM_T>
-// size_t SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) {
-//   hashtable_t *hashtable = mm_get_hashtable();
-//   std::set<int> *keyset = hashtable_keyset(hashtable);
-//   std::set<int>::iterator keyItr;
-//   LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
-//   bool found;
-//   size_t count = 0;
-//   for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
-//     found = false;
-//     for (size_t i = 0; i < length; i++) {
-//       if (*keyItr == keys[i]) {
-//         found = true;
-//         break;
-//       }
-//     }
-//     if (!found && *keyItr > 100) {
-//       // 閿�姣佸叡浜唴瀛樼殑queue
-//       mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
-//       delete mqueue;
-//       hashtable_remove(hashtable, *keyItr);
-//       count++;
-//     }
-//   }
-//   delete keyset;
-//   return count;
-// }
+
 
 
 
diff --git a/src/shm/mem_pool.h b/src/shm/mem_pool.h
deleted file mode 100644
index 5a698ec..0000000
--- a/src/shm/mem_pool.h
+++ /dev/null
@@ -1,63 +0,0 @@
-#ifndef _MEM_POOL_H_
-#define _MEM_POOL_H_  
-#include "mm.h"
-#include "sem_util.h"
-#define MEM_POOL_COND_KEY 0x8801
-
-
-// static int mem_pool_mutex  = SemUtil::get(MEM_POOL_COND_KEY, 1);
-
-static inline void mem_pool_init(size_t heap_size) {
-	mm_init(heap_size);
-}
-
-static inline void mem_pool_destroy(void) {
-	mm_destroy();
-	
-}
-
-static inline void *mem_pool_malloc (size_t size) {
-	return  mm_malloc(size);
-}
-
-
-static inline void mem_pool_free (void *ptr) {
-	mm_free(ptr);
-}
-
-
-template <typename T>
-static inline  T* mem_pool_attach(int key) {
-	void *ptr;
-	// T* tptr;
-	hashtable_t *hashtable = mm_get_hashtable();
-  ptr = hashtable_get(hashtable, key);
-// printf("mem_pool_malloc_by_key  malloc before %d, %p\n", key, ptr);
-  if(ptr == NULL || ptr == (void *)1 ) {
-    ptr = mm_malloc(sizeof(T));
-    hashtable_put(hashtable, key, ptr);
-    new(ptr) T;
-// printf("mem_pool_malloc_by_key  use new %d, %p\n", key, ptr);
-  }
-  return (T*)ptr; 
-}
-
-static inline void mem_pool_free_by_key(int key) {
-	return mm_free_by_key(key);
-}
-
-
-static inline void *mem_pool_realloc (void *ptr, size_t size) {
-	return mm_realloc(ptr, size);
-}
-
-static inline int mem_pool_alloc_key() {
-	 
-	return mm_alloc_key();
-}
- 
-
-// extern int mm_checkheap(int verbose);
-
-
-#endif
\ No newline at end of file
diff --git a/src/shm/shm_allocator.h b/src/shm/shm_allocator.h
index 084a678..d14708f 100644
--- a/src/shm/shm_allocator.h
+++ b/src/shm/shm_allocator.h
@@ -67,12 +67,12 @@
   public:
     static void *allocate (size_t size) {
       return mm_malloc(size);
-      // return mem_pool_malloc(size);
+      // return shm_mm_malloc(size);
     }
 
     static void deallocate (void *ptr) {
       return mm_free(ptr);
-      // return mem_pool_free(ptr);
+      // return shm_mm_free(ptr);
     }
 };
 
diff --git a/src/shm/shm_mm.cpp b/src/shm/shm_mm.cpp
new file mode 100644
index 0000000..6341086
--- /dev/null
+++ b/src/shm/shm_mm.cpp
@@ -0,0 +1,60 @@
+#include "shm_mm.h"
+#include "mm.h"
+#include "sem_util.h"
+
+
+void shm_mm_init(size_t heap_size) {
+	mm_init(heap_size);
+	shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY);
+}
+
+void shm_mm_destroy(void) {
+	mm_destroy();
+	
+}
+
+void *shm_mm_malloc (size_t size) {
+	return  mm_malloc(size);
+}
+
+
+void shm_mm_free (void *ptr) {
+	mm_free(ptr);
+}
+
+
+template <typename T>
+ T* shm_mm_attach(int key) {
+	void *ptr;
+	// T* tptr;
+	hashtable_t *hashtable = mm_get_hashtable();
+  ptr = hashtable_get(hashtable, key);
+// printf("shm_mm_malloc_by_key  malloc before %d, %p\n", key, ptr);
+  if(ptr == NULL || ptr == (void *)1 ) {
+    ptr = mm_malloc(sizeof(T));
+    hashtable_put(hashtable, key, ptr);
+    new(ptr) T;
+// printf("shm_mm_malloc_by_key  use new %d, %p\n", key, ptr);
+  }
+  return (T*)ptr; 
+}
+
+void shm_mm_free_by_key(int key) {
+	return mm_free_by_key(key);
+}
+
+
+void *shm_mm_realloc (void *ptr, size_t size) {
+	return mm_realloc(ptr, size);
+}
+
+int shm_mm_alloc_key() {
+	 
+	return mm_alloc_key();
+}
+ 
+
+// extern int mm_checkheap(int verbose);
+
+
+#endif
\ No newline at end of file
diff --git a/src/shm/shm_mm.h b/src/shm/shm_mm.h
new file mode 100644
index 0000000..db1bea9
--- /dev/null
+++ b/src/shm/shm_mm.h
@@ -0,0 +1,37 @@
+#ifndef __SHM_MM_H__
+#define __SHM_MM_H__
+
+#define SHM_QUEUE_ST_OPENED 0
+
+#define SHM_QUEUE_ST_CLOSED 1
+
+struct shm_queue_status_t {
+
+  int status;
+  time_t createTime;
+  time_t closeTime;
+};
+
+typedef std::map<int, shm_queue_status_t, std::less<int>, SHM_STL_Allocator<std::pair<const int, shm_queue_status_t> > > ShmQueueStMap;
+ 
+
+void shm_mm_init(size_t heap_size) ;
+
+void shm_mm_destroy(void) ;
+
+void *shm_mm_malloc (size_t size);
+
+void shm_mm_free (void *ptr);
+
+
+template <typename T>
+T* shm_mm_attach(int key) ;
+
+void shm_mm_free_by_key(int key) ;
+
+
+void *shm_mm_realloc (void *ptr, size_t size);
+
+int shm_mm_alloc_key();
+
+#endif 
\ No newline at end of file
diff --git a/src/shm/shm_mm_wrapper.cpp b/src/shm/shm_mm_wrapper.cpp
index 59487c6..f726f8a 100644
--- a/src/shm/shm_mm_wrapper.cpp
+++ b/src/shm/shm_mm_wrapper.cpp
@@ -5,18 +5,48 @@
 #include "shm_socket.h"
 
 #define BUFFER_TIME 10
+
+
 void shm_mm_wrapper_init(int size) {
-	mem_pool_init(size);
+	shm_mm_init(size);
+
 }
 
 void shm_mm_wrapper_destroy() {
-	mem_pool_destroy();
+	shm_mm_destroy();
 }
 
 int shm_mm_wrapper_alloc_key() {
 	return mm_alloc_key();
 }
 
+
+/**
+ * 鍥炴敹鍋囧垹闄ょ殑key
+ */
+int shm_mm_wrapper_start_resycle() {
+  ShmQueueStMap * shmQueueStMap =  shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY);
+  hashtable_t *hashtable = mm_get_hashtable();
+  LockFreeQueue<shm_packet_t> *mqueue;
+  while(true) {
+    for(auto it = shmQueueStMap->begin(); it != shmQueueStMap->end(); ++it ) {
+      if(it->second.status = SHM_QUEUE_ST_CLOSED && difftime(time(NULL), it->second.closeTime) > 2 ) {
+        mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]);
+        if(mqueue != NULL) {
+          delete mqueue;
+          hashtable_remove(hashtable, it->first);
+          printf("reove queue %d\n", it->first);
+          // 涓嶈兘 erase ,鍚﹀垯浼氬嚭鐜板杩涚▼涔嬮棿鐨勫悓姝ラ棶棰橈紝 鑰岃繖姝f槸杩欓噷瑕佽В鍐崇殑闂
+          // it = shmQueueStMap->erase(it);
+          // continue;
+        }
+      }
+    }
+
+    sleep(1);
+  }
+}
+
 //鍒犻櫎鍖呭惈鍦╧eys鍐呯殑queue
 int shm_mm_wrapper_remove_keys(int keys[], int length) {
   hashtable_t *hashtable = mm_get_hashtable();
diff --git a/src/shm/shm_mm_wrapper.h b/src/shm/shm_mm_wrapper.h
index 82e1b55..b39fdc3 100644
--- a/src/shm/shm_mm_wrapper.h
+++ b/src/shm/shm_mm_wrapper.h
@@ -5,8 +5,8 @@
  *
  */
 
-#ifndef __SHM_MM_H__
-#define __SHM_MM_H__
+#ifndef __SHM_MM_WRAPPER_H__
+#define __SHM_MM_WRAPPER_H__
 
 #ifdef __cplusplus
 extern "C" {
diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index 6aa6a95..657941b 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -6,7 +6,7 @@
 static Logger *logger = LoggerFactory::getLogger();
 
 void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb) {
-	SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
+	SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
 	SHMKeySet *subscripter_set;
 	SHMKeySet::iterator set_iter;
 	SHMTopicSubMap::iterator map_iter;
@@ -29,7 +29,7 @@
 	int key;
 	for(int i = 0; i < length; i++) {
 		key = keys[i];
-		SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
+		SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
 		SHMKeySet *subscripter_set;
 		SHMKeySet::iterator set_iter;
 		SHMTopicSubMap::iterator map_iter;
@@ -79,9 +79,9 @@
  * 鍚姩bus
  * 
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
-*/
+ */
 int  BusServerSocket::start(){
-	topic_sub_map =	mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
+	topic_sub_map =	shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
  
 	_run_proxy_();
 	return 0;
@@ -124,7 +124,7 @@
 
 		}
 		topic_sub_map->clear();
-		mem_pool_free_by_key(SHM_BUS_MAP_KEY);
+		shm_mm_free_by_key(SHM_BUS_MAP_KEY);
 	}
 	shm_socket_close(shm_socket);
 	logger->debug("BusServerSocket destory 3");
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index 15d4072..466d0b5 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -3,15 +3,6 @@
 static Logger *logger = LoggerFactory::getLogger();
 
 
-// size_t ShmModSocket::remove_keys(int keys[], size_t length) {
-// 	BusServerSocket::remove_subscripters(keys, length);
-// 	return shm_socket_remove_keys(keys, length);
-// }
-
-// size_t ShmModSocket::remove_keys_exclude(int keys[], size_t length) {
-// 	BusServerSocket::remove_subscripters(keys, length);
-// 	return shm_socket_remove_keys_exclude(keys, length);
-// }
 
 ShmModSocket::ShmModSocket() {
 	shm_socket = shm_socket_open(SHM_SOCKET_DGRAM);
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 0d82be0..3366491 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -5,10 +5,11 @@
 #include <cassert>
 #include "bus_error.h"
 #include "sole.h"
+#include "shm_mm.h"
 
 static Logger *logger = LoggerFactory::getLogger();
 
-
+ShmQueueStMap * shmQueueStMap ;
 
 static void print_msg(char *head, shm_packet_t &msg) {
   // err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type);
@@ -101,6 +102,9 @@
   if (s != 0)
     err_exit(s, "pthread_mutexattr_destroy");
 
+
+  shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY);
+
   return sockt;
 }
 
@@ -109,10 +113,10 @@
   
   int rv;
   logger->debug("shm_socket_close\n");
-  if(sockt->queue != NULL) {
-    delete sockt->queue;
-    sockt->queue = NULL;
-  }
+  // if(sockt->queue != NULL) {
+  //   delete sockt->queue;
+  //   sockt->queue = NULL;
+  // }
 
   rv =  pthread_mutex_destroy(&(sockt->mutex) );
   if(rv != 0) {
@@ -120,6 +124,12 @@
   }
 
   free(sockt);
+
+  auto it =  shmQueueStMap.find(key);
+  if(it != shmQueueStMap.end()) {
+    it->second.status = SHM_QUEUE_ST_CLOSED
+    it->second.closeTime = time(NULL);
+  }
   return 0;
 }
 
@@ -523,6 +533,7 @@
                const int key, const struct timespec *timeout, const int flag) {
 
   int rv;
+  shm_queue_status_t stRecord;
   hashtable_t *hashtable = mm_get_hashtable();
 
   if( sockt->queue != NULL) 
@@ -545,6 +556,12 @@
         logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
         return EBUS_KEY_INUSED;
       }
+
+      // 鏍囪key瀵瑰簲鐨勭姸鎬� 锛屼负opened
+      stRecord.status = SHM_QUEUE_ST_OPENED;
+      stRecord.createTime = time(NULL);
+      shmQueueStMap.insert({sockt->key, stRecord});
+      
     }
 
     if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
@@ -559,24 +576,34 @@
     return EBUS_SENDTO_SELF;
   }
 
-  LockFreeQueue<shm_packet_t> *remoteQueue;
-  if ((remoteQueue = shm_socket_attach_queue(key)) == NULL) {
-    bus_errno = EBUS_CLOSED;
-    logger->error("sendto key %d failed, %s", key, bus_strerror(bus_errno));
-    return EBUS_CLOSED;
+  // 妫�鏌ey鏍囪鐨勭姸鎬�
+  auto it =  shmQueueStMap.find(key);
+  if(it != shmQueueStMap.end()) {
+    if(it->second.status == SHM_QUEUE_ST_CLOSED) {
+      // key瀵瑰簲鐨勭姸鎬佹槸鍏抽棴鐨�
+      goto ERR_CLOSED;
+    }
   }
 
-  
+  LockFreeQueue<shm_packet_t> *remoteQueue = shm_socket_attach_queue(key);
+
+  if (remoteQueue == NULL ) {
+    goto ERR_CLOSED;
+  }
 
   rv = remoteQueue->push(*sendpak, timeout, flag);
-
   return rv;
+
+ERR_CLOSED:
+  logger->error("sendto key %d failed, %s", key, bus_strerror(EBUS_CLOSED));
+  return EBUS_CLOSED;
+
 }
 
 // 鐭繛鎺ユ柟寮忔帴鍙�
 static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak ,  const struct timespec *timeout,  int flag) {
   int rv;
-  
+  shm_queue_status_t stRecord;
   hashtable_t *hashtable = mm_get_hashtable();
   shm_packet_t recvpak;
 
@@ -601,6 +628,10 @@
       return EBUS_KEY_INUSED;
     }
     
+    // 鏍囪key瀵瑰簲鐨勭姸鎬� 锛屼负opened
+    stRecord.status = SHM_QUEUE_ST_OPENED;
+    stRecord.createTime = time(NULL);
+    shmQueueStMap.insert({sockt->key, stRecord});
     
     if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
       err_exit(rv, "shm_recvfrom : pthread_mutex_unlock");
@@ -609,8 +640,15 @@
   
 LABEL_POP:
 
- // 
-  // printf("%p start recv.....\n", sockt);
+  // 妫�鏌ey鏍囪鐨勭姸鎬�
+  // auto shmQueueMapIter =  shmQueueStMap.find(sockt->key);
+  // if(shmQueueMapIter != shmQueueStMap.end()) {
+  //   stRecord = shmQueueMapIter->second;
+  //   if(stRecord.status = SHM_QUEUE_ST_CLOSED) {
+  //     // key瀵瑰簲鐨勭姸鎬佹槸鍏抽棴鐨�
+  //     goto ERR_CLOSED;
+  //   }
+  // }
  
   rv = sockt->queue->pop(recvpak, timeout, flag);
   if(rv != 0) 
@@ -623,10 +661,4 @@
   *_recvpak = recvpak;
   return rv;
 }
-// int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf,
-//                     const int send_size, const int send_key, void **recv_buf,
-//                     int *recv_size,  const struct timespec *timeout,  int flags) {
-
-//   struct timespec tm = {10, 0};
-//   return  _shm_sendandrecv_thread_local(sockt, send_buf, send_size, send_key,recv_buf, recv_size, &tm, flags);
-// }
+ 
diff --git a/test_net_socket/test_net_mod_socket.cpp b/test_net_socket/test_net_mod_socket.cpp
index 05827a1..56115f8 100644
--- a/test_net_socket/test_net_mod_socket.cpp
+++ b/test_net_socket/test_net_mod_socket.cpp
@@ -75,6 +75,11 @@
 	}
 }
 
+void start_resycle() {
+  shm_mm_wrapper_start_resycle();
+}
+
+
 // 鎵撳嵃鎺ュ彈鍒扮殑璁㈤槄娑堟伅
 void *print_sub_msg(void *sockt) {
   pthread_detach(pthread_self());
@@ -602,6 +607,9 @@
      
     test_net_pub(opt.publist);
   }
+  else if (strcmp("start_resycle", opt.fun) == 0) {
+    start_resycle();
+  }
   
   else {
     usage(argv[0]);

--
Gitblit v1.8.0