From 5410446ade40493d17f7e2d7f0d687b0998acc6a Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期三, 27 一月 2021 11:56:54 +0800
Subject: [PATCH] timeout wait 合一

---
 src/socket/shm_mod_socket.h             |   52 ++---
 src/net/net_mod_socket_wrapper.h        |    1 
 test_net_socket/test_net_mod_socket.cpp |   93 +++++++++-
 test_net_socket/net_mod_socket.sh       |   22 ++
 src/socket/shm_mod_socket.cpp           |  190 ++++++--------------
 src/net/net_mod_server_socket.cpp       |    8 
 src/net/net_mod_socket.cpp              |  120 ++----------
 7 files changed, 210 insertions(+), 276 deletions(-)

diff --git a/src/net/net_mod_server_socket.cpp b/src/net/net_mod_server_socket.cpp
index 432defb..ede0e67 100644
--- a/src/net/net_mod_server_socket.cpp
+++ b/src/net/net_mod_server_socket.cpp
@@ -171,10 +171,10 @@
       timeout.tv_sec = request_head.timeout / 1000;
       timeout.tv_nsec = (request_head.timeout - timeout.tv_sec * 1000) * 10e6;
       // printf(" timeout.tv_sec = %d,  timeout.tv_nsec=%ld\n",  timeout.tv_sec,  timeout.tv_nsec );
-      ret = shmModSocket.sendandrecv_unsafe_timeout(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size, &timeout);
+      ret = shmModSocket.sendandrecv_unsafe(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size, &timeout, BUS_TIMEOUT_FLAG);
     }
     else if(request_head.timeout == 0) {
-      ret = shmModSocket.sendandrecv_unsafe_nowait(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size);
+      ret = shmModSocket.sendandrecv_unsafe(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size, NULL, BUS_NOWAIT_FLAG);
     }
     else if(request_head.timeout == -1) {
       ret = shmModSocket.sendandrecv_unsafe(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size);
@@ -236,10 +236,10 @@
     if(request_head.timeout > 0) {
       timeout.tv_sec = request_head.timeout / 1000;
       timeout.tv_nsec = (request_head.timeout - timeout.tv_sec * 1000) * 10e6;
-      ret = shmModSocket.pub_timeout((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY, &timeout);
+      ret = shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG);
     }
     else if(request_head.timeout == 0) {
-      ret = shmModSocket.pub_nowait((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY);
+      ret = shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG);
     }
     else if(request_head.timeout == -1) {
       ret = shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY);
diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp
index baeef65..3668b14 100644
--- a/src/net/net_mod_socket.cpp
+++ b/src/net/net_mod_socket.cpp
@@ -17,7 +17,7 @@
 {
   int s;
   if (Signal(SIGPIPE, SIG_IGN) == SIG_ERR)
-      logger->error(errno, "NetModSocket::NetModSocket signal");
+    logger->error(errno, "NetModSocket::NetModSocket signal");
 
   gpool = new NetConnPool();
 
@@ -165,11 +165,11 @@
       // 鏈湴鍙戦��
      
       if(msec == 0) {
-        ret = shmModSocket.sendandrecv_nowait(send_buf, send_size, node->key, &recv_buf, &recv_size);
+        ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size, NULL, BUS_NOWAIT_FLAG);
       } else if(msec > 0){
         timeout.tv_sec = msec / 1000;
         timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
-        ret = shmModSocket.sendandrecv_timeout(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout);
+        ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout, BUS_TIMEOUT_FLAG);
       } else {
         ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
       }
@@ -334,11 +334,11 @@
   // 鏈湴鍙戦��
   if(node_arr == NULL || arrlen == 0) {
     if(msec == 0) {
-      ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
+      ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY,  NULL, BUS_NOWAIT_FLAG);
     } else if(msec > 0) {
       timeout.tv_sec = msec / 1000;
       timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
-      ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
+      ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG);
     } else {
       ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
     }
@@ -354,11 +354,11 @@
     if(node->host == NULL) {
       // 鏈湴鍙戦��
       if(msec == 0) {
-        ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
+        ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG);
       } else if(msec > 0) {
         timeout.tv_sec = msec / 1000;
         timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
-        ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
+        ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG);
       } else {
         ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
       }
