From 1b94589dcb8d497d2d8a208efd61a54631f6b84e Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期三, 23 十二月 2020 16:08:33 +0800
Subject: [PATCH] update

---
 src/socket/net_mod_socket.c            |   60 +++++++++--
 src/socket/shm_socket.c                |   45 ++++++---
 test_net_socket/test_net_mod_socket.c  |    6 
 src/queue/lock_free_queue.h            |   22 ++-
 src/socket/bus_server_socket.c         |   36 +++---
 src/socket/bus_server_socket_wrapper.c |    9 +
 test_net_socket/test_bus_stop.c        |   54 ++++++++++
 test_net_socket/Makefile               |    3 
 src/socket/net_mod_server_socket.c     |    2 
 test_net_socket/heart_beat.c           |   41 +++++--
 10 files changed, 202 insertions(+), 76 deletions(-)

diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index ee11da6..84c885c 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -11,6 +11,7 @@
 // default Queue size
 #define LOCK_FREE_Q_DEFAULT_SIZE 16
 
+// static Logger *logger = LoggerFactory::getLogger();
 // define this macro if calls to "size" must return the real size of the 
 // queue. If it is undefined  that function will try to take a snapshot of 
 // the queue, but returned value might be bogus
@@ -200,7 +201,7 @@
     template <typename T, typename AT> class Q_TYPE>
 bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
 {
- // printf("==================LockFreeQueue push before\n");   
+LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");   
     if (SemUtil::dec(slots) == -1) {
         err_msg(errno, "LockFreeQueue push");
         return false;
@@ -209,7 +210,7 @@
     if ( m_qImpl.push(a_data) ) {
 
         SemUtil::inc(items);   
- // printf("==================LockFreeQueue push after\n");   
+LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");   
         return true;
     }
     return false;
@@ -247,18 +248,19 @@
 bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * timeout)
 {
 
-
+LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n");
     if (SemUtil::dec_timeout(slots, timeout) == -1) {
         if (errno == EAGAIN)
             return false;
         else {
-            // err_msg(errno, "LockFreeQueue push_timeout");
+            err_msg(errno, "LockFreeQueue push_timeout");
             return false;
         }
     }
 
     if (m_qImpl.push(a_data)){
-        SemUtil::inc(items);       
+        SemUtil::inc(items);   
+LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n");    
         return true;
     }
     return false;
@@ -274,7 +276,8 @@
     template <typename T, typename AT> class Q_TYPE>
 bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
 {
- // printf("==================LockFreeQueue pop before\n");
+
+LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
     if (SemUtil::dec(items) == -1) {
         err_msg(errno, "LockFreeQueue pop");
         return false;
@@ -282,7 +285,7 @@
 
     if (m_qImpl.pop(a_data)) {
         SemUtil::inc(slots);
- // printf("==================LockFreeQueue pop after\n");      
+LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");      
         return true;
     }
     return false;
@@ -319,7 +322,7 @@
     template <typename T, typename AT> class Q_TYPE>
 bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout)
 {
-// printf("==================LockFreeQueue pop_timeout before\n");
+LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n");
     if (SemUtil::dec_timeout(items, timeout) == -1) {
         if (errno == EAGAIN)
             return false;
@@ -331,7 +334,7 @@
 
     if (m_qImpl.pop(a_data)) {
         SemUtil::inc(slots);  
-// printf("==================LockFreeQueue pop_timeout after\n");     
+LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n");     
         return true;
     }
     return false;
@@ -346,6 +349,7 @@
     return m_qImpl.operator[](i);
 }
 
+
 template <
     typename ELEM_T, 
     typename Allocator,
diff --git a/src/socket/bus_server_socket.c b/src/socket/bus_server_socket.c
index 8c50d37..0f4e52e 100644
--- a/src/socket/bus_server_socket.c
+++ b/src/socket/bus_server_socket.c
@@ -60,15 +60,19 @@
 
 
 BusServerSocket::BusServerSocket() {
+	logger->debug("BusServerSocket Init");
 	shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
 	topic_sub_map = NULL;
+
 }
 
 BusServerSocket::~BusServerSocket() {
 	SHMKeySet *subscripter_set;
 	SHMTopicSubMap::iterator map_iter;
 
+	logger->debug("BusServerSocket destory 1");
 	stop();
+	logger->debug("BusServerSocket destory 2");
 	 
 	if(topic_sub_map != NULL) {
 		for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
@@ -83,6 +87,7 @@
 		mem_pool_free_by_key(BUS_MAP_KEY);
 	}
 	shm_close_socket(shm_socket);
+	logger->debug("BusServerSocket destory 3");
 }
 
 
@@ -109,14 +114,13 @@
  
 	run_pubsub_proxy();
 	// 杩涚▼鍋滄鐨勬椂鍊欙紝棰勭暀3绉掕祫婧愬洖鏀剁殑鏃堕棿銆傚惁鍒欙紝浼氬彂鐢熻皟鐢╟lose鐨勬椂鍊欙紝鍏变韩鍐呭瓨鐨勮祫婧愯繕娌℃潵寰楀強鍥炴敹杩涚▼灏遍��鍑轰簡
-	sleep(3);
 	return 0;
 }
 
 
 int  BusServerSocket::stop(){
 	int ret;
-	 
+	logger->debug("====>stopping");
 	if( shm_socket->key <= 0) {
 		return -1;
 	}
@@ -127,15 +131,11 @@
 	head.topic_size = 0;
 	head.content_size = 0;
 
-	void *recv_buf;
-	int recv_size;
-
 	void *buf;
 	int size = ShmModSocket::get_bus_sendbuf(head, NULL, 0, NULL,  0, &buf);
 	if(size > 0) {
-		ret = shm_sendandrecv(shm_socket, buf, size, shm_socket->key, &recv_buf, &recv_size);
+		ret = shm_sendandrecv_unsafe(shm_socket, buf, size, shm_socket->key, NULL, NULL);
 		free(buf);
-		free(recv_buf);
 		return ret;
 	} else {
 		return -1;
@@ -260,7 +260,8 @@
         topic =  strtok(NULL, topic_delim);
 		  }
 
-		} else if(strcmp(action, "desub") == 0) {
+		} 
+		else if(strcmp(action, "desub") == 0) {
 // printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
 			if(strcmp(trim(topics, 0), "") == 0) {
 				// 鍙栨秷鎵�鏈夎闃�
@@ -274,27 +275,26 @@
 			  }
 			}
 			
-		} else if(strcmp(action, "pub") == 0) {
+		} 
+		else if(strcmp(action, "pub") == 0) {
 			 content = topics + head.topic_size;
 			_proxy_pub(topics, content, head.content_size, key);
-		}  else if(strcmp(action, "stop") == 0) {
-			logger->info( "Stopping Bus...");
-			 // snprintf(resp_buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
-			shm_sendto(shm_socket, "stop_finished", strlen( "stop_finished") +1, key);
 
+		}  
+		else if(strcmp(action, "stop") == 0) {
+			
 			free(buf);
 			break;
 		} else {
 			logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action %s", action);
 		}
 		
-		// free(action);
-		// free(topics);
-		// } else {
-		// 	logger->error( "BusServerSocket::run_pubsub_proxy : incorrect format msg");
-		// }
 		free(buf);
 	}
+
+	logger->info( "Stopping Bus...");
+	shm_sendto(shm_socket, "stop_finished", strlen( "stop_finished") +1, key);
+
 	return NULL;
 }
 
