From af85260254bacac40a68d4f5f61950523beb3a27 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期一, 19 十月 2020 17:02:41 +0800
Subject: [PATCH] update

---
 src/socket/net_mod_socket.c                |   98 +++++++++++-----
 src/socket/shm_mod_socket.h                |    5 
 src/socket/shm_socket.c                    |   21 +++
 src/socket/net_mod_server_socket.h         |    8 +
 src/socket/net_mod_socket.h                |   18 ++
 Makefile                                   |    2 
 src/socket/shm_socket.h                    |    7 +
 test_net_socket/net_mod_socket.c           |  103 ++++++++++++++++
 src/socket/net_mod_server_socket_wrapper.h |    2 
 src/socket/net_mod_server_socket.c         |   80 ++++++++++---
 src/socket/shm_mod_socket.c                |   12 ++
 src/socket/net_mod_server_socket_wrapper.c |    4 
 12 files changed, 298 insertions(+), 62 deletions(-)

diff --git a/Makefile b/Makefile
index f55f829..d471709 100755
--- a/Makefile
+++ b/Makefile
@@ -1,5 +1,5 @@
 # debug "make --just-print"
-DIRS = src test_net_socket
+DIRS = src test_net_socket test_socket
 TAR_NAME = shm_queue.tar.gz
 
 all:
diff --git a/src/socket/net_mod_server_socket.c b/src/socket/net_mod_server_socket.c
index ccc17e2..51e10e9 100644
--- a/src/socket/net_mod_server_socket.c
+++ b/src/socket/net_mod_server_socket.c
@@ -4,37 +4,55 @@
 #include "net_mod_socket_io.h"
 #include "net_mod_socket.h"
  
-NetModServerSocket::NetModServerSocket(int port):max_buf(1024), max_topic_buf(256)
+NetModServerSocket::NetModServerSocket(int _port): listenfd(0), port(_port), max_buf(1024), max_topic_buf(256), max_response_buf(1024)
 {
-  char portstr[32];
-
-  //shmModSocket = new ShmModSocket;
-  sprintf(portstr, "%d", port);
-  listenfd = Open_listenfd(portstr);
-  init_pool(listenfd);
+  
 
   buf = malloc(max_buf);
   if(buf == NULL) {
-    err_exit(errno, "process_client malloc");
+    err_exit(errno, "NetModServerSocket::NetModServerSocket malloc");
   }
 
   topic_buf = malloc(max_topic_buf);
   if(topic_buf == NULL) {
-    err_exit(errno, "process_client malloc");
+    err_exit(errno, "NetModServerSocket::NetModServerSocket malloc");
+  }
+
+  response_buf = (char *) malloc(max_response_buf);
+  if(response_buf == NULL) {
+    err_exit(errno, "NetModServerSocket::NetModServerSocket malloc");
   }
 }
 
 
 NetModServerSocket::~NetModServerSocket() {
-   Close(listenfd);
-   free(buf);
+  if(listenfd != 0) {
+    Close(listenfd);
+  }
+  
+  if(buf != NULL)
+    free(buf);
+  if(topic_buf != NULL)
    free(topic_buf);
+  if(response_buf != NULL)
+   free(response_buf);
 }
 
