From 554529bb69cd610e83db2c9a80b4f36f5225d80f Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期一, 27 七月 2020 17:56:34 +0800
Subject: [PATCH] restart bus

---
 demo/dgram_mod_req_rep                |    0 
 src/socket/include/shm_socket.h       |    4 +
 src/libshm_queue.a                    |    0 
 src/socket/shm_socket.c               |   17 +++-
 src/queue/include/shm_mm.h            |    3 
 src/queue/include/shm_queue.h         |   25 ++++--
 src/socket/dgram_mod_socket.c         |   56 +++++++++----
 test_socket/dgram_mod_bus             |    0 
 test_socket/dgram_mod_survey          |    0 
 src/queue/shm_mm.c                    |    1 
 src/queue/include/mem_pool.h          |   24 +++++
 src/queue/include/shm_allocator.h     |    8 +
 test_socket/dgram_mod_bus.c           |   37 +++++++-
 src/queue/hashtable.c                 |   26 +++--
 test_socket/dgram_mod_req_rep         |    0 
 demo/dgram_mod_survey                 |    0 
 src/socket/include/dgram_mod_socket.h |    2 
 17 files changed, 149 insertions(+), 54 deletions(-)

diff --git a/demo/dgram_mod_req_rep b/demo/dgram_mod_req_rep
index 87ad5b5..6b03fdf 100755
--- a/demo/dgram_mod_req_rep
+++ b/demo/dgram_mod_req_rep
Binary files differ
diff --git a/demo/dgram_mod_survey b/demo/dgram_mod_survey
index 7fc3de3..a3765fc 100755
--- a/demo/dgram_mod_survey
+++ b/demo/dgram_mod_survey
Binary files differ
diff --git a/src/libshm_queue.a b/src/libshm_queue.a
index bc22bb2..56e099f 100644
--- a/src/libshm_queue.a
+++ b/src/libshm_queue.a
Binary files differ
diff --git a/src/queue/hashtable.c b/src/queue/hashtable.c
index 21700ac..1fa6266 100755
--- a/src/queue/hashtable.c
+++ b/src/queue/hashtable.c
@@ -177,18 +177,7 @@
 }
 
 
-int hashtable_alloc_key(hashtable_t *hashtable) {
-  int key = START_KEY;
-  SemUtil::dec(hashtable->wlock);
 
-  while(_hashtable_get(hashtable, key) != NULL) {
-    key++;
-  }
-
-  _hashtable_put(hashtable, key, (void *)1);
-  SemUtil::inc(hashtable->wlock);
-  return key;
-}
 
 void *hashtable_get(hashtable_t *hashtable, int key) {
    SemUtil::dec(hashtable->mutex);
@@ -251,6 +240,19 @@
   }
 }
 
+int hashtable_alloc_key(hashtable_t *hashtable) {
+  int key = START_KEY;
+  SemUtil::dec(hashtable->wlock);
+
+  while(_hashtable_get(hashtable, key) != NULL) {
+    key++;
+  }
+
+  _hashtable_put(hashtable, key, (void *)1);
+  SemUtil::inc(hashtable->wlock);
+  return key;
+}
+
 std::set<int> * hashtable_keyset(hashtable_t *hashtable) {
   std::set<int> *keyset = new std::set<int>;
   tailq_entry_t *item;
@@ -267,3 +269,5 @@
   }
   return keyset;
 }
+
+
diff --git a/src/queue/include/mem_pool.h b/src/queue/include/mem_pool.h
index 17a7c5c..d5a4110 100644
--- a/src/queue/include/mem_pool.h
+++ b/src/queue/include/mem_pool.h
@@ -34,6 +34,22 @@
 	return ptr;
 }
 
+template <typename T>
+static inline  T* mem_pool_attach(int key) {
+	void *ptr;
+	// T* tptr;
+	hashtable_t *hashtable = mm_get_hashtable();
+  ptr = hashtable_get(hashtable, key);
+printf("mem_pool_malloc_by_key  malloc before %d, %p\n", key, ptr);
+  if(ptr == NULL || ptr == (void *)1 ) {
+    ptr = mm_malloc(sizeof(T));
+    hashtable_put(hashtable, key, ptr);
+    new(ptr) T;
+printf("mem_pool_malloc_by_key  use new %d, %p\n", key, ptr);
+  }
+  return (T*)ptr; 
+}
+
 static inline void mem_pool_free (void *ptr) {
 	mm_free(ptr);
 	// notify malloc
@@ -45,10 +61,12 @@
 	return mm_realloc(ptr, size);
 }
 