diff --git a/src/socket/bus_server_socket_wrapper.c b/src/socket/bus_server_socket_wrapper.c
index 5c793c1..220461e 100644
--- a/src/socket/bus_server_socket_wrapper.c
+++ b/src/socket/bus_server_socket_wrapper.c
@@ -7,7 +7,7 @@
  * 鍒涘缓
  */
 void * bus_server_socket_wrapper_open() {
-	printf("===bus_server_socket_wrapper_open\n");
+	logger->debug("===bus_server_socket_wrapper_open\n");
 	BusServerSocket *sockt = new BusServerSocket;
 	return (void *)sockt;
 }
@@ -16,9 +16,10 @@
  * 鍏抽棴
  */
 void bus_server_socket_wrapper_close(void *_socket) {
-	printf("===bus_server_socket_wrapper_close\n");
-	BusServerSocket *sockt = (BusServerSocket *)_socket;
-	delete sockt;
+
+	// BusServerSocket *sockt = (BusServerSocket *)_socket;
+	//delete sockt;
+	logger->debug("===bus_server_socket_wrapper_close\n");
 }
 
 /**
diff --git a/src/socket/net_mod_server_socket.c b/src/socket/net_mod_server_socket.c
index 2fb70e8..46c3bce 100644
--- a/src/socket/net_mod_server_socket.c
+++ b/src/socket/net_mod_server_socket.c
@@ -168,9 +168,7 @@
     if(request_head.timeout > 0) {
       timeout.tv_sec = request_head.timeout / 1000;
       timeout.tv_nsec = (request_head.timeout - timeout.tv_sec * 1000) * 10e6;
-
       // printf(" timeout.tv_sec = %d,  timeout.tv_nsec=%ld\n",  timeout.tv_sec,  timeout.tv_nsec );
-
       ret = shmModSocket.sendandrecv_unsafe_timeout(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size, &timeout);
     }
     else if(request_head.timeout == 0) {
diff --git a/src/socket/net_mod_socket.c b/src/socket/net_mod_socket.c
index 48b0e7c..fb5003e 100644
--- a/src/socket/net_mod_socket.c
+++ b/src/socket/net_mod_socket.c
@@ -86,15 +86,15 @@
   int i, n, recv_size, connfd;
   net_node_t *node;
   void *recv_buf = NULL;
+  struct timespec timeout;
+  int ret;
+  int n_req = 0, n_recv_suc = 0, n_resp =0;
   
   net_mod_request_head_t request_head = {};
- 
-  int n_req = 0, n_recv_suc = 0, n_resp =0;
-
    
   net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
 
-  int ret;
+ 
   NetConnPool *mpool;
 
   /* Make first caller allocate key for thread-specific data */