@@ -457,55 +457,20 @@
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
  */
 int NetModSocket::sendto(const void *buf, const int size, const int key){
-  int rv = shmModSocket.sendto(buf, size, key);
-  if(rv == 0) {
-    logger->debug("NetModSocket::sendto: %d sendto %d success.\n", get_key(), key);
-    return 0;
-  }
-
-  if(rv > EBUS_BASE) {
-    // bus_errno = EBUS_TIMEOUT;
-    logger->debug("NetModSocket::sendto: %d sendto  %d failed %s", get_key(), key, bus_strerror(rv));
-  } else {
-    logger->error(rv, "NetModSocket::sendto : %d sendto  %d failed", get_key(), key);
-  }
-  return rv;
+  return shmModSocket.sendto(buf, size, key);
 }
 
 // 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
 int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec){
   struct timespec timeout = {sec, nsec};
-  int rv = shmModSocket.sendto_timeout(buf, size, key, &timeout);
-  if(rv == 0) {
-    logger->debug("NetModSocket::sendto_timeout: %d sendto %d success.\n", get_key(), key);
-    return 0;
-  }
-
-  if(rv > EBUS_BASE) {
-    // bus_errno = EBUS_TIMEOUT;
-    logger->debug("NetModSocket::sendto_timeout : %d sendto  %d failed %s", get_key(),  key, bus_strerror(rv));
-  } else {
-    logger->error(rv, "NetModSocket::sendto_timeout:  %d sendto  %d failed", get_key(),  key);
-  }
-  return rv;
+  return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG);
+   
 }
 
 // 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
 int NetModSocket::sendto_nowait(const void *buf, const int size, const int key){
-  int rv = shmModSocket.sendto_nowait(buf, size, key);
-  if(rv == 0) {
-    logger->debug("NetModSocket::sendto_nowait: %d sendto %d success.\n", get_key(), key);
-    return 0;
-  }
-
-  if(rv > EBUS_BASE) {
-    // bus_errno = EBUS_TIMEOUT;
-    logger->debug("NetModSocket::sendto_nowait %d sendto  %d failed %s", get_key(), key, bus_strerror(rv));
-     
-  } else {
-    logger->error(rv, "NetModSocket::sendto_nowait %d sendto  %d failed", get_key(), key);
-  }
-  return rv;
+  return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG);
+  
 }
 
 /**
@@ -515,54 +480,19 @@
 */
 int NetModSocket::recvfrom(void **buf, int *size, int *key) {
 
-  logger->debug(" %d NetModSocket::recvfrom before", get_key());
-  int rv = shmModSocket.recvfrom(buf, size, key);
-
-  if(rv == 0) {
-    logger->debug("NetModSocket::recvfrom: <<<< %d recvfrom %d success.\n", get_key(), *key);
-    return 0;
-  }
-
-  if(rv > EBUS_BASE) {
-    logger->debug("NetModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv));
-  } else {
-    logger->error(rv, "NetModSocket::recvfrom: socket %d recvfrom failed",  get_key());
-  }
-  return rv;
+  return shmModSocket.recvfrom(buf, size, key);
+ 
 }
 
 // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
 int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec){
   struct timespec timeout = {sec, nsec};
-  int rv = shmModSocket.recvfrom_timeout(buf, size, key, &timeout);
-  if(rv == 0) {
-    logger->debug("NetModSocket::recvfrom_timeout:  %d recvfrom %d success.\n", get_key(), *key);
-    return 0;
-  }
-
-  if(rv > EBUS_BASE) {
-    // bus_errno = EBUS_TIMEOUT;
-    logger->debug("NetModSocket::recvfrom_timeout:  %d recvfrom failed %s", get_key(), bus_strerror(rv));
-  } else {
-    logger->error(rv, "NetModSocket::recvfrom_timeout:  %d recvfrom failed",  get_key());
-  }
-  return rv;
+  return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG);
+  
 }
 
 int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){
-  int rv = shmModSocket.recvfrom_nowait(buf, size, key);
-  if(rv == 0) {
-    logger->debug("NetModSocket::recvfrom_nowait:  %d recvfrom %d success.\n", get_key(), *key);
-    return 0;
-  }
-
-  if(rv > EBUS_BASE) {
-    // bus_errno = EBUS_TIMEOUT;
-    logger->debug("NetModSocket::recvfrom_nowait:  %d recvfrom failed %s", get_key(), bus_strerror(rv));
-  } else {
-    logger->error(rv, "NetModSocket::recvfrom_nowait:  %d recvfrom failed",  get_key());
-  }
-  return rv;
+  return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG);
 }
 
 /**
@@ -576,10 +506,10 @@
 // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
 int NetModSocket::sendandrecv_timeout( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, int sec, int nsec){
   struct timespec timeout = {sec, nsec};
-  return shmModSocket.sendandrecv_timeout(send_buf, send_size, key, recv_buf, recv_size, &timeout);
+  return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size, &timeout, BUS_TIMEOUT_FLAG);
 }
 int NetModSocket::sendandrecv_nowait( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) {
-  return shmModSocket.sendandrecv_nowait(send_buf, send_size, key, recv_buf, recv_size);
+  return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size, NULL, BUS_NOWAIT_FLAG);
 }
 
 
@@ -595,10 +525,10 @@
 // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
 int  NetModSocket::sub_timeout( void *topic, int size, int key, int sec, int nsec){
   struct timespec timeout = {sec, nsec};
-  return shmModSocket.sub_timeout((char *)topic,  size,  key, &timeout);
+  return shmModSocket.sub((char *)topic,  size,  key, &timeout, BUS_TIMEOUT_FLAG);
 }
 int  NetModSocket::sub_nowait( void *topic, int size, int key){
-  return shmModSocket.sub_nowait((char *)topic,  size,  key);
+  return shmModSocket.sub((char *)topic,  size,  key, NULL, BUS_NOWAIT_FLAG);
 }
 
 
@@ -615,10 +545,10 @@
 // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
 int  NetModSocket::desub_timeout( void *topic, int size, int key, int sec, int nsec){
   struct timespec timeout = {sec, nsec};
-  return shmModSocket.desub_timeout((char *)topic,  size,  key, &timeout);
+  return shmModSocket.desub((char *)topic,  size,  key, &timeout, BUS_TIMEOUT_FLAG);
 }
 int  NetModSocket::desub_nowait( void *topic, int size, int key){
-  return shmModSocket.desub_nowait((char *)topic,  size,  key);
+  return shmModSocket.desub((char *)topic,  size,  key, NULL, BUS_NOWAIT_FLAG);
 }
 
 
@@ -635,10 +565,10 @@
 //  瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
 int  NetModSocket::pub_timeout( char *topic, int topic_size, void *content, int content_size, int key, int sec, int nsec){
   struct timespec timeout = {sec, nsec};
-  return shmModSocket.pub_timeout(topic, topic_size, content, content_size, key, &timeout);
+  return shmModSocket.pub(topic, topic_size, content, content_size, key, &timeout, BUS_TIMEOUT_FLAG);
 }
 int  NetModSocket::pub_nowait( char *topic, int topic_size, void *content, int content_size, int key){
-  return shmModSocket.pub_nowait(topic, topic_size, content, content_size, key);
+  return shmModSocket.pub(topic, topic_size, content, content_size, key, NULL, BUS_NOWAIT_FLAG);
 }
 
 
diff --git a/src/net/net_mod_socket_wrapper.h b/src/net/net_mod_socket_wrapper.h
index 171a3a8..b794545 100644
--- a/src/net/net_mod_socket_wrapper.h
+++ b/src/net/net_mod_socket_wrapper.h
@@ -95,6 +95,7 @@
  * @return 0鏄垚鍔燂紝 鍏朵粬鍊兼槸澶辫触鐨勯敊璇爜
  */
 int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key);