-static inline hashtable_t * mem_pool_get_hashtable() {
-	return mm_get_hashtable();
-
+static inline int mem_pool_alloc_key() {
+	hashtable_t *hashtable = mm_get_hashtable();
+	return hashtable_alloc_key(hashtable);
 }
+ 
+
 // extern int mm_checkheap(int verbose);
 
 
diff --git a/src/queue/include/shm_allocator.h b/src/queue/include/shm_allocator.h
index ae94a9c..084a678 100644
--- a/src/queue/include/shm_allocator.h
+++ b/src/queue/include/shm_allocator.h
@@ -66,11 +66,13 @@
 class SHM_Allocator {
   public:
     static void *allocate (size_t size) {
-      return mem_pool_malloc(size);
+      return mm_malloc(size);
+      // return mem_pool_malloc(size);
     }
 
     static void deallocate (void *ptr) {
-      return mem_pool_free(ptr);
+      return mm_free(ptr);
+      // return mem_pool_free(ptr);
     }
 };
 
@@ -93,6 +95,6 @@
 
 
 
-typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > shmstring;
+typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString;
 
 #endif
\ No newline at end of file
diff --git a/src/queue/include/shm_mm.h b/src/queue/include/shm_mm.h
index b32568e..6f28f13 100644
--- a/src/queue/include/shm_mm.h
+++ b/src/queue/include/shm_mm.h
@@ -18,6 +18,9 @@
  */
 void shm_destroy();
 
+void* shm_malloc_by_key(int key, int size);
+
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/src/queue/include/shm_queue.h b/src/queue/include/shm_queue.h
index 99f4802..e64f15a 100644
--- a/src/queue/include/shm_queue.h
+++ b/src/queue/include/shm_queue.h
@@ -38,7 +38,8 @@
 
   inline ELEM_T &operator[](unsigned i);
 
-  static void remove_queues_exclude(int *keys, size_t length);
+  static void remove_queues_exclude(int keys[], size_t length);
+  static void remove_queues_include(int keys[], size_t length);
 
 private:
 protected:
@@ -52,7 +53,7 @@
 };
 
 template <typename ELEM_T>
-void SHMQueue<ELEM_T>::remove_queues_exclude(int *keys, size_t length) {
+void SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) {
   hashtable_t *hashtable = mm_get_hashtable();
   std::set<int> *keyset = hashtable_keyset(hashtable);
   std::set<int>::iterator keyItr;
@@ -67,12 +68,21 @@
       }
     }
     if (!found) {
-      mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable,
-                                                                     *keyItr);
-      delete mqueue;
+      // mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
+      // delete mqueue;
+      hashtable_remove(hashtable, *keyItr);
     }
   }
   delete keyset;
+}
+
+
+template <typename ELEM_T>
+void SHMQueue<ELEM_T>::remove_queues_include(int keys[], size_t length) {
+  hashtable_t *hashtable = mm_get_hashtable();
+  for(int i = 0; i< length; i++) {
+    hashtable_remove(hashtable, keys[i]);
+  }
 }
 
 template <typename ELEM_T>
@@ -86,8 +96,7 @@
     hashtable_put(hashtable, key, (void *)queue);
   }
   queue->reference++;