-void NetModServerSocket::start() {
+int NetModServerSocket::start() {
 	int connfd;
 	socklen_t clientlen;
   struct sockaddr_storage clientaddr;
+  char portstr[32];
+
+  //shmModSocket = new ShmModSocket;
+  sprintf(portstr, "%d", port);
+  listenfd = open_listenfd(portstr);
+  if(listenfd < 0) {
+    LoggerFactory::getLogger()->error(errno, "NetModServerSocket::start");
+    return -1;
+  }
+  init_pool(listenfd);
+
 	while (1)
   {
     /* Wait for listening/connected descriptor(s) to become ready */
@@ -52,6 +70,7 @@
     /* Echo a text line from each ready connected descriptor */
     check_clients();
   }
+  return 0;
 }
 
 void  NetModServerSocket::init_pool(int listenfd)
@@ -105,8 +124,8 @@
   net_mod_response_head_t response_head;
   char request_head_bs[NET_MODE_REQUEST_HEAD_LENGTH];
   void  *recv_buf;
-char tmp[8196];
-  int recv_size;
+// char tmp[8196];
+  int recv_size, response_buf_size;
 
   if (rio_readn(connfd, request_head_bs, NET_MODE_REQUEST_HEAD_LENGTH) !=  NET_MODE_REQUEST_HEAD_LENGTH)
   {
@@ -129,11 +148,32 @@
   }
 
   if(request_head.mod == REQ_REP) {
-    // TODO: shmModSocket.sendandrecv_unsafe
-    shmModSocket.sendandrecv(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size);
-    response_head.content_length = recv_size;
-    Rio_writen(connfd, NetModSocket::encode_response_head(response_head), NET_MODE_RESPONSE_HEAD_LENGTH);
-    Rio_writen(connfd, recv_buf, recv_size);
+    if(shmModSocket.sendandrecv_unsafe(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size) != 0) {
+      response_head.code = 1;
+      response_head.content_length = 0;
+      if( rio_writen(connfd, NetModSocket::encode_response_head(response_head), NET_MODE_RESPONSE_HEAD_LENGTH) != NET_MODE_RESPONSE_HEAD_LENGTH )
+        return -1;
+      //Rio_writen(connfd, recv_buf, recv_size);
+    } else {
+      response_head.code = 0;
+      response_head.content_length = recv_size;
+
+      response_buf_size = NET_MODE_RESPONSE_HEAD_LENGTH + recv_size;
+      if(max_response_buf < response_buf_size) {
+        buf = (char *)realloc(response_buf, response_buf_size);
+        max_response_buf = response_buf_size;
+      }
+      memcpy(response_buf, NetModSocket::encode_response_head(response_head),  NET_MODE_RESPONSE_HEAD_LENGTH);
+      memcpy(response_buf + NET_MODE_RESPONSE_HEAD_LENGTH, recv_buf,  recv_size);
+
+      if(rio_writen(connfd, response_buf, response_buf_size) != response_buf_size) {
+        return -1;
+      }
+      
+    }
+    return 0;
+    
+   
   } else if(request_head.mod == BUS) {
     if(request_head.topic_length > max_topic_buf) {
       topic_buf = realloc(topic_buf, request_head.topic_length);
@@ -147,7 +187,7 @@
     if (rio_readn(connfd, topic_buf, request_head.topic_length) != request_head.topic_length ) {
       return -1;
     }
- LoggerFactory::getLogger()->debug("====server pub %s===\n", buf);
+LoggerFactory::getLogger()->debug("====server pub %s===\n", buf);
     shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, request_head.key);
   }
 
diff --git a/src/socket/net_mod_server_socket.h b/src/socket/net_mod_server_socket.h
index 40219e1..3f80931 100644
--- a/src/socket/net_mod_server_socket.h
+++ b/src/socket/net_mod_server_socket.h
@@ -28,13 +28,18 @@
 
 private:
 	int listenfd;
+	int port;
 	ShmModSocket shmModSocket;
 	pool pool;
 
 	void *buf;
 	void *topic_buf;
+	char *response_buf;
+
   size_t max_buf;
   size_t max_topic_buf;
+  size_t max_response_buf;
+
 
 	void init_pool(int listenfd);
 	void add_client(int connfd);
@@ -47,8 +52,9 @@
 	
 	/*
 	 * 鍚姩 server
+	 * @return 0 success, 鍏朵粬 failture
 	*/
-	void start();
+	int start();
 	~NetModServerSocket();
 
 };