@@ -131,7 +131,17 @@
     node = &node_arr[i];
     if(node->host == NULL || strcmp(node->host, "") == 0 ) {
       // 鏈湴鍙戦��
-      if(shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size) == 0) {
+     
+      if(msec == 0) {
+        ret = shmModSocket.sendandrecv_nowait(send_buf, send_size, node->key, &recv_buf, &recv_size);
+      } else if(msec > 0){
+        timeout.tv_sec = msec / 1000;
+        timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
+        ret = shmModSocket.sendandrecv_timeout(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout);
+      } else {
+        ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
+      }
+      if( ret == 0) {
         strcpy( ret_arr[n_recv_suc].host,"");
         ret_arr[n_recv_suc].port = 0;
         ret_arr[n_recv_suc].key = node->key;
@@ -229,7 +239,12 @@
 
   mpool->maxi = -1;
 
-  *recv_arr = ret_arr;
+  if(recv_arr != NULL) {
+    *recv_arr = ret_arr;
+  } else {
+    free_recv_msg_arr(ret_arr, n_recv_suc);
+  }
+  
   if(recv_arr_size != NULL) {
     *recv_arr_size = n_recv_suc;
   }
@@ -264,9 +279,10 @@
 // int  pub(char *topic, int topic_size, void *content, int content_size, int port);
 
 int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content,
- int content_size, int  timeout) {
+ int content_size, int  msec) {
   int i, connfd;
   net_node_t *node;
+  struct timespec timeout;
  
   net_mod_request_head_t request_head;
   net_mod_recv_msg_t recv_msg;
@@ -302,7 +318,16 @@
 
   // 鏈湴鍙戦��
   if(node_arr == NULL || arrlen == 0) {
-    if(shmModSocket.pub(topic, topic_size, content, content_size, node->key) == 0 ) {
+    if(msec == 0) {
+      ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key);
+    } else if(msec > 0) {
+      timeout.tv_sec = msec / 1000;
+      timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
+      ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout);
+    } else {
+      ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key);
+    }
+    if(ret == 0 ) {
       n_pub_suc++;
     }
   }
