From af85260254bacac40a68d4f5f61950523beb3a27 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期一, 19 十月 2020 17:02:41 +0800
Subject: [PATCH] update

---
 src/socket/net_mod_socket.c |   98 +++++++++++++++++++++++++++++++++---------------
 1 files changed, 67 insertions(+), 31 deletions(-)

diff --git a/src/socket/net_mod_socket.c b/src/socket/net_mod_socket.c
index c7b3cc7..560f8be 100644
--- a/src/socket/net_mod_socket.c
+++ b/src/socket/net_mod_socket.c
@@ -10,6 +10,8 @@
 NetModSocket::NetModSocket() 
 {
 		init_req_rep_req_resp_pool();
+   
+  
 }
 
 
@@ -20,6 +22,7 @@
     Close(clientfd);
      
   }
+  
 }
 
 
@@ -95,7 +98,9 @@
   net_node_t *node =  req_resp_pool.connfdNodeMap.find(connfd)->second;
 
   // std::map<std::string, int>::iterator mapIter;
-  Close(connfd); //line:conc:echoservers:closeconnfd
+  if(close(connfd) != 0) {
+    LoggerFactory::getLogger()->error(errno, "NetModSocket::close_connect");
+  }
   FD_CLR(connfd, &req_resp_pool.read_set);
   FD_CLR(connfd, &req_resp_pool.write_set);
   FD_CLR(connfd, &req_resp_pool.except_set);
@@ -104,23 +109,23 @@
   // char portstr[32];
   sprintf(mapKey, "%s:%d", node->host, node->port);
   req_resp_pool.connectionMap.erase(mapKey);
-LoggerFactory::getLogger()->debug("close_connect");
+// LoggerFactory::getLogger()->debug("close_connect");
 }
 
 
 int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
   net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
 
-  int i,  recv_size,  connfd;
+  int i, n, recv_size,  connfd;
   net_node_t *node;
   void *recv_buf;
- 
+  struct timeval timeout = {5, 0};
  
   int n_conn_suc = 0, n_recv_suc = 0;
    
   net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
   
-  init_req_rep_req_resp_pool();
+  //init_req_rep_req_resp_pool();
 
   for (i = 0; i< arrlen; i++) {
 
@@ -134,23 +139,17 @@
       ret_arr[n_recv_suc].content = recv_buf;
       ret_arr[n_recv_suc].content_length = recv_size;
       n_recv_suc++;
+      continue;
     }
 
     if( (connfd = connect(node)) < 0 ) {
       continue;
     }
 
-
-    // if(write_request(connfd, node->key, send_buf, send_size) != 0) {
-    //   close_connect(connfd);
-    // }
-
     n_conn_suc++;
-    // optval = 0;
-    // setsockopt(clientfd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
   }
 