diff --git a/src/socket/net_mod_server_socket_wrapper.c b/src/socket/net_mod_server_socket_wrapper.c
index 2aed1cf..cdd4c49 100644
--- a/src/socket/net_mod_server_socket_wrapper.c
+++ b/src/socket/net_mod_server_socket_wrapper.c
@@ -14,7 +14,7 @@
 
 }
 
-void net_mod_server_socket_start(void *_sockt) {
+int net_mod_server_socket_start(void *_sockt) {
 	net_mod_server_socket_t *sockt = (net_mod_server_socket_t *)_sockt;
-	sockt->sockt->start();
+	return sockt->sockt->start();
 }
diff --git a/src/socket/net_mod_server_socket_wrapper.h b/src/socket/net_mod_server_socket_wrapper.h
index 0bb7571..1a6d143 100644
--- a/src/socket/net_mod_server_socket_wrapper.h
+++ b/src/socket/net_mod_server_socket_wrapper.h
@@ -25,7 +25,7 @@
 /**
  * 鍚姩
  */
-void net_mod_server_socket_start(void *_sockt);
+int net_mod_server_socket_start(void *_sockt);
 
 
 
diff --git a/src/socket/net_mod_socket.c b/src/socket/net_mod_socket.c
index c7b3cc7..560f8be 100644
--- a/src/socket/net_mod_socket.c
+++ b/src/socket/net_mod_socket.c
@@ -10,6 +10,8 @@
 NetModSocket::NetModSocket() 
 {
 		init_req_rep_req_resp_pool();
+   
+  
 }
 
 
@@ -20,6 +22,7 @@
     Close(clientfd);
      
   }
+  
 }
 
 
@@ -95,7 +98,9 @@
   net_node_t *node =  req_resp_pool.connfdNodeMap.find(connfd)->second;
 
   // std::map<std::string, int>::iterator mapIter;
-  Close(connfd); //line:conc:echoservers:closeconnfd
+  if(close(connfd) != 0) {
+    LoggerFactory::getLogger()->error(errno, "NetModSocket::close_connect");
+  }
   FD_CLR(connfd, &req_resp_pool.read_set);
   FD_CLR(connfd, &req_resp_pool.write_set);
   FD_CLR(connfd, &req_resp_pool.except_set);
@@ -104,23 +109,23 @@
   // char portstr[32];
   sprintf(mapKey, "%s:%d", node->host, node->port);
   req_resp_pool.connectionMap.erase(mapKey);
-LoggerFactory::getLogger()->debug("close_connect");
+// LoggerFactory::getLogger()->debug("close_connect");
 }
 
 
 int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
   net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
 
-  int i,  recv_size,  connfd;
+  int i, n, recv_size,  connfd;
   net_node_t *node;
   void *recv_buf;
- 
+  struct timeval timeout = {5, 0};
  
   int n_conn_suc = 0, n_recv_suc = 0;
    
   net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
   
-  init_req_rep_req_resp_pool();
+  //init_req_rep_req_resp_pool();
 
   for (i = 0; i< arrlen; i++) {
 
@@ -134,23 +139,17 @@
       ret_arr[n_recv_suc].content = recv_buf;
       ret_arr[n_recv_suc].content_length = recv_size;
       n_recv_suc++;
+      continue;
     }
 
     if( (connfd = connect(node)) < 0 ) {
       continue;
     }
 
-
-    // if(write_request(connfd, node->key, send_buf, send_size) != 0) {
-    //   close_connect(connfd);
-    // }
-
     n_conn_suc++;
-    // optval = 0;
-    // setsockopt(clientfd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
   }
 