+
 /**
  * @brief 绛夊緟鎺ユ敹淇℃伅锛屽湪鎸囧畾鐨勬椂闂村唴鍗充娇娌℃湁鎺ュ彈鍒版秷鎭篃瑕佽繑鍥�
  *
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index 058c081..a012e80 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -18,7 +18,7 @@
 	struct timespec timeout = {1, 0};
 	if(bus_set != NULL) {
 		for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) {
-			desub_timeout(NULL, 0, *bus_iter, &timeout);
+			desub(NULL, 0, *bus_iter, &timeout, BUS_TIMEOUT_FLAG);
 		}
 		delete bus_set;
 	}
@@ -37,154 +37,65 @@
 int ShmModSocket::force_bind(int key) {
 	return shm_socket_force_bind(shm_socket, key);
 }
+
 /**
  * 鍙戦�佷俊鎭�
  * @key 鍙戦�佺粰璋�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
  */
-int ShmModSocket::sendto(const void *buf, const int size, const int key) {
-		return shm_sendto(shm_socket, buf, size, key, NULL, 0);
+int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag) {
+	int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag);
+  if(rv == 0) {
+	  logger->debug("ShmModSocket::sendto: %d sendto %d success.\n", get_key(), key);
+	  return 0;
+  }
+
+  logger->debug("ShmModSocket::sendto : %d sendto  %d failed %s", get_key(),  key, bus_strerror(rv));
+  return rv;
 }
