From 607ac3ae8bfc017e10a7907e69dcbc3ab2a0fb63 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期五, 05 二月 2021 13:54:56 +0800
Subject: [PATCH] add stop method

---
 test/UDPServer.cpp                       |   70 ++++++++++
 src/socket/bus_server_socket_wrapper.cpp |    4 
 test_net_socket/test_net_mod_socket.cpp  |   39 +++++
 src/socket/shm_socket.h                  |    6 
 src/bus_error.cpp                        |    3 
 test/UDPClient.cpp                       |   50 +++++++
 src/net/net_mod_socket.cpp               |    4 
 src/socket/shm_socket.cpp                |   37 +++-
 src/socket/shm_mod_socket.h              |    1 
 test_net_socket/test_bus_stop.cpp        |   11 +
 src/socket/bus_server_socket.cpp         |   46 +++---
 src/socket/bus_server_socket_wrapper.h   |    5 
 src/net/net_mod_socket_wrapper.cpp       |    5 
 src/net/net_mod_socket_wrapper.h         |    6 
 src/net/net_mod_socket.h                 |    2 
 src/psem.cpp                             |   42 +++---
 test_net_socket/net_mod_socket.sh        |    5 
 src/CMakeLists.txt                       |    3 
 src/bus_error.h                          |    1 
 src/socket/bus_server_socket.h           |    3 
 src/socket/shm_mod_socket.cpp            |    5 
 test/CMakeLists.txt                      |   15 ++
 22 files changed, 294 insertions(+), 69 deletions(-)

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index a96920e..18b555d 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -29,7 +29,8 @@
 )
 
 
-add_library(shm_queue SHARED ${_SOURCES_})
+# add_library(shm_queue SHARED ${_SOURCES_})
+add_library(shm_queue STATIC ${_SOURCES_})
 
 target_include_directories(shm_queue PUBLIC ${EXTRA_INCLUDES} )
 
diff --git a/src/bus_error.cpp b/src/bus_error.cpp
index 2c410eb..a9348e0 100644
--- a/src/bus_error.cpp
+++ b/src/bus_error.cpp
@@ -18,7 +18,8 @@
   "Key already in use",
   "Network fault",
   "Send to self error",
-  "Receive from wrong end"
+  "Receive from wrong end",
+  "Service stoped"
 
 };
 
diff --git a/src/bus_error.h b/src/bus_error.h
index 7677545..fa9f352 100644
--- a/src/bus_error.h
+++ b/src/bus_error.h
@@ -11,6 +11,7 @@
 #define EBUS_NET 504
 #define EBUS_SENDTO_SELF 505
 #define EBUS_RECVFROM_WRONG_END 506
+#define EBUS_STOPED 507
 
 extern int bus_errno;
 
diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp
index 8c1465d..ef817f0 100644
--- a/src/net/net_mod_socket.cpp
+++ b/src/net/net_mod_socket.cpp
@@ -49,6 +49,10 @@
 }
 
 
+int NetModSocket::stop() {
+  return shmModSocket.stop();
+}
+
 /**
  * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
diff --git a/src/net/net_mod_socket.h b/src/net/net_mod_socket.h
index 11e0ebe..9358311 100644
--- a/src/net/net_mod_socket.h
+++ b/src/net/net_mod_socket.h
@@ -98,7 +98,7 @@
   NetModSocket();
   ~NetModSocket();
 
-
+  int stop();
   /**
    * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓�
    * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
diff --git a/src/net/net_mod_socket_wrapper.cpp b/src/net/net_mod_socket_wrapper.cpp
index e72297a..c074e15 100644
--- a/src/net/net_mod_socket_wrapper.cpp
+++ b/src/net/net_mod_socket_wrapper.cpp
@@ -20,6 +20,11 @@
 	delete sockt;
 }
  
+int net_mod_socket_stop(void *_socket) {
+	NetModSocket *sockt = (NetModSocket *)_socket;
+	return sockt->stop();
+}
+
 /**
  * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
diff --git a/src/net/net_mod_socket_wrapper.h b/src/net/net_mod_socket_wrapper.h
index 79cb7d7..e29fe5b 100644
--- a/src/net/net_mod_socket_wrapper.h
+++ b/src/net/net_mod_socket_wrapper.h
@@ -29,12 +29,16 @@
  */
 void * net_mod_socket_open();
 