-printf("n_conn_suc =%d\n", n_conn_suc);
+// printf("n_conn_suc =%d\n", n_conn_suc);
 
   while(n_recv_suc < n_conn_suc)
   {
@@ -158,8 +157,13 @@
     req_resp_pool.ready_read_set = req_resp_pool.read_set;
     req_resp_pool.ready_write_set = req_resp_pool.write_set;
     req_resp_pool.ready_except_set = req_resp_pool.except_set;
-    req_resp_pool.nready = select(req_resp_pool.maxfd + 1, &req_resp_pool.ready_read_set, &req_resp_pool.ready_write_set, &req_resp_pool.ready_except_set, NULL);
-printf("req_resp_pool.nready =%d\n", req_resp_pool.nready);
+    if( (req_resp_pool.nready = select(req_resp_pool.maxfd + 1, 
+      &req_resp_pool.ready_read_set, &req_resp_pool.ready_write_set, 
+      &req_resp_pool.ready_except_set, &timeout)) <= 0) {
+      // wirite_set 鍜� read_set 鍦ㄦ寚瀹氭椂闂村唴閮芥病鍑嗗濂�
+      break;
+    }
+// printf("req_resp_pool.nready =%d\n", req_resp_pool.nready);
     for (i = 0; (i <= req_resp_pool.maxi) && (req_resp_pool.nready > 0); i++) {
       if ( (connfd = req_resp_pool.connfd[i]) > 0 ) {
         /* If the descriptor is ready, echo a text line from it */
@@ -167,17 +171,22 @@
         if ( FD_ISSET(connfd, &req_resp_pool.ready_read_set))
         {
           req_resp_pool.nready--;
-          if(read_response(connfd, ret_arr+n_recv_suc) == 0) {
+          if( (n = read_response(connfd, ret_arr+n_recv_suc)) == 0) {
+            
+            // 鎴愬姛鏀跺埌杩斿洖娑堟伅锛屾竻绌鸿鍏ヤ綅
+            FD_CLR(connfd, &req_resp_pool.read_set);
+            req_resp_pool.connfd[i] = -1;  
             n_recv_suc++;
-          } else {
+          } else if(n == -1)  {
             close_connect(connfd);
           }
           
         }
+
         if (FD_ISSET(connfd, &req_resp_pool.ready_write_set))
         {
           req_resp_pool.nready--;
-printf("write %d\n", connfd);
+// printf("write %d\n", connfd);
           if(write_request(connfd, node->key, send_buf, send_size) != 0) {
             close_connect(connfd);
           } else{
@@ -195,6 +204,15 @@
     }
   }
 
+  FD_ZERO(&req_resp_pool.except_set);
+  for (i = 0; i <= req_resp_pool.maxi; i++) {
+    if ( (connfd = req_resp_pool.connfd[i]) > 0 ) {
+      // 鍏抽棴骞舵竻闄ゅ啓鍏ユ垨璇诲彇澶辫触鐨勮繛鎺�
+      close_connect(connfd);
+    }
+  }
+  req_resp_pool.maxi = -1;
+
   *recv_arr = ret_arr;
   if(recv_arr_size != NULL) {
     *recv_arr_size = n_recv_suc;
@@ -205,21 +223,25 @@
 
 int NetModSocket::write_request(int clientfd, int key, void *send_buf, int send_size) {
   net_mod_request_head_t request_head = {};
-  static char *buf;
-  static int buf_size, max_buf_size;
- 
-
+  int buf_size;
+  char *buf;
+  int  max_buf_size;
+  buf = (char *)malloc(MAXBUF);
   if(buf == NULL) {
-    buf = (char *)malloc(MAXBUF);
+    LoggerFactory::getLogger()->error(errno, "NetModSocket::NetModSocket malloc");
+    exit(1);
+  } else {
     max_buf_size = MAXBUF;
-    LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv malloc");
-   
   }
 
   buf_size = send_size + NET_MODE_REQUEST_HEAD_LENGTH;
   if(max_buf_size < buf_size) {
     buf = (char *)realloc(buf, buf_size);
     max_buf_size = buf_size;
+    if(buf == NULL) {
+      LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request  realloc");
+      exit(1);
+    }
   }
 
   request_head.mod = REQ_REP;
@@ -234,13 +256,18 @@
 
 
   if(rio_writen(clientfd, buf, buf_size) != buf_size ) {
-    LoggerFactory::getLogger()->error(errno, "NetModSocket::send conent rio_writen");
-    
+    LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request  rio_writen");
+    free(buf);
     return -1;
   }
+  free(buf);
   return 0;
 }
 
+/**
+ * @return 0 鎴愬姛, 1 瀵规柟娌℃湁瀵瑰簲鐨刱ey, -1 缃戠粶閿欒
+ *
+ */
 int NetModSocket::read_response(int connfd, net_mod_recv_msg_t *recv_msg) {
   int recv_size;
   void *recv_buf;
@@ -255,10 +282,14 @@
   }
 
   response_head =  NetModSocket::decode_response_head(response_head_bs);
+  if(response_head.code != 0) {
+    // 瀵规柟娌℃湁瀵瑰簲鐨刱ey
+    return 1;
+  }
 
   recv_buf = malloc(response_head.content_length);
   if(recv_buf == NULL) {
-    LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc");
+    LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc recv_buf");
     exit(1);
   }
   if ( (recv_size = rio_readn(connfd, recv_buf, response_head.content_length) ) !=  response_head.content_length) {
@@ -351,6 +382,9 @@
     }
 
     response_head =  NetModSocket::decode_response_head(response_head_bs);
+    if(response_head.code != 0) {
+      continue;
+    }
 
     recv_buf = malloc(response_head.content_length);
     if(recv_buf == NULL) {
@@ -452,7 +486,7 @@
   free(buf);
   return nsuc;
 }
- 
+
 void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) {
 
   for(int i =0; i< size; i++) {
@@ -488,13 +522,15 @@
 
 void * NetModSocket::encode_response_head(net_mod_response_head_t & response) {
   char * head = (char *)malloc(NET_MODE_RESPONSE_HEAD_LENGTH);
-  PUT(head, htonl(response.content_length));
+  PUT(head, htonl(response.code));
+  PUT(head + 4, htonl(response.content_length));
   return head;
 }
 
 net_mod_response_head_t  NetModSocket::decode_response_head(void *_headbs) {
   char *headbs = (char *)_headbs;
   net_mod_response_head_t head;
-  head.content_length = ntohl(GET(headbs));
+  head.code = ntohl(GET(headbs));
+  head.content_length = ntohl(GET(headbs + 4));
   return head;
 }
diff --git a/src/socket/net_mod_socket.h b/src/socket/net_mod_socket.h
index a68c8cf..c133302 100644
--- a/src/socket/net_mod_socket.h
+++ b/src/socket/net_mod_socket.h
@@ -7,8 +7,7 @@
 #define GET(p)       (*(uint32_t *)(p))
 #define PUT(p, val)  (*(uint32_t *)(p) = (val))
 
-#define NET_MODE_REQUEST_HEAD_LENGTH 16
-#define NET_MODE_RESPONSE_HEAD_LENGTH 4
+
 
 
 
@@ -21,6 +20,7 @@
 	int key;
 };
 
+#define NET_MODE_REQUEST_HEAD_LENGTH 16
 
 struct net_mod_request_head_t {
 	uint32_t mod;
@@ -29,9 +29,12 @@
 	uint32_t topic_length;
 };
 
+#define NET_MODE_RESPONSE_HEAD_LENGTH 8
+
 struct net_mod_response_head_t {
 	// socket_mod_t mod;
 	// int key;
+  uint32_t code;
 	uint32_t content_length;
 };
 
@@ -72,6 +75,8 @@
    
   ShmModSocket shmModSocket;
   pool req_resp_pool;
+ 
+
 
   static void * encode_request_head(net_mod_request_head_t & request);
   static net_mod_request_head_t  decode_request_head(void *headbs);
@@ -97,11 +102,18 @@
    * @send_buf 鍙戦�佺殑娑堟伅锛孈send_size 璇ユ秷鎭綋鐨勯暱搴�
    * @recv_arr 杩斿洖鐨勫簲绛旀秷鎭粍锛孈recv_arr_size 璇ユ暟缁勯暱搴�
    * @return 鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁�
+   * 浼樼偣锛氭棤闃诲锛屾�ц兘濂�
+   * 缂虹偣锛氫笉鏄嚎绋嬪畨鍏ㄧ殑
    */
   int sendandrecv(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, 
   	net_mod_recv_msg_t ** recv_arr, int *recv_arr_size);
 
-
+  /**
+   * 鍔熻兘鍚宻endandrecv
+   * 浼樼偣锛氱嚎绋嬪畨鍏�
+   * 缂虹偣锛氶樆濉炵殑锛屾�ц兘涓嶅sendandrecv
+   * 
+   */
   int sendandrecv_safe(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, 
     net_mod_recv_msg_t ** recv_arr, int *recv_arr_size);
 
diff --git a/src/socket/shm_mod_socket.c b/src/socket/shm_mod_socket.c
index bd60993..e890721 100644
--- a/src/socket/shm_mod_socket.c
+++ b/src/socket/shm_mod_socket.c
@@ -182,6 +182,18 @@
 	return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
 }
 
+int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){
+	return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, NULL, 0);
+}
+// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
+int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, struct timespec *timeout){
+	return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, timeout, 0);
+}
+int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){
+	return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
+}
+
+
 
 /**
  * 鍚姩bus
diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h
index 3365804..b6dadee 100644
--- a/src/socket/shm_mod_socket.h
+++ b/src/socket/shm_mod_socket.h
@@ -104,6 +104,11 @@
 	int sendandrecv_timeout(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size,  struct timespec *timeout) ;
 	int sendandrecv_nowait(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
 
+	int sendandrecv_unsafe(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
+	// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
+	int sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size,  struct timespec *timeout) ;
+	int sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
+
 
 	/**
 	 * 鍚姩bus
diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c
index ab34bca..f07d8a6 100644
--- a/src/socket/shm_socket.c
+++ b/src/socket/shm_socket.c
@@ -386,6 +386,27 @@
   return -1;
 }
 
+int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf,
+                    const int send_size, const int send_port, void **recv_buf,
+                    int *recv_size,  struct timespec *timeout,  int flags) {
+  if (socket->socket_type != SHM_SOCKET_DGRAM) {
+    err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket  "
+                "which is not a SHM_SOCKET_DGRAM socket ",
+             socket->socket_type);
+  }
+  int recv_port;
+  int rv;
+
+ 
+  if ((rv = shm_sendto(socket, send_buf, send_size, send_port, timeout, flags)) == 0) {
+    rv = shm_recvfrom(socket, recv_buf, recv_size, &recv_port, timeout, flags);
+    return rv;
+  } else {
+    return rv;
+  }
+  return -1;
+}
+
 // ============================================================================================================
 
 /**
diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h
index e38fd0e..4f1efc2 100644
--- a/src/socket/shm_socket.h
+++ b/src/socket/shm_socket.h
@@ -100,6 +100,13 @@
 int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size,  
 	struct timespec * timeout = NULL,  int flags=0);
 
+/**
+ * 鍔熻兘鍚宻hm_sendandrecv, 浣嗘槸涓嶆槸绾跨▼瀹夊叏鐨�
+ */
+int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size,  
+	struct timespec * timeout = NULL,  int flags=0);
+
+
 
 
 #endif