-// 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout) {
-	return shm_sendto(shm_socket, buf, size, key, timeout, BUS_TIMEOUT_FLAG);
-}
-// 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
-int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){
-	return shm_sendto(shm_socket, buf, size, key, NULL, BUS_NOWAIT_FLAG);
-}
- 
 
 /**
  * 鎺ユ敹淇℃伅
  * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
-int ShmModSocket::recvfrom(void **buf, int *size, int *key) {
-	int rv =  shm_recvfrom(shm_socket, buf, size, key, NULL, 0);
-   
+int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) {
+	int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, flag);
+
+	if(rv == 0) {
+    logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
+    return 0;
+  }
+
+  logger->debug("ShmModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv));
   return rv;
 }
-
-
-int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, const struct timespec *timeout) {
-	int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, BUS_TIMEOUT_FLAG);
-
- 	return rv;
-}
-
-int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){
-	int rv =  shm_recvfrom(shm_socket, buf, size, key, NULL, BUS_NOWAIT_FLAG);
-	// logger->error(rv, "ShmModSocket::recvfrom_nowait failed!");
-  return rv;
-}
+ 
 
 /**
  * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟
  * @key 鍙戦�佺粰璋�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
-int ShmModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
-	return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
+int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key, 
+	void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
+	return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
 }
+ 
 // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec *timeout){
-	return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, BUS_TIMEOUT_FLAG);
+int ShmModSocket::sendandrecv_unsafe(const void *send_buf, const int send_size, const int send_key, 
+	void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
+	return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
 }
-int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
-	return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, BUS_NOWAIT_FLAG);
-}
-
-
-
-
-int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
-	return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
-}
-// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec *timeout){
-	return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, BUS_TIMEOUT_FLAG);
-}
-int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
-	return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, BUS_NOWAIT_FLAG);
-}
-
-
-
-
+ 
 /**
  * 璁㈤槄鎸囧畾涓婚
  * @topic 涓婚
  * @size 涓婚闀垮害
  * @key 鎬荤嚎绔彛
  */