-printf("n_conn_suc =%d\n", n_conn_suc);
+// printf("n_conn_suc =%d\n", n_conn_suc);
 
   while(n_recv_suc < n_conn_suc)
   {
@@ -158,8 +157,13 @@
     req_resp_pool.ready_read_set = req_resp_pool.read_set;
     req_resp_pool.ready_write_set = req_resp_pool.write_set;
     req_resp_pool.ready_except_set = req_resp_pool.except_set;
-    req_resp_pool.nready = select(req_resp_pool.maxfd + 1, &req_resp_pool.ready_read_set, &req_resp_pool.ready_write_set, &req_resp_pool.ready_except_set, NULL);
-printf("req_resp_pool.nready =%d\n", req_resp_pool.nready);
+    if( (req_resp_pool.nready = select(req_resp_pool.maxfd + 1, 
+      &req_resp_pool.ready_read_set, &req_resp_pool.ready_write_set, 
+      &req_resp_pool.ready_except_set, &timeout)) <= 0) {
+      // wirite_set 鍜� read_set 鍦ㄦ寚瀹氭椂闂村唴閮芥病鍑嗗濂�
+      break;
+    }
+// printf("req_resp_pool.nready =%d\n", req_resp_pool.nready);
     for (i = 0; (i <= req_resp_pool.maxi) && (req_resp_pool.nready > 0); i++) {
       if ( (connfd = req_resp_pool.connfd[i]) > 0 ) {
         /* If the descriptor is ready, echo a text line from it */
@@ -167,17 +171,22 @@
         if ( FD_ISSET(connfd, &req_resp_pool.ready_read_set))
         {
           req_resp_pool.nready--;
-          if(read_response(connfd, ret_arr+n_recv_suc) == 0) {
+          if( (n = read_response(connfd, ret_arr+n_recv_suc)) == 0) {
+            
+            // 鎴愬姛鏀跺埌杩斿洖娑堟伅锛屾竻绌鸿鍏ヤ綅
+            FD_CLR(connfd, &req_resp_pool.read_set);
+            req_resp_pool.connfd[i] = -1;  
             n_recv_suc++;
-          } else {
+          } else if(n == -1)  {
             close_connect(connfd);
           }
           
         }
+
         if (FD_ISSET(connfd, &req_resp_pool.ready_write_set))
         {
           req_resp_pool.nready--;
-printf("write %d\n", connfd);
+// printf("write %d\n", connfd);
           if(write_request(connfd, node->key, send_buf, send_size) != 0) {
             close_connect(connfd);
           } else{
@@ -195,6 +204,15 @@
     }
   }
 
+  FD_ZERO(&req_resp_pool.except_set);
+  for (i = 0; i <= req_resp_pool.maxi; i++) {
+    if ( (connfd = req_resp_pool.connfd[i]) > 0 ) {
+      // 鍏抽棴骞舵竻闄ゅ啓鍏ユ垨璇诲彇澶辫触鐨勮繛鎺�
+      close_connect(connfd);
+    }
+  }
+  req_resp_pool.maxi = -1;
+
   *recv_arr = ret_arr;
   if(recv_arr_size != NULL) {
     *recv_arr_size = n_recv_suc;
@@ -205,21 +223,25 @@
 
 int NetModSocket::write_request(int clientfd, int key, void *send_buf, int send_size) {
   net_mod_request_head_t request_head = {};
-  static char *buf;
-  static int buf_size, max_buf_size;
- 
-
+  int buf_size;
+  char *buf;
+  int  max_buf_size;
+  buf = (char *)malloc(MAXBUF);
   if(buf == NULL) {
-    buf = (char *)malloc(MAXBUF);
+    LoggerFactory::getLogger()->error(errno, "NetModSocket::NetModSocket malloc");
+    exit(1);
+  } else {
     max_buf_size = MAXBUF;
-    LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv malloc");
-   
   }
 
   buf_size = send_size + NET_MODE_REQUEST_HEAD_LENGTH;
   if(max_buf_size < buf_size) {
     buf = (char *)realloc(buf, buf_size);
     max_buf_size = buf_size;
+    if(buf == NULL) {
+      LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request  realloc");
+      exit(1);
+    }
   }
 
   request_head.mod = REQ_REP;
@@ -234,13 +256,18 @@
 
 
   if(rio_writen(clientfd, buf, buf_size) != buf_size ) {
-    LoggerFactory::getLogger()->error(errno, "NetModSocket::send conent rio_writen");
-    
+    LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request  rio_writen");
+    free(buf);
     return -1;
   }
+  free(buf);
   return 0;
 }
 
+/**
+ * @return 0 鎴愬姛, 1 瀵规柟娌℃湁瀵瑰簲鐨刱ey, -1 缃戠粶閿欒
+ *
+ */
 int NetModSocket::read_response(int connfd, net_mod_recv_msg_t *recv_msg) {
   int recv_size;
   void *recv_buf;
@@ -255,10 +282,14 @@
   }
 
   response_head =  NetModSocket::decode_response_head(response_head_bs);
+  if(response_head.code != 0) {
+    // 瀵规柟娌℃湁瀵瑰簲鐨刱ey
+    return 1;
+  }
 
   recv_buf = malloc(response_head.content_length);
   if(recv_buf == NULL) {
-    LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc");
+    LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc recv_buf");
     exit(1);
   }
   if ( (recv_size = rio_readn(connfd, recv_buf, response_head.content_length) ) !=  response_head.content_length) {
@@ -351,6 +382,9 @@
     }
 
     response_head =  NetModSocket::decode_response_head(response_head_bs);
+    if(response_head.code != 0) {
+      continue;
+    }
 
     recv_buf = malloc(response_head.content_length);
     if(recv_buf == NULL) {
@@ -452,7 +486,7 @@
   free(buf);
   return nsuc;
 }
- 
+
 void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) {
 
   for(int i =0; i< size; i++) {
@@ -488,13 +522,15 @@
 
 void * NetModSocket::encode_response_head(net_mod_response_head_t & response) {
   char * head = (char *)malloc(NET_MODE_RESPONSE_HEAD_LENGTH);
-  PUT(head, htonl(response.content_length));
+  PUT(head, htonl(response.code));
+  PUT(head + 4, htonl(response.content_length));
   return head;
 }
 
 net_mod_response_head_t  NetModSocket::decode_response_head(void *_headbs) {
   char *headbs = (char *)_headbs;
   net_mod_response_head_t head;
-  head.content_length = ntohl(GET(headbs));
+  head.code = ntohl(GET(headbs));
+  head.content_length = ntohl(GET(headbs + 4));
   return head;
 }

--
Gitblit v1.8.0