\ No newline at end of file
diff --git a/test_net_socket/net_mod_socket.c b/test_net_socket/net_mod_socket.c
index 9f1bfa1..a763c5e 100644
--- a/test_net_socket/net_mod_socket.c
+++ b/test_net_socket/net_mod_socket.c
@@ -4,9 +4,17 @@
 #include "dgram_mod_socket.h"
 #include "usg_common.h"
 
+typedef struct Targ {
+	int port;
+	int id;
+
+}Targ;
+
 void server(int port) {
 	void *serverSocket  = net_mod_server_socket_open(port);
-	net_mod_server_socket_start(serverSocket);
+	if(net_mod_server_socket_start(serverSocket) != 0) {
+		err_exit(errno, "net_mod_server_socket_start");
+	}
 }
 
 void client(int port ){
@@ -14,13 +22,15 @@
 	char content[MAXLINE];
 	char action[512];
   char topic[512];
-	net_mod_recv_msg_t *recv_arr;
+	
 	int recv_arr_size, i, n;
 	int node_arr_size = 3;
+
+	net_mod_recv_msg_t *recv_arr;
 	//192.168.20.104
 	net_node_t node_arr[] = {
 		{"192.168.5.22", port, 11},
-		{"192.168.20.10", port, 11},
+		{"192.168.20.104", port, 21},
 		{"192.168.20.104", port, 11}
 	};
 
@@ -76,6 +86,90 @@
 
   
 }