-  LoggerFactory::getLogger().debug("SHMQueue constructor reference===%d",
-                                   queue->reference.load());
+  LoggerFactory::getLogger().debug("SHMQueue constructor reference===%d", queue->reference.load());
 }
 
 template <typename ELEM_T> SHMQueue<ELEM_T>::~SHMQueue() {
@@ -96,7 +105,7 @@
   // LoggerFactory::getLogger().debug("SHMQueue destructor  reference===%d",
   // queue->reference.load());
   if (queue->reference.load() == 0) {
-    delete queue;
+   // delete queue;
     hashtable_t *hashtable = mm_get_hashtable();
     hashtable_remove(hashtable, KEY);
     // LoggerFactory::getLogger().debug("SHMQueue destructor delete queue\n");
diff --git a/src/queue/shm_mm.c b/src/queue/shm_mm.c
index 8b43316..6eeae1a 100644
--- a/src/queue/shm_mm.c
+++ b/src/queue/shm_mm.c
@@ -1,5 +1,6 @@
 #include "shm_mm.h"
 #include "mem_pool.h"
+#include "hashtable.h"
 
 void shm_init(int size) {
 	mem_pool_init(size);
diff --git a/src/socket/dgram_mod_socket.c b/src/socket/dgram_mod_socket.c
index 62ae7ac..937d8f0 100644
--- a/src/socket/dgram_mod_socket.c
+++ b/src/socket/dgram_mod_socket.c
@@ -16,11 +16,15 @@
 
 static Logger logger = LoggerFactory::getLogger();
 
+//typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString;
+typedef std::set<int,  std::less<int>, SHM_STL_Allocator<int> > SHMKeySet;
+typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<SHMString, SHMKeySet *> > > SHMTopicSubMap;
+
 typedef struct dgram_mod_socket_t {
   shm_socket_t *shm_socket;
   // pthread_t recv_thread;
   // <涓婚锛� 璁㈤槄鑰�>
-	std::map<std::string, std::set<int> *> *topic_sub_map;
+	SHMTopicSubMap *topic_sub_map;
 } dgram_mod_socket_t;
 
 static int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
@@ -36,9 +40,9 @@
 
 int dgram_mod_close_socket(void * _socket) {
 	dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
-	std::map<std::string, std::set<int> *> *topic_sub_map = socket->topic_sub_map;
-	std::set<int> *subscripter_set;
-	std::map<std::string, std::set<int> *>::iterator map_iter;
+	SHMTopicSubMap *topic_sub_map = socket->topic_sub_map;
+	SHMKeySet *subscripter_set;
+	SHMTopicSubMap::iterator map_iter;
 
 	if(topic_sub_map != NULL) {
 		for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
@@ -57,6 +61,12 @@
 int dgram_mod_bind(void * _socket, int port){
 	dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
 	return  shm_socket_bind(socket->shm_socket, port);
+}
+
+
+int dgram_mod_force_bind(void * _socket, int port) {
+	dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
+	return shm_socket_force_bind(socket->shm_socket, port);
 }
 
 int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port) {
@@ -96,7 +106,14 @@
 
 int  dgram_mod_start_bus(void * _socket) {
 	dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
-	socket->topic_sub_map = new std::map<std::string, std::set<int> *>;
+printf("mem_pool_malloc_by_key before\n");
+	// void *map_ptr = mem_pool_malloc_by_key(1, sizeof(SHMTopicSubMap));
+	socket->topic_sub_map =	mem_pool_attach<SHMTopicSubMap>(1);
+printf("mem_pool_malloc_by_key after\n");
+
+	// socket->topic_sub_map = new(map_ptr) SHMTopicSubMap;
+
+	//socket->topic_sub_map = new SHMTopicSubMap;
 	run_pubsub_proxy(socket);
 	// pthread_t tid;
 	// pthread_create(&tid, NULL, run_accept_sub_request, _socket);
@@ -136,16 +153,17 @@
  * 澶勭悊璁㈤槄
 */
 void _proxy_sub(dgram_mod_socket_t *socket, char *topic, int port) {
-	std::map<std::string, std::set<int> *> *topic_sub_map = socket->topic_sub_map;
-	std::set<int> *subscripter_set;
+	SHMTopicSubMap *topic_sub_map = socket->topic_sub_map;
+	SHMKeySet *subscripter_set;
 
-	std::map<std::string, std::set<int> *>::iterator map_iter;
-	std::set<int>::iterator set_iter;
+	SHMTopicSubMap::iterator map_iter;
+	SHMKeySet::iterator set_iter;
 
 	if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
 		subscripter_set = map_iter->second;
 	} else {
-		subscripter_set = new std::set<int>;
+		void *set_ptr = mm_malloc(sizeof(SHMKeySet));
+		subscripter_set = new(set_ptr) SHMKeySet;
 		topic_sub_map->insert({topic, subscripter_set});
 	}
 	subscripter_set->insert(port);
@@ -155,11 +173,11 @@
  * 澶勭悊鍙戝竷锛屼唬鐞嗚浆鍙�
 */
 void _proxy_pub(dgram_mod_socket_t * socket, char *topic, size_t head_len, void *buf, size_t size, int port) {
-	std::map<std::string, std::set<int> *> *topic_sub_map = socket->topic_sub_map;
-	std::set<int> *subscripter_set;
+	SHMTopicSubMap *topic_sub_map = socket->topic_sub_map;
+	SHMKeySet *subscripter_set;
 
-	std::map<std::string, std::set<int> *>::iterator map_iter;
-	std::set<int>::iterator set_iter;
+	SHMTopicSubMap::iterator map_iter;
+	SHMKeySet::iterator set_iter;
 
 	std::vector<int> subscripter_to_del;
 	std::vector<int>::iterator vector_iter;
@@ -171,12 +189,14 @@
 		subscripter_set = map_iter->second;
 		for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
 			send_port = *set_iter;
-// printf("run_accept_sub_request send before %d \n", send_port);
+ printf("_proxy_pub send before %d \n", send_port);
 			if (shm_sendto(socket->shm_socket, buf+head_len, size-head_len, send_port, &timeout) !=0 ) {
 				//瀵规柟宸插叧闂殑杩炴帴鏀惧埌寰呭垹闄ら槦鍒楅噷銆傚鏋滅洿鎺ュ垹闄や細璁﹊ter鎸囬拡鍑虹幇閿欎贡
 				subscripter_to_del.push_back(send_port);
+			} else {
+printf("_proxy_pub send after: %d \n", send_port);
 			}
-// printf("run_accept_sub_request send after: %d \n", send_port);
+
 			
 		}
 
@@ -200,9 +220,9 @@
 	size_t head_len;
 
 	const char *topic_delim = ",";
-//printf("server receive before\n");
+printf("run_pubsub_proxy server receive before\n");
 	while(shm_recvfrom(socket->shm_socket, (void **)&buf, &size, &port) == 0) {
-//printf("server recv after: %s \n", buf);
+printf("run_pubsub_proxy server recv after: %s \n", buf);
 		if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
 			if(strcmp(action, "sub") == 0) {
 				// 璁㈤槄鏀寔澶氫富棰樿闃�
diff --git a/src/socket/include/dgram_mod_socket.h b/src/socket/include/dgram_mod_socket.h
index 1c2ad64..1c12e53 100644
--- a/src/socket/include/dgram_mod_socket.h
+++ b/src/socket/include/dgram_mod_socket.h
@@ -25,6 +25,8 @@
 */
 int dgram_mod_bind(void * _socket, int port);
 
+
+int dgram_mod_force_bind(void * _socket, int port);
 /**
  * 鍙戦�佷俊鎭�
  * @port 鍙戦�佺粰璋�
diff --git a/src/socket/include/shm_socket.h b/src/socket/include/shm_socket.h
index 7025f9e..0ca4cfd 100644
--- a/src/socket/include/shm_socket.h
+++ b/src/socket/include/shm_socket.h
@@ -44,6 +44,7 @@
 	shm_socket_type_t socket_type;
 	// 鏈湴port
 	int port;
+	bool force_bind;
 	shm_connection_status_t status;
 	SHMQueue<shm_msg_t> *queue;
 	SHMQueue<shm_msg_t> *remoteQueue;
@@ -65,6 +66,9 @@
 
 int shm_socket_bind(shm_socket_t * socket, int port) ;
 
+int shm_socket_force_bind(shm_socket_t * socket, int port) ;
+
+
 int shm_listen(shm_socket_t * socket) ;
 
 shm_socket_t* shm_accept(shm_socket_t* socket);
diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c
index 52251e6..168eb65 100644
--- a/src/socket/shm_socket.c
+++ b/src/socket/shm_socket.c
@@ -23,6 +23,7 @@
   shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
   socket->socket_type = socket_type;
   socket->port = -1;
+  socket->force_bind = false;
   socket->dispatch_thread = 0;
   socket->status = SHM_CONN_CLOSED;
 
@@ -46,6 +47,12 @@
   return 0;
 }
 
+int shm_socket_force_bind(shm_socket_t *socket, int port) {
+  socket->force_bind = true;
+  socket->port = port;
+  return 0;
+}
+
 int shm_listen(shm_socket_t *socket) {
 
   if (socket->socket_type != SHM_SOCKET_STREAM) {
@@ -60,7 +67,7 @@
     socket->port = port;
   } else {
 
-    if (hashtable_get(hashtable, socket->port) != NULL) {
+    if (hashtable_get(hashtable, socket->port) != NULL && !socket->force_bind) {
       err_exit(0, "key %d has already been in used!", socket->port);
     }
   }
@@ -144,7 +151,7 @@
     socket->port = hashtable_alloc_key(hashtable);
   } else {
 
-    if (hashtable_get(hashtable, socket->port) != NULL) {
+    if (hashtable_get(hashtable, socket->port) != NULL && !socket->force_bind ) {
       err_exit(0, "key %d has already been in used!", socket->port);
     }
   }
@@ -243,7 +250,8 @@
     } else {
 
       if (hashtable_get(hashtable, socket->port) != NULL) {
-        err_exit(0, "key %d has already been in used!", socket->port);
+        if(!socket->force_bind)
+          err_exit(0, "key %d has already been in used!", socket->port);
       }
     }
 
@@ -299,7 +307,8 @@
     } else {
 
       if (hashtable_get(hashtable, socket->port) != NULL) {
-        err_exit(0, "key %d has already been in used!", socket->port);
+        if(!socket->force_bind)
+          err_exit(0, "key %d has already been in used!", socket->port);
       }
     }
 
diff --git a/test_socket/dgram_mod_bus b/test_socket/dgram_mod_bus
index 1b3c3ef..82d76a1 100755
--- a/test_socket/dgram_mod_bus
+++ b/test_socket/dgram_mod_bus
Binary files differ
diff --git a/test_socket/dgram_mod_bus.c b/test_socket/dgram_mod_bus.c
index bddc7d5..3cb6d9f 100644
--- a/test_socket/dgram_mod_bus.c
+++ b/test_socket/dgram_mod_bus.c
@@ -1,13 +1,30 @@
 #include "dgram_mod_socket.h"
 #include "shm_mm.h"
 #include "usg_common.h"
+#include "mm.h"
 
-void server(int port) {
-  void *socket = dgram_mod_open_socket();
-  dgram_mod_bind(socket, port);
+void sigint_handler(int sig) {
+  printf("sigint_handler\n");
+  hashtable_t *hashtable = mm_get_hashtable();
+  //hashtable_remove(hashtable, 8);
+  // dgram_mod_close_socket(server_socket);
+  //SHMQueue<ELEM_T>::remove_queues_include
+  exit(0);
+}
+
+
+void server(int port, bool restart) {
+  //signal(SIGINT,  sigint_handler);
+  void * server_socket = dgram_mod_open_socket();
+
+  if(restart) {
+    dgram_mod_force_bind(server_socket, port);
+  } else {
+     dgram_mod_bind(server_socket, port);
+  }
+ 
    
-  dgram_mod_start_bus(socket);
-  
+  dgram_mod_start_bus(server_socket);
 }
 
 
@@ -68,14 +85,20 @@
   shm_init(512);
   int port;
   if (argc < 3) {
-    fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client");
+    fprintf(stderr, "Usage: %s %s|%s <PORT> ...\n", argv[0], "server", "client");
     return 1;
   }
 
   port = atoi(argv[2]);
 
   if (strcmp("server", argv[1]) == 0) {
-    server(port);
+    if(argc >= 4 && strcmp("restart", argv[3]) == 0) {
+      server(port, true);
+    }
+    else{
+      server(port, false);
+    }
+    
   }
 
   if (strcmp("client", argv[1]) == 0)
diff --git a/test_socket/dgram_mod_req_rep b/test_socket/dgram_mod_req_rep
index 5c2a703..bbe11f8 100755
--- a/test_socket/dgram_mod_req_rep
+++ b/test_socket/dgram_mod_req_rep
Binary files differ
diff --git a/test_socket/dgram_mod_survey b/test_socket/dgram_mod_survey
index f9dbccb..03db8ff 100755
--- a/test_socket/dgram_mod_survey
+++ b/test_socket/dgram_mod_survey
Binary files differ

--
Gitblit v1.8.0