@@ -312,9 +337,20 @@
     node = &node_arr[i];
     if(node->host == NULL) {
       // 鏈湴鍙戦��
-      if(shmModSocket.pub(topic, topic_size, content, content_size, node->key) == 0 ) {
-         n_pub_suc++;
+      if(msec == 0) {
+        ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key);
+      } else if(msec > 0) {
+        timeout.tv_sec = msec / 1000;
+        timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
+        ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout);
+      } else {
+        ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key);
       }
+
+      if(ret == 0 ) {
+        n_pub_suc++;
+      }
+      
      
     } else {
       sprintf(portstr, "%d", node->port);
@@ -326,7 +362,7 @@
       request_head.key = node->key;
       request_head.content_length = content_size;
       request_head.topic_length = strlen(topic) + 1;
-      request_head.timeout = timeout;
+      request_head.timeout = msec;
 
       if(write_request(connfd, request_head, content, content_size, topic, request_head.topic_length) != 0) {
         LoggerFactory::getLogger()->error(" NetModSocket::_pub_ write_request failture %s:%d\n", node->host, node->port);
@@ -341,7 +377,7 @@
   while(n_resp < n_req)
   {
     /* Wait for listening/connected descriptor(s) to become ready */
-    if( (mpool->nready = poll(mpool->conns, mpool->maxi + 1, timeout) ) <= 0) {
+    if( (mpool->nready = poll(mpool->conns, mpool->maxi + 1, msec) ) <= 0) {
        // wirite_set 鍜� read_set 鍦ㄦ寚瀹氭椂闂村唴閮芥病鍑嗗濂�
       break;
     }
diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c
index efb3ef7..c1ac3c8 100644
--- a/src/socket/shm_socket.c
+++ b/src/socket/shm_socket.c
@@ -45,6 +45,7 @@
 
 shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
 
+  logger->debug("shm_open_socket\n");
   shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
   socket->socket_type = socket_type;
   socket->key = -1;
@@ -52,11 +53,11 @@
   socket->dispatch_thread = 0;
   socket->status = SHM_CONN_CLOSED;
   socket->mutex = SemUtil::get(IPC_PRIVATE, 1);
-  logger->debug("shm_open_socket\n");
+  
   return socket;
 }
 
-static int _shm_close_socket(shm_socket_t *socket) {
+int shm_close_socket(shm_socket_t *socket) {
   
   int ret;
 
@@ -76,12 +77,12 @@
   return ret;
 }
 
-int shm_close_socket(shm_socket_t *socket) {
+// int shm_close_socket(shm_socket_t *socket) {
   
-  // _destrory_tmp_recv_socket_((shm_socket_t *)pthread_getspecific(_tmp_recv_socket_key_));
+//   // _destrory_tmp_recv_socket_((shm_socket_t *)pthread_getspecific(_tmp_recv_socket_key_));
  
-  return _shm_close_socket(socket);;
-}
+//   return shm_close_socket(socket);;
+// }
 
 int shm_socket_bind(shm_socket_t *socket, int key) {
   socket->key = key;
@@ -391,11 +392,18 @@
   }
 
   if (rv) {
-    void *_buf = malloc(src.size);
-    memcpy(_buf, src.buf, src.size);
-    *buf = _buf;
-    *size = src.size;
-    *key = src.key;
+    if(buf != NULL) {
+      void *_buf = malloc(src.size);
+      memcpy(_buf, src.buf, src.size);
+      *buf = _buf; 
+    }
+   
+    if(size != NULL)
+      *size = src.size;
+
+    if(key != NULL)
+      *key = src.key;
+
     mm_free(src.buf);
     // printf("shm_recvfrom pop after\n");
     return 0;
@@ -411,12 +419,13 @@
   int rv;
   if(tmp_socket == NULL)
     return;
+
   logger->debug("%d destroy tmp socket\n", pthread_self()); 
-  _shm_close_socket((shm_socket_t *)tmp_socket);
+  shm_close_socket((shm_socket_t *)tmp_socket);
   rv =  pthread_setspecific(_tmp_recv_socket_key_, NULL);
   if ( rv != 0) {
-      logger->error(rv, "shm_sendandrecv : pthread_setspecific");
-      exit(1);
+    logger->error(rv, "shm_sendandrecv : pthread_setspecific");
+    exit(1);
   }
 }
 
@@ -438,7 +447,7 @@
 
 
 
-int shm_sendandrecv(shm_socket_t *socket, const void *send_buf,
+int shm_sendandrecv_safe(shm_socket_t *socket, const void *send_buf,
                     const int send_size, const int send_key, void **recv_buf,
                     int *recv_size,  struct timespec *timeout,  int flags) {
   int recv_key;
@@ -508,6 +517,12 @@
   return -1;
 }
 
+int shm_sendandrecv(shm_socket_t *socket, const void *send_buf,
+                    const int send_size, const int send_key, void **recv_buf,
+                    int *recv_size,  struct timespec *timeout,  int flags) {
+  return  shm_sendandrecv_unsafe(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout,  flags);
+}
+
 // ============================================================================================================
 
 /**
diff --git a/test_net_socket/Makefile b/test_net_socket/Makefile
index 0d8505f..832a130 100644
--- a/test_net_socket/Makefile
+++ b/test_net_socket/Makefile
@@ -14,8 +14,7 @@
 #-I$(ROOT)/include/usgcommon
 INCLUDES += -I${ROOT}/src -I${ROOT}/src/queue -I${ROOT}/src/socket -I${ROOT}/src/common/include -I${ROOT}/include/usgcommon
 
-
-PROGS = ${DEST}/test_net_mod_socket
+PROGS = ${DEST}/test_net_mod_socket ${DEST}/test_bus_stop ${DEST}/heart_beat
 
 DEPENDENCES = $(patsubst %, %.d, $(PROGS)) 
 
diff --git a/test_socket/dgram_mod_survey.c b/test_net_socket/heart_beat.c
similarity index 69%
rename from test_socket/dgram_mod_survey.c
rename to test_net_socket/heart_beat.c
index da3260f..562cb23 100644
--- a/test_socket/dgram_mod_survey.c
+++ b/test_net_socket/heart_beat.c
@@ -1,6 +1,10 @@
-#include "dgram_mod_socket.h"
+#include "net_mod_server_socket_wrapper.h"
+#include "net_mod_socket_wrapper.h"
+#include "bus_server_socket_wrapper.h"
+
 #include "shm_mm_wraper.h"
 #include "usg_common.h"
+#include <getopt.h>
 
 
 typedef struct Targ {
@@ -10,43 +14,50 @@
 }Targ;
 
 void sigint_handler(int sig) {
-   //dgram_mod_close_socket(server_socket);
+   // net_mod_socket_close(server_socket);
   printf("===Catch sigint======================\n");
   shm_mm_wrapper_destroy();
   exit(0);
 }
 
 void server(int port) {
-  void *socket = dgram_mod_open_socket();
-  dgram_mod_bind(socket, port);
+  void *serv = net_mod_socket_open();
+  net_mod_socket_bind(serv, port);
   int size;
   void *recvbuf;
   char sendbuf[512];
   int rv;
   int remote_port;
   while (true) {
-    if ((rv = dgram_mod_recvfrom_timeout(socket, &recvbuf, &size, &remote_port, 15, 0) ) == 0) {
+    if ((rv =  net_mod_socket_recvfrom(serv, &recvbuf, &size, &remote_port) ) == 0) {
       printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf);
+      net_mod_socket_sendto(serv, "suc", strlen("suc")+1, remote_port);
       free(recvbuf);
     }
     
   }
-  dgram_mod_close_socket(socket);
+  net_mod_socket_close(serv);
 }
 
 void client(int port) {
-  void *socket = dgram_mod_open_socket();
+  int rv;
+  void *client = net_mod_socket_open();
   int size;
   char sendbuf[512];
   long i = 0;
+  net_node_t node_arr[] = {"", 0, 100};
+  int node_arr_size = 1;
+
+  int recv_arr_size;
+  net_mod_recv_msg_t *recv_arr;
   while (true) {
     sprintf(sendbuf, "%d", i);
     printf("SEND HEART:%s\n", sendbuf);
-    dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
+    rv = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL);
    // sleep(1);
     i++;
   }
-  dgram_mod_close_socket(socket);
+   net_mod_socket_close(client);
 }
 
 
@@ -54,20 +65,26 @@
   signal(SIGINT,  sigint_handler);
   Targ *targ = (Targ *)arg;
   int port = targ->port;
-  void *socket = dgram_mod_open_socket();
+  void *socket = net_mod_socket_open();
   int size;
   char sendbuf[512];
   long scale = 10;
   long i = 0;
+  net_node_t node_arr[] = {"", 0, 100};
+  int node_arr_size = 1;
+
+  int recv_arr_size;
+  net_mod_recv_msg_t *recv_arr;
+
   while (i < scale) {
     sprintf(sendbuf, "%d", i);
     printf("%d SEND HEART:%s\n", targ->id, sendbuf);
-    dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
+    net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
     sleep(1);
     i++;
   }
   
-  dgram_mod_close_socket(socket);
+   net_mod_socket_close(socket);
   return (void *)i;
 }
 
diff --git a/test_net_socket/test_bus_stop.c b/test_net_socket/test_bus_stop.c
new file mode 100644
index 0000000..ed2f60f
--- /dev/null
+++ b/test_net_socket/test_bus_stop.c
@@ -0,0 +1,54 @@
+#include "net_mod_server_socket_wrapper.h"
+#include "net_mod_socket_wrapper.h"
+#include "bus_server_socket_wrapper.h"
+
+#include "shm_mm_wraper.h"
+#include "usg_common.h"
+#include <getopt.h>
+
+static void * server_sockt;
+
+static void *_start_bus_(void *arg) {
+ // pthread_detach(pthread_self());
+	printf("Start bus server\n");
+  pthread_t tid;
+
+  server_sockt = bus_server_socket_wrapper_open();
+
+  if(bus_server_socket_wrapper_start_bus(server_sockt) != 0) {
+    printf("start bus failed\n");
+  }
+}
+
+int main() {
+
+
+ pthread_t tid;
+ char action[512];
+
+ shm_mm_wrapper_init(512);
+ pthread_create(&tid, NULL, _start_bus_,  NULL);
+
+
+ while (true) {
+    printf("Input action: Close?\n");
+    if(scanf("%s", action) < 1) {
+      printf("Invalide action\n");
+      continue;
+    }
+
+    if(strcmp(action, "close") == 0) {
+      bus_server_socket_wrapper_close(server_sockt);
+      break;
+    } else {
+      printf("Invalide action\n");
+    }
+ }
+
+ if (pthread_join(tid, NULL) != 0) {
+    perror(" pthread_join");
+ }
+
+
+ shm_mm_wrapper_destroy();
+}
\ No newline at end of file
diff --git a/test_net_socket/test_net_mod_socket.c b/test_net_socket/test_net_mod_socket.c
index f509e2c..5693cf6 100644
--- a/test_net_socket/test_net_mod_socket.c
+++ b/test_net_socket/test_net_mod_socket.c
@@ -137,6 +137,7 @@
     sprintf(sendbuf, "RECEIVED  PORT %d NAME %s", remote_port, recvbuf);
     net_mod_socket_sendto(client, sendbuf, strlen(sendbuf) + 1, remote_port);
     free(recvbuf);
+    sleep(1000);
   }
 }
 
@@ -259,8 +260,9 @@
   for (i = 0; i < SCALE; i++) {
     sprintf(sendbuf, "thread(%d) %d", targ->id, i);
     fprintf(fp, "requst:%s\n", sendbuf);
-    n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
-    //printf("send %d nodes\n", n);
+    // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
+     n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1000);
+    printf("send %d nodes\n", n);
     for(j=0; j < recv_arr_size; j++) {
     	fprintf(fp, "reply: host:%s, port: %d, key:%d, content: %s\n", 
     		recv_arr[j].host,

--
Gitblit v1.8.0