+ 
+
+#define  SCALE  100000
+
+void *runclient(void *arg) {
+  Targ *targ = (Targ *)arg;
+  int port = targ->port;
+  char sendbuf[512];
+ 
+  int i,j, n, recv_arr_size;
+  net_mod_recv_msg_t *recv_arr;
+
+  int node_arr_size = 1;
+	//192.168.20.104
+	net_node_t node_arr[] = {
+		{NULL, port, 11}
+	};
+
+  void * client = net_mod_socket_open();
+	
+	char filename[512];
+	sprintf(filename, "test%d.tmp", targ->id);
+	FILE *fp = NULL;
+	fp = fopen(filename, "w+");
+	// fp = stdout;
+
+	int recvsize;
+	void *recvbuf;
+  for (i = 0; i < SCALE; i++) {
+    sprintf(sendbuf, "thread(%d) %d", targ->id, i);
+    fprintf(fp, "requst:%s\n", sendbuf);
+    n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
+    //printf("send %d nodes\n", n);
+    for(j=0; j < recv_arr_size; j++) {
+    	fprintf(fp, "reply: host:%s, port: %d, key:%d, content: %s\n", 
+    		recv_arr[j].host,
+    		recv_arr[j].port,
+    		recv_arr[j].key,
+    		recv_arr[j].content
+    	);
+    }
+		// 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
+		net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
+  }
+  fclose(fp);
+  net_mod_socket_close(client);
+  return (void *)i;
+}
+
+void mclient(int port) {
+
+  int status, i = 0, processors = 4;
+  void *res[processors];
+  // Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
+  Targ targs[processors];
+  pthread_t tids[processors];
+  char sendbuf[512];
+  struct timeval start, end;
+  long total = 0;
+  
+  gettimeofday(&start, NULL);
+  for (i = 0; i < processors; i++) {
+    targs[i].port = port;
+    targs[i].id = i;
+    pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]);
+  }
+
+  for (i = 0; i < processors; i++) {
+    if (pthread_join(tids[i], &res[i]) != 0) {
+      perror("multyThreadClient pthread_join");
+    } else {
+    	total += (long)res[i];
+      //fprintf(stderr, "client(%d) 鍐欏叆 %ld 鏉℃暟鎹甛n", i, (long)res[i]);
+    }
+  }
+
+  gettimeofday(&end, NULL);
+
+  double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec);
+  long diffsec = (long) (difftime/1000000);
+  long diffusec = difftime - diffsec*1000000;
+  fprintf(stderr,"鍙戦�佹暟鐩�: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", total, diffsec, diffusec, difftime/total );
+  // fflush(stdout);
+}
 
 int main(int argc, char *argv[]) {
 	shm_init(512);
@@ -95,6 +189,9 @@
 
   if (strcmp("client", argv[1]) == 0)
      client(port);
+
+  if (strcmp("mclient", argv[1]) == 0)
+    mclient(port);
 }
 
 

--
Gitblit v1.8.0