+
 /**
  * @brief 鍏抽棴 net_mod_socket
  */
 void net_mod_socket_close(void *_sockt);
 
-
+/**
+ * @brief 鍋滄 net_mod_socket
+ */
+int net_mod_socket_stop(void *_sockt);
 
 /**
  * @brief 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓�
diff --git a/src/psem.cpp b/src/psem.cpp
index fd06216..2ace11f 100644
--- a/src/psem.cpp
+++ b/src/psem.cpp
@@ -6,31 +6,31 @@
 
 int psem_timedwait(sem_t *sem, const struct timespec *ts) {
 	struct timespec abs_timeout = TimeUtil::calc_abs_time(ts);
-
-  int rv ;
-  while ( (rv = sem_timedwait(sem, &abs_timeout)) == -1) {
-      if(errno == EINTR)
-          continue;
-      else {
-         // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
-         return -1;
-      }
-  }
-  return 0;
+  return sem_timedwait(sem, &abs_timeout);
+  // int rv ;
+  // while ( (rv = sem_timedwait(sem, &abs_timeout)) == -1) {
+  //     if(errno == EINTR)
+  //         continue;
+  //     else {
+  //        // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
+  //        return -1;
+  //     }
+  // }
+  // return 0;
 }
 
 
 int psem_wait(sem_t *sem) {
-  int rv;
-  while ( (rv = sem_wait(sem)) == -1) {
-      if(errno == EINTR)
-          continue;
-      else {
-         // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
-         return -1;
-      }
-  }
-  return 0;
+  return sem_wait(sem);
+  // int rv;
+  // while ( (rv = sem_wait(sem)) == -1) {
+  //     if(errno == EINTR)
+  //         continue;
+  //     else {
+  //        return -1;
+  //     }
+  // }
+  // return 0;
 }
 
 int psem_trywait(sem_t *sem) {
diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index 3904297..b49eeb8 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -58,25 +58,7 @@
 }
 
 BusServerSocket::~BusServerSocket() {
-	SHMKeySet *subscripter_set;
-	SHMTopicSubMap::iterator map_iter;
-
-	stop();
-	 
-	if(topic_sub_map != NULL) {
-		for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
-			subscripter_set = map_iter->second;
-			if(subscripter_set != NULL) {
-				subscripter_set->clear();
-				mm_free((void *)subscripter_set);
-			}
-
-		}
-		topic_sub_map->clear();
-		mem_pool_free_by_key(SHM_BUS_MAP_KEY);
-	}
-	shm_close_socket(shm_socket);
-	logger->debug("BusServerSocket destory 3");
+	destroy();
 }
 
 
@@ -111,8 +93,6 @@
 	if( shm_socket->key <= 0) {
 		return -1;
 	}
-	// snprintf(buf, 128, "%sstop%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
-	// return shm_sendto(shm_socket, buf, strlen(buf), shm_socket->key, NULL, 0);
 	bus_head_t head = {};
 	memcpy(head.action, "stop", sizeof(head.action));
 	head.topic_size = 0;
@@ -122,13 +102,33 @@
 	void *buf;
 	int size = ShmModSocket::get_bus_sendbuf(head, NULL, 0, NULL,  0, &buf);
 	if(size > 0) {
-		ret = client.sendandrecv( buf, size, shm_socket->key, NULL, NULL);
+		ret = client.sendto( buf, size, shm_socket->key);
 		free(buf);
 		return ret;
 	} else {
 		return -1;
 	}
 
+}
+
+int  BusServerSocket::destroy() {
+	SHMKeySet *subscripter_set;
+	SHMTopicSubMap::iterator map_iter;
+	if(topic_sub_map != NULL) {
+		for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
+			subscripter_set = map_iter->second;
+			if(subscripter_set != NULL) {
+				subscripter_set->clear();
+				mm_free((void *)subscripter_set);
+			}
+
+		}
+		topic_sub_map->clear();
+		mem_pool_free_by_key(SHM_BUS_MAP_KEY);
+	}
+	shm_close_socket(shm_socket);
+	logger->debug("BusServerSocket destory 3");
+	return 0;
 }
 
 /*
@@ -280,8 +280,6 @@
 		free(buf);
 	}
 
-
-	shm_sendto(shm_socket, "stop_finished", strlen( "stop_finished") +1, key);
 
 	return NULL;
 }
diff --git a/src/socket/bus_server_socket.h b/src/socket/bus_server_socket.h
index 7a6c829..bdfb172 100644
--- a/src/socket/bus_server_socket.h
+++ b/src/socket/bus_server_socket.h
@@ -27,11 +27,12 @@
 	SHMTopicSubMap *topic_sub_map;
 
 private:
+	int  destroy();
 	void _proxy_sub( char *topic, int key);
 	void _proxy_pub( char *topic, void *buf, size_t size, int key);
 	void *_run_proxy_();
 	// int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
- 
+  	
 	void _proxy_desub( char *topic, int key);
 	void _proxy_desub_all(int key);
 
diff --git a/src/socket/bus_server_socket_wrapper.cpp b/src/socket/bus_server_socket_wrapper.cpp
index a4f148c..d2f5a8e 100644
--- a/src/socket/bus_server_socket_wrapper.cpp
+++ b/src/socket/bus_server_socket_wrapper.cpp
@@ -22,6 +22,10 @@
 	logger->debug("===bus_server_socket_wrapper_close\n");
 }
 
+int bus_server_socket_wrapper_stop(void *_socket) {
+	BusServerSocket *sockt = (BusServerSocket *)_socket;
+	return sockt->stop();
+}
 /**
  * 鍚姩bus
  * 
diff --git a/src/socket/bus_server_socket_wrapper.h b/src/socket/bus_server_socket_wrapper.h
index 2a7c59d..06a060e 100644
--- a/src/socket/bus_server_socket_wrapper.h
+++ b/src/socket/bus_server_socket_wrapper.h
@@ -29,6 +29,11 @@
 void bus_server_socket_wrapper_close(void *_sockt);
 
 /**
+ * @brief 鍋滄 bus_server_socket
+ */
+int bus_server_socket_wrapper_stop(void *_socket);
+
+/**
  * @brief 鍚姩bus
  * 
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index f5bd0b7..dc071cb 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -26,6 +26,11 @@
 	shm_close_socket(shm_socket);
 }
 
+int ShmModSocket::stop() {
+	return shm_socket_stop(shm_socket);
+}
+
+
 int ShmModSocket::bind(int key) {
 	return  shm_socket_bind(shm_socket, key);
 } 
diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h
index fb389a3..44a714b 100644
--- a/src/socket/shm_mod_socket.h
+++ b/src/socket/shm_mod_socket.h
@@ -47,6 +47,7 @@
 	ShmModSocket();
 	~ShmModSocket();
 	 
+	int stop();
 	/**
 	 * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓�
 	 * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 8fef2f7..fc410bf 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -112,23 +112,31 @@
 
 int shm_close_socket(shm_socket_t *sockt) {
   
-  int s;
-  
+  int rv;
   logger->debug("shm_close_socket\n");
   if(sockt->queue != NULL) {
     delete sockt->queue;
     sockt->queue = NULL;
   }
 
-  s =  pthread_mutex_destroy(&(sockt->mutex) );
-  if(s != 0) {
-    err_exit(s, "shm_close_socket");
+  rv =  pthread_mutex_destroy(&(sockt->mutex) );
+  if(rv != 0) {
+    err_exit(rv, "shm_close_socket");
   }
 
   free(sockt);
   return 0;
 }
 
+
+int shm_socket_stop(shm_socket_t *sockt) {
+  struct timespec timeout = {5, 0};
+  shm_packet_t sendpak = {0};
+  sendpak.key = sockt->key;
+  sendpak.action = BUS_ACTION_STOP;
+  sendpak.size = 0;
+  return shm_sendpakto(sockt, &sendpak, sockt->key, &timeout, BUS_TIMEOUT_FLAG);
+}
 
 int shm_socket_bind(shm_socket_t *sockt, int key) {
   sockt->key = key;
@@ -175,6 +183,7 @@
   shm_packet_t sendpak;
   shm_packet_t recvpak;
   std::map<std::string, shm_packet_t>::iterator recvbufIter;
+
   std::string uuid = sole::uuid4().str();
   
   sendpak.key = sockt->key;
@@ -507,7 +516,7 @@
   
  
  LABEL_PUSH: 
-  if (key == sockt->key) {
+  if (sendpak->action != BUS_ACTION_STOP && key == sockt->key) {
     logger->error( "can not send to your self!");
     return EBUS_SENDTO_SELF;
   }
@@ -527,10 +536,11 @@
 }
 
 // 鐭繛鎺ユ柟寮忔帴鍙�
-static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak ,  const struct timespec *timeout,  int flag) {
+static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak ,  const struct timespec *timeout,  int flag) {
   int rv;
   
   hashtable_t *hashtable = mm_get_hashtable();
+  shm_packet_t recvpak;
 
   if( sockt->queue != NULL) 
     goto LABEL_POP;
@@ -557,11 +567,18 @@
   
 LABEL_POP:
 
+ // 
+  // printf("%p start recv.....\n", sockt);
  
-  printf("%p start recv.....\n", sockt);
- 
-  rv = sockt->queue->pop(*recvpak, timeout, flag);
+  rv = sockt->queue->pop(recvpak, timeout, flag);
+  if(rv != 0) 
+    return rv;
 
+  
+  if(recvpak.action == BUS_ACTION_STOP) {
+    return EBUS_STOPED;
+  }
+  *_recvpak = recvpak;
   return rv;
 }
 // int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf,
diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h
index 03ee831..e7c0dda 100644
--- a/src/socket/shm_socket.h
+++ b/src/socket/shm_socket.h
@@ -14,13 +14,15 @@
 	
 };
  
- 
+
+#define BUS_ACTION_STOP 1 
 
 typedef struct shm_packet_t {
 	int key;
 	size_t size;
 	void * buf;
 	char uuid[64];
+	int action;
 
 } shm_packet_t;
 
@@ -50,6 +52,8 @@
 
 int shm_close_socket(shm_socket_t * socket) ;
 
+int shm_socket_stop(shm_socket_t *sockt);
+
 
 int shm_socket_bind(shm_socket_t * socket, int key) ;
 
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 0b001cc..31bfd83 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -49,4 +49,19 @@
 
 
 
+add_executable(UDPServer UDPServer.cpp )
+target_link_libraries(UDPServer PRIVATE  ${EXTRA_LIBS} )
+target_include_directories(UDPServer PRIVATE
+                            "${PROJECT_BINARY_DIR}"
+                             ${EXTRA_INCLUDES}
+                            )
+
+
+add_executable(UDPClient UDPClient.cpp )
+target_link_libraries(UDPClient PRIVATE  ${EXTRA_LIBS} )
+target_include_directories(UDPClient PRIVATE
+                            "${PROJECT_BINARY_DIR}"
+                             ${EXTRA_INCLUDES}
+                            )
+
 
diff --git a/test/UDPClient.cpp b/test/UDPClient.cpp
new file mode 100644
index 0000000..d54753b
--- /dev/null
+++ b/test/UDPClient.cpp
@@ -0,0 +1,50 @@
+// Client side implementation of UDP client-server model 
+#include <stdio.h> 
+#include <stdlib.h> 
+#include <unistd.h> 
+#include <string.h> 
+#include <sys/types.h> 
+#include <sys/socket.h> 
+#include <arpa/inet.h> 
+#include <netinet/in.h> 
+  
+#define PORT     8080 
+#define MAXLINE 1024 
+  
+// Driver code 
+int main() { 
+    int sockfd; 
+    char buffer[MAXLINE]; 
+    char *hello = "Hello from client"; 
+    struct sockaddr_in     servaddr; 
+  
+    // Creating socket file descriptor 
+    if ( (sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 ) { 
+        perror("socket creation failed"); 
+        exit(EXIT_FAILURE); 
+    } 
+  
+    memset(&servaddr, 0, sizeof(servaddr)); 
+      
+    // Filling server information 
+    servaddr.sin_family = AF_INET; 
+    servaddr.sin_port = htons(PORT); 
+    servaddr.sin_addr.s_addr = INADDR_ANY; 
+      
+    int n;
+    socklen_t len; 
+      
+    sendto(sockfd, (const char *)hello, strlen(hello), 
+        MSG_CONFIRM, (const struct sockaddr *) &servaddr,  
+            sizeof(servaddr)); 
+    printf("Hello message sent.\n"); 
+          
+    n = recvfrom(sockfd, (char *)buffer, MAXLINE,  
+                MSG_WAITALL, (struct sockaddr *) &servaddr, 
+                &len); 
+    buffer[n] = '\0'; 
+    printf("Server : %s\n", buffer); 
+  
+    close(sockfd); 
+    return 0; 
+} 
\ No newline at end of file
diff --git a/test/UDPServer.cpp b/test/UDPServer.cpp
new file mode 100644
index 0000000..4f998d7
--- /dev/null
+++ b/test/UDPServer.cpp
@@ -0,0 +1,70 @@
+// Server side implementation of UDP client-server model 
+#include <stdio.h> 
+#include <stdlib.h> 
+#include <unistd.h> 
+#include <string.h> 
+#include <sys/types.h> 
+#include <sys/socket.h> 
+#include <arpa/inet.h> 
+#include <netinet/in.h> 
+#include <signal.h>
+
+#define PORT	 8080 
+#define MAXLINE 1024 
+
+bool stop = false;
+
+static void stop_handler(int sig) {
+  printf("stop_handler\n");
+ 
+  stop = true;
+}
+// Driver code 
+int main() { 
+	int sockfd; 
+	char buffer[MAXLINE]; 
+	char *hello = "Hello from server"; 
+	struct sockaddr_in servaddr, cliaddr; 
+	signal(SIGINT,  stop_handler);
+	// Creating socket file descriptor 
+	if ( (sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 ) { 
+		perror("socket creation failed"); 
+		exit(EXIT_FAILURE); 
+	} 
+	
+	memset(&servaddr, 0, sizeof(servaddr)); 
+	memset(&cliaddr, 0, sizeof(cliaddr)); 
+	
+	// Filling server information 
+	servaddr.sin_family = AF_INET; // IPv4 
+	servaddr.sin_addr.s_addr = INADDR_ANY; 
+	servaddr.sin_port = htons(PORT); 
+	
+	// Bind the socket with the server address 
+	if ( bind(sockfd, (const struct sockaddr *)&servaddr, 
+			sizeof(servaddr)) < 0 ) 
+	{ 
+		perror("bind failed"); 
+		exit(EXIT_FAILURE); 
+	} 
+	
+	int n; 
+
+	socklen_t len = sizeof(cliaddr); //len is value/resuslt 
+	while(!stop) {
+		n = recvfrom(sockfd, (char *)buffer, MAXLINE, 
+				MSG_WAITALL, ( struct sockaddr *) &cliaddr, 
+				&len); 
+		buffer[n] = '\0'; 
+		printf("Client : %s\n", buffer); 
+		sendto(sockfd, (const char *)hello, strlen(hello), 
+			MSG_CONFIRM, (const struct sockaddr *) &cliaddr, 
+				len); 
+		printf("Hello message sent.\n"); 
+	}
+
+	printf("===stopted.\n");
+
+	
+	return 0; 
+} 
diff --git a/test_net_socket/net_mod_socket.sh b/test_net_socket/net_mod_socket.sh
index 52eb54e..288f192 100755
--- a/test_net_socket/net_mod_socket.sh
+++ b/test_net_socket/net_mod_socket.sh
@@ -53,6 +53,11 @@
 	 
 }
 
+function stop() {
+	ps -ef | grep -e "test_net_mod_socket" -e "heart_beat"| awk  '{print $2}' | xargs -i kill -15 {}
+	
+}
+
 function close() {
 	ps -ef | grep -e "test_net_mod_socket" -e "heart_beat"| awk  '{print $2}' | xargs -i kill -9 {}
 	ipcrm -a
diff --git a/test_net_socket/test_bus_stop.cpp b/test_net_socket/test_bus_stop.cpp
index 3185029..d40f0d7 100644
--- a/test_net_socket/test_bus_stop.cpp
+++ b/test_net_socket/test_bus_stop.cpp
@@ -8,8 +8,8 @@
 
 static void * server_sockt;
 
-static void sigint_handler(int sig) {
-  bus_server_socket_wrapper_close(server_sockt);
+static void stop_bus_handler(int sig) {
+  bus_server_socket_wrapper_stop(server_sockt);
 }
 
 static void *_start_bus_(void *arg) {
@@ -20,14 +20,17 @@
   if(bus_server_socket_wrapper_start_bus(server_sockt) != 0) {
     printf("start bus failed\n");
   }
-  printf("============_start_bus_ end\n" );
+
+  bus_server_socket_wrapper_close(server_sockt);
+  printf("============bus stopted\n" );
 }
 
 
 int main() {
  pthread_t tid;
  char action[512];
- signal(SIGINT,  sigint_handler);
+ signal(SIGINT,  stop_bus_handler);
+ signal(SIGTERM,  stop_bus_handler);
  shm_mm_wrapper_init(512);
  server_sockt = bus_server_socket_wrapper_open();
  pthread_create(&tid, NULL, _start_bus_,  NULL);
diff --git a/test_net_socket/test_net_mod_socket.cpp b/test_net_socket/test_net_mod_socket.cpp
index 2ebbd06..49b6251 100644
--- a/test_net_socket/test_net_mod_socket.cpp
+++ b/test_net_socket/test_net_mod_socket.cpp
@@ -10,6 +10,8 @@
 
 #define  SCALE  100000
 
+static Logger *logger = LoggerFactory::getLogger();
+
 typedef struct Targ {
   net_node_t *node;
 	char *nodelist;
@@ -133,6 +135,7 @@
 
 void *serverSockt;
 
+
 static void _recvandsend_callback_(void *recvbuf, int recvsize, int key, void **sendbuf_ptr, int *sendsize_ptr, void * user_data) {
   char sendbuf[512];
   printf( "server: RECEIVED REQUEST FROM  %d : %s\n", key, (char *)recvbuf);
@@ -145,16 +148,44 @@
   return;
 } 
 
+bool stop = false;
+
+static void stop_replyserver_handler(int sig) {
+  printf("stop_handler\n");
+ 
+  int rv = net_mod_socket_stop(serverSockt);
+  if(rv ==0) {
+    logger->debug("send stop suc");
+    return;
+  } else {
+    logger->debug("send stop fail.%s\n", bus_strerror(rv));
+  }
+}
 
 void start_reply(int mkey) {
-  printf("start reply\n");
+  logger->debug("start reply\n");
+  signal(SIGINT,  stop_replyserver_handler);
+  signal(SIGTERM,  stop_replyserver_handler);
+
   serverSockt = net_mod_socket_open();
   net_mod_socket_bind(serverSockt, mkey);
  
-  int rv;
-  while(true) {
-    rv = net_mod_socket_recvandsend_timeout(serverSockt, _recvandsend_callback_ , 0, 2000000, NULL );
+  int rv = 0 ;
+  while(  true) {
+    rv = net_mod_socket_recvandsend(serverSockt, _recvandsend_callback_ , NULL );
+    if (rv == 0)
+      continue;
+    if(rv == EBUS_STOPED) {
+      logger->debug("Stopping\n");
+      break;
+    }
+    logger->debug("net_mod_socket_recvandsend error.%s\n", bus_strerror(rv));
+
   }
+  
+    //rv = net_mod_socket_recvandsend_timeout(serverSockt, _recvandsend_callback_ , 0, 2000000, NULL );
+  net_mod_socket_close(serverSockt);
+  logger->debug("stopted\n");
   // while ( (rv = net_mod_socket_recvfrom(ser, &recvbuf, &size, &key) ) == 0) {
   //  // printf( "server: RECEIVED REQUEST FROM  %d NAME %s\n", key, recvbuf);
   //   sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), (char *)recvbuf);

--
Gitblit v1.8.0