-int  ShmModSocket::sub(char *topic, int size, int key){
-	return _sub_( topic, size, key, NULL, 0);
-}
-// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int  ShmModSocket::sub_timeout(char *topic, int size, int key, const struct timespec *timeout){
-	return _sub_(topic, size, key, timeout, BUS_TIMEOUT_FLAG);
-}
-int  ShmModSocket::sub_nowait(char *topic, int size, int key) {
-	return _sub_(topic, size, key, NULL,  BUS_NOWAIT_FLAG);
-}
-
-
-
-/**
- * 鍙栨秷璁㈤槄鎸囧畾涓婚
- * @topic 涓婚
- * @size 涓婚闀垮害
- * @key 鎬荤嚎绔彛
- */
-int  ShmModSocket::desub(char *topic, int size, int key){
-	return _desub_( topic, size, key, NULL, 0);
-}
-// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int  ShmModSocket::desub_timeout(char *topic, int size, int key, const struct timespec *timeout){
-	return _desub_(topic, size, key, timeout, BUS_TIMEOUT_FLAG);
-}
-int  ShmModSocket::desub_nowait(char *topic, int size, int key) {
-	return _desub_(topic, size, key, NULL,  BUS_NOWAIT_FLAG);
-}
-
-
-
-/**
- * 鍙戝竷涓婚
- * @topic 涓婚
- * @content 涓婚鍐呭
- * @key 鎬荤嚎绔彛
- */
-int  ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key){
-		return _pub_(topic, topic_size, content, content_size, key, NULL, 0);
-}
-//  瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int  ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec * timeout){
-	return _pub_( topic, topic_size, content, content_size, key, timeout, BUS_TIMEOUT_FLAG);
-}
-int  ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){
-	return _pub_(topic, topic_size, content, content_size, key, NULL, BUS_NOWAIT_FLAG);
-}
-
-
-/**
- * 鑾峰彇soket key
- */
-int ShmModSocket::get_key(){
-	return shm_socket->key;
-}
-
-
-
-// =============================================================================
-/**
- * @key 鎬荤嚎绔彛
- */
-int  ShmModSocket::_sub_(char *topic, int topic_size, int key,  
+int  ShmModSocket::sub(char *topic, int topic_size, int key,  
 	const struct timespec *timeout, int flags) {
-	 
-
 	int ret;
 	bus_head_t head = {};
 	memcpy(head.action, "sub", sizeof(head.action));
@@ -206,10 +117,15 @@
 }
 
 
+
+
 /**
+ * 鍙栨秷璁㈤槄鎸囧畾涓婚
+ * @topic 涓婚
+ * @size 涓婚闀垮害
  * @key 鎬荤嚎绔彛
  */
-int  ShmModSocket::_desub_(char *topic, int topic_size, int key, const struct timespec *timeout, int flags) {
+int  ShmModSocket::desub(char *topic, int topic_size, int key, const struct timespec *timeout, int flags) {
 	// char buf[8192];
 	int ret;
 	if(topic == NULL) {
@@ -239,18 +155,15 @@
 
 }
 
-/**
- * @key 鎬荤嚎绔彛
- * @str "<**pub**>{缁忔祹}"
- */
- 
-int  ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout, int flags) {
-	// int head_len;
-	// char buf[8192+content_size];
-	// snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
-	// head_len = strlen(buf);
-	// memcpy(buf+head_len, content, content_size);
 
+
+/**
+ * 鍙戝竷涓婚
+ * @topic 涓婚
+ * @content 涓婚鍐呭
+ * @key 鎬荤嚎绔彛
+ */
+int  ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout, int flags) {
 	int ret;
 	bus_head_t head = {};
 	memcpy(head.action, "pub", sizeof(head.action));
@@ -267,9 +180,20 @@
 		return -1;
 	}
 
+}
+ 
 
+
+/**
+ * 鑾峰彇soket key
+ */
+int ShmModSocket::get_key(){
+	return shm_socket->key;
 }
 
+
+
+// =============================================================================
 
 int ShmModSocket::get_bus_sendbuf(bus_head_t &request_head, 
   void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf) {
@@ -277,7 +201,7 @@
   int buf_size;
   char *buf;
   int  max_buf_size;
-  if((buf = (char *)malloc(MAXBUF)) == NULL) {
+  if((buf = (char *) malloc(MAXBUF)) == NULL) {
     LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc");
     exit(1);
   } else {
@@ -287,7 +211,7 @@
   buf_size = BUS_HEAD_SIZE + content_size + topic_size  ;
   if(max_buf_size < buf_size) {
     
-    if((buf = (char *)realloc(buf, buf_size)) == NULL) {
+    if((buf = (char *) realloc(buf, buf_size)) == NULL) {
       LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf  realloc buf");
       exit(1);
     } else {
diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h
index 79b3247..f5441ce 100644
--- a/src/socket/shm_mod_socket.h
+++ b/src/socket/shm_mod_socket.h
@@ -31,11 +31,7 @@
 
 private:
 	 
-	int _sub_( char *topic, int size, int key, const struct timespec *timeouts,  int flags);
-	int _pub_( char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeouts,  int flags);
-
-	int  _desub_( char *topic, int size, int key, const struct timespec *timeouts, int flags);
-
+	 
 
 	static int get_bus_sendbuf(bus_head_t &request_head, void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf);
 
@@ -65,50 +61,44 @@
 	/**
 	 * 鍙戦�佷俊鎭�
 	 * @key 鍙戦�佺粰璋�
+	 * @flag BUS_TIMEOUT_FLAG  BUS_NOWAIT_FLAG
 	 * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 	 */
-	int sendto(const void *buf, const int size, const int key);
-	// 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
-	int sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout);
-	// 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
-	int sendto_nowait(const void *buf, const int size, const int key);
+ 
+	int sendto(const void *buf, const int size, const int key, const struct timespec *timeout = NULL, int flag = 0);
+ 
 
 	/**
 	 * 鎺ユ敹淇℃伅
 	 * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
 	 * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 	*/
-	int recvfrom(void **buf, int *size, int *key);
-	// 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-	int recvfrom_timeout(void **buf, int *size, int *key,  const struct timespec *timeout);
-	int recvfrom_nowait(void **buf, int *size, int *key);
+ 
+	int recvfrom(void **buf, int *size, int *key,  const struct timespec *timeout = NULL, int flag = 0);
 
 	/**
 	 * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟
 	 * @key 鍙戦�佺粰璋�
+	 * @flag BUS_TIMEOUT_FLAG  BUS_NOWAIT_FLAG
 	 * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 	*/
-	int sendandrecv(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ;
-	// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-	int sendandrecv_timeout(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, const struct timespec *timeout) ;
-	int sendandrecv_nowait(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ;
+ 
+	int sendandrecv(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size,
+	 const struct timespec *timeout = NULL, int flag = 0);
 
 
-	int sendandrecv_unsafe(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ;
 	// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-	int sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, const  struct timespec *timeout) ;
-	int sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ;
+	int sendandrecv_unsafe(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size,
+	 const  struct timespec *timeout = NULL, int flag = 0) ;
 
 	/**
 	 * 璁㈤槄鎸囧畾涓婚
 	 * @topic 涓婚
 	 * @size 涓婚闀垮害
 	 * @key 鎬荤嚎绔彛
+	 * @flag BUS_TIMEOUT_FLAG  BUS_NOWAIT_FLAG
 	 */
-	int  sub(char *topic, int size, int key);
-	// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-	int  sub_timeout(char *topic, int size, int key,  const struct timespec *timeout);
-	int  sub_nowait(char *topic, int size, int key);
+	int  sub(char *topic, int size, int key,  const struct timespec *timeout = NULL, int flag = 0);
 
 
 	 /**
@@ -116,22 +106,18 @@
  	 * @topic 涓婚,涓婚涓虹┖鏃跺彇娑堝叏閮ㄨ闃�
 	 * @size 涓婚闀垮害
 	 * @key 鎬荤嚎绔彛
+	 * @flag BUS_TIMEOUT_FLAG  BUS_NOWAIT_FLAG
 	 */
-	int desub( char *topic, int size, int key);
-	// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-	int desub_timeout(char *topic, int size, int key, const struct timespec *timeout);
-	int desub_nowait(char *topic, int size, int key) ;
+	int desub(char *topic, int size, int key, const struct timespec *timeout = NULL, int flag = 0);
 
 	/**
 	 * 鍙戝竷涓婚
 	 * @topic 涓婚
 	 * @content 涓婚鍐呭
 	 * @key 鎬荤嚎绔彛
+	 * @flag BUS_TIMEOUT_FLAG  BUS_NOWAIT_FLAG
 	 */
-	int  pub(char *topic, int topic_size, void *content, int content_size, int key);
-	//  瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-	int  pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, const  struct timespec *timeout);
-	int  pub_nowait(char *topic, int topic_size, void *content, int content_size, int key);
+	int  pub(char *topic, int topic_size, void *content, int content_size, int key, const  struct timespec *timeout = NULL, int flag = 0);
 
 
 	/**
diff --git a/test_net_socket/net_mod_socket.sh b/test_net_socket/net_mod_socket.sh
index fc639ca..dd566b2 100755
--- a/test_net_socket/net_mod_socket.sh
+++ b/test_net_socket/net_mod_socket.sh
@@ -9,6 +9,7 @@
 	./test_net_mod_socket --fun="start_reply" --key=100 & server_pid=$! &&  echo "pid: ${server_pid}" 
 }
 
+# 浜や簰寮忓鎴风
 function client() {
 
 	# ./test_net_mod_socket --fun="start_net_client" \
@@ -23,12 +24,25 @@
 	 
 }
 
+# 鏃犻檺寰幆send
+function send() {
+	./test_net_mod_socket --fun="test_net_sendandrecv" \
+	 --sendlist="localhost:5000:100, localhost:5000:100"
+	 
+}
+# 澶氱嚎绋媠end
 function msend() {
 	./test_net_mod_socket --fun="test_net_sendandrecv_threads" \
 	 --sendlist="localhost:5000:100, localhost:5000:100"
 	 
 }
-
+# 鏃犻檺寰幆 pub
+function pub() {
+	./test_net_mod_socket --fun="test_net_pub" \
+	 --publist="localhost:5000, localhost:5000"
+	 
+}
+# 澶氱嚎绋媝ub
 function mpub() {
 	./test_net_mod_socket --fun="test_net_pub_threads" \
 	 --publist="localhost:5000, localhost:5000"
@@ -56,9 +70,15 @@
   "msend")
 	msend
   ;;
+  "send")
+	send
+  ;;
   "mpub")
 	mpub
   ;;
+  "pub")
+	pub
+  ;;
   "close")
  	close
   ;;
diff --git a/test_net_socket/test_net_mod_socket.cpp b/test_net_socket/test_net_mod_socket.cpp
index a1a47ec..46b8be2 100644
--- a/test_net_socket/test_net_mod_socket.cpp
+++ b/test_net_socket/test_net_mod_socket.cpp
@@ -5,6 +5,7 @@
 #include "shm_mm_wrapper.h"
 #include "usg_common.h"
 #include <getopt.h>
+#include "logger_factory.h"
 
 #define  SCALE  100000
 
@@ -141,7 +142,7 @@
   int remote_port;
   while ( (rv = net_mod_socket_recvfrom(client, &recvbuf, &size, &remote_port) ) == 0) {
    // printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf);
-    sprintf(sendbuf, "RECEIVED  PORT %d NAME %s", remote_port, recvbuf);
+    sprintf(sendbuf, "RECEIVED锛�  %s", recvbuf);
     net_mod_socket_sendto(client, sendbuf, strlen(sendbuf) + 1, remote_port);
     free(recvbuf);
   }
@@ -194,7 +195,7 @@
         n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size, 1000);
 		    printf(" %d nodes reply\n", n);
 		    for(i=0; i<recv_arr_size; i++) {
-		    	printf("host:%s, port: %d, key:%d, content: %s\n", 
+		    	printf("reply from (host:%s, port: %d, key:%d) >> %s\n", 
 		    		recv_arr[i].host,
 		    		recv_arr[i].port,
 		    		recv_arr[i].key,
@@ -247,7 +248,8 @@
   Targ *targ = (Targ *)arg;
   char sendbuf[512];
  
-  int i,j, n, recv_arr_size;
+  int i,j, n;
+  int recv_arr_size;
   net_mod_recv_msg_t *recv_arr;
   int total = 0;
  
@@ -271,7 +273,7 @@
     n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1000);
     printf("%d: send %d nodes\n", i, n);
     for(j=0; j < recv_arr_size; j++) {
-    	fprintf(fp, "reply: host:%s, port: %d, key:%d, content: %s\n", 
+    	fprintf(fp, "reply from (host:%s, port: %d, key:%d) >> %s\n", 
     		recv_arr[j].host,
     		recv_arr[j].port,
     		recv_arr[j].key,
@@ -287,9 +289,10 @@
   return (void *)total;
 }
 
+//澶氱嚎绋媠end
 void test_net_sendandrecv_threads(char *nodelist) {
 
-  int status, i = 0, processors = 1;
+  int status, i = 0, processors = 4;
   void *res[processors];
   // Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
   Targ targs[processors];
@@ -326,10 +329,42 @@
  
 }
 
+// 鏃犻檺寰幆send
+void test_net_sendandrecv(char *nodelist) {
+
+  int n, i;
+  void * client;
+  int recv_arr_size;
+  net_mod_recv_msg_t *recv_arr;
+  net_node_t *node_arr;
+  int node_arr_size = parse_node_list(nodelist, &node_arr);
+  char content[128];
+
+  sprintf(content, "pid:%ld say Hello!!", (long)getpid());
+  client = net_mod_socket_open();
+  while(true) {
+    n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size, 1000);
+    printf(" %d nodes reply\n", n);
+    for(i=0; i<recv_arr_size; i++) {
+      LoggerFactory::getLogger()->debug("reply from (host:%s, port: %d, key:%d) >> %s\n", 
+        recv_arr[i].host,
+        recv_arr[i].port,
+        recv_arr[i].key,
+        recv_arr[i].content
+      );
+    }
+    
+    // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
+    net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
+  }
+
+  net_mod_socket_close(client);
+ 
+}
 
 void *_run_pub_(void *arg) {
   Targ *targ = (Targ *)arg;
-  char sendbuf[512];
+  char sendbuf[128];
  
   int i,j, n;
   int total = 0;
@@ -338,9 +373,6 @@
   int node_arr_size = parse_node_list(targ->nodelist, &node_arr);
  
   char *topic = "news";
-
- 
-  
   // char filename[512];
   // sprintf(filename, "test%d.tmp", targ->id);
   // FILE *fp = NULL;
@@ -353,7 +385,7 @@
    
     n = net_mod_socket_pub(client, node_arr, node_arr_size, topic, strlen(topic)+1, sendbuf, strlen(sendbuf)+1);
     // n = net_mod_socket_pub(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
-    printf( "pub:%s to  %d nodes\n", sendbuf, n);
+    LoggerFactory::getLogger()->debug( "pub:%s to  %d nodes\n", sendbuf, n);
     total += n;
   }
   // fclose(fp);
@@ -361,6 +393,7 @@
   return (void *)total;
 }
 
+//澶氱嚎绋媝ub
 void test_net_pub_threads(char *nodelist) {
 
   int status, i = 0, processors = 4;
@@ -399,6 +432,28 @@
   // fflush(stdout);
   net_mod_socket_close(client);
 }
+
+// 鏃犻檺寰幆pub
+void test_net_pub(char *nodelist) {
+
+  int n;
+  char sendbuf[512];
+  net_node_t *node_arr;
+  int node_arr_size = parse_node_list(nodelist, &node_arr);
+ 
+  char *topic = "news";
+  sprintf(sendbuf, "pid:%ld: Good news!!", (long)getpid());
+
+  void * client = net_mod_socket_open();
+  while (true) {
+    n = net_mod_socket_pub(client, node_arr, node_arr_size, topic, strlen(topic)+1, sendbuf, strlen(sendbuf)+1);
+    // n = net_mod_socket_pub(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
+    LoggerFactory::getLogger()->debug( "pub to  %d nodes\n", n);
+  }
+  net_mod_socket_close(client);
+}
+
+
 
 
 
@@ -456,6 +511,15 @@
      
     test_net_sendandrecv_threads(opt.sendlist);
   }
+  else if (strcmp("test_net_sendandrecv", opt.fun) == 0) {
+    if(opt.sendlist == 0) {
+      fprintf(stderr, "Missing sendlist .\n");
+      usage(argv[0]);
+      exit(1);
+    }
+     
+    test_net_sendandrecv(opt.sendlist);
+  }
   else if (strcmp("test_net_pub_threads", opt.fun) == 0) {
     if(opt.publist == 0) {
       fprintf(stderr, "Missing publist .\n");
@@ -465,6 +529,15 @@
      
     test_net_pub_threads(opt.publist);
   }
+  else if (strcmp("test_net_pub", opt.fun) == 0) {
+    if(opt.publist == 0) {
+      fprintf(stderr, "Missing publist .\n");
+      usage(argv[0]);
+      exit(1);
+    }
+     
+    test_net_pub(opt.publist);
+  }
   
   else {
     usage(argv[0]);

--
Gitblit v1.8.0