From 95349b79a5a646736c706fe19645181146ee9486 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期二, 20 十月 2020 16:29:59 +0800
Subject: [PATCH] update

---
 src/socket/net_mod_socket.c |  318 +++++++++++++++++++++++++++++++++-------------------
 1 files changed, 201 insertions(+), 117 deletions(-)

diff --git a/src/socket/net_mod_socket.c b/src/socket/net_mod_socket.c
index 560f8be..df4e4a2 100644
--- a/src/socket/net_mod_socket.c
+++ b/src/socket/net_mod_socket.c
@@ -11,7 +11,7 @@
 {
 		init_req_rep_req_resp_pool();
    
-  
+    if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)    err_msg(errno, "signal");
 }
 
 
@@ -31,13 +31,11 @@
   /* Initially, there are no connected descriptors */
   int i;
   req_resp_pool.maxi = -1;                   //line:conc:echoservers:beginempty
-  for (i = 0; i < FD_SETSIZE; i++)
-    req_resp_pool.connfd[i] = -1;        //line:conc:echoservers:endempty
+  for (i = 0; i < OPEN_MAX; i++) {
+    req_resp_pool.conns[i].fd = -1; 
+    req_resp_pool.conns[i].events = 0;      
+  }
 
-  /* Initially, listenfd is only member of select read set */
-  FD_ZERO(&req_resp_pool.read_set);
-  FD_ZERO(&req_resp_pool.write_set);
-  FD_ZERO(&req_resp_pool.except_set);
 }
 
 int NetModSocket::connect( net_node_t *node) {
@@ -48,68 +46,75 @@
   char portstr[32];
 
   sprintf(mapKey, "%s:%d", node->host, node->port);
-  if( ( mapIter =  req_resp_pool.connectionMap.find(mapKey)) != req_resp_pool.connectionMap.end()) {
+  mapIter =  req_resp_pool.connectionMap.find(mapKey);
+  if( mapIter != req_resp_pool.connectionMap.end()) {
     connfd = mapIter->second;
-
+// printf("hit: %s\n", mapKey);
   } else {
-     
+// printf("mis: %s\n", mapKey);     
     sprintf(portstr, "%d", node->port);
+// printf("open before: %s\n", mapKey); 
     connfd = open_clientfd(node->host, portstr);
+// printf("open after: %s\n", mapKey); 
     if(connfd < 0) {
+      LoggerFactory::getLogger()->error(errno, "connect %s:%d ", node->host, node->port);
       return -1;
     }
     req_resp_pool.connectionMap.insert({mapKey, connfd});
   }
 
   
-  for (i = 0; i < FD_SETSIZE; i++) { /* Find an available slot */
-    if (req_resp_pool.connfd[i] < 0)
+  for (i = 0; i < OPEN_MAX; i++) { /* Find an available slot */
+    if (req_resp_pool.conns[i].fd < 0)
     {
       /* Add connected descriptor to the req_resp_pool */
-      req_resp_pool.connfd[i] = connfd;  
-      req_resp_pool.connfdNodeMap.insert({connfd, node});           
-     // Rio_readinitb(&req_resp_pool.clientrio[i], connfd); //line:conc:echoservers:endaddclient
-
+      req_resp_pool.conns[i].fd = connfd;  
+               
+      req_resp_pool.conns[i].events = POLLIN;
       /* Add the descriptor to descriptor set */
-      FD_SET(connfd, &req_resp_pool.read_set); //line:conc:echoservers:addconnfd
-      FD_SET(connfd, &req_resp_pool.write_set);
-      FD_SET(connfd, &req_resp_pool.except_set);
-      /* Update max descriptor and req_resp_pool highwater mark */
-      if (connfd > req_resp_pool.maxfd)
-        req_resp_pool.maxfd = connfd; 
-      if (i > req_resp_pool.maxi)      
-        req_resp_pool.maxi = i;      
       break;
     }
   }
 
-  if (i == FD_SETSIZE) {
+  if (i > req_resp_pool.maxi)      
+      req_resp_pool.maxi = i;   
+
+  if (i == OPEN_MAX) {
     /* Couldn't find an empty slot */
     LoggerFactory::getLogger()->error(errno, "add_client error: Too many clients");
     return -1;
-    
   }
+
   
   return connfd;
 }
 
 void NetModSocket::close_connect(int connfd) {
-
-  net_node_t *node =  req_resp_pool.connfdNodeMap.find(connfd)->second;
-
-  // std::map<std::string, int>::iterator mapIter;
-  if(close(connfd) != 0) {
-    LoggerFactory::getLogger()->error(errno, "NetModSocket::close_connect");
-  }
-  FD_CLR(connfd, &req_resp_pool.read_set);
-  FD_CLR(connfd, &req_resp_pool.write_set);
-  FD_CLR(connfd, &req_resp_pool.except_set);
-
+  int i;
   char mapKey[256];
-  // char portstr[32];
-  sprintf(mapKey, "%s:%d", node->host, node->port);
-  req_resp_pool.connectionMap.erase(mapKey);
-// LoggerFactory::getLogger()->debug("close_connect");
+  std::map<std::string, int>::iterator map_iter;
+  if(close(connfd) != 0) {
+    LoggerFactory::getLogger()->error(errno, "NetModSocket::close_connect close");
+  }
+ 
+
+  for (i = 0; i <= req_resp_pool.maxi; i++) {
+    if(req_resp_pool.conns[i].fd == connfd) {
+      req_resp_pool.conns[i].fd = -1;
+    }
+  }
+
+  for ( map_iter = req_resp_pool.connectionMap.begin(); map_iter != req_resp_pool.connectionMap.end(); ) {
+    if(connfd == map_iter->second) {
+// std::cout << "map_iter->first==" << map_iter->first << std::endl;
+     map_iter = req_resp_pool.connectionMap.erase(map_iter);
+    } else {
+      ++map_iter;
+    }
+  }
+
+  LoggerFactory::getLogger()->debug( "closed %d\n", connfd);
+
 }
 
 
@@ -119,9 +124,9 @@
   int i, n, recv_size,  connfd;
   net_node_t *node;
   void *recv_buf;
-  struct timeval timeout = {5, 0};
+  int  timeout = 5 * 1000;
  
-  int n_conn_suc = 0, n_recv_suc = 0;
+  int n_req = 0, n_recv_suc = 0, n_resp;
    
   net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
   
@@ -146,71 +151,78 @@
       continue;
     }
 
-    n_conn_suc++;
+// printf("write_request %s:%d\n", node->host, node->port);
+    if(write_request(connfd, node->key, send_buf, send_size) != 0) {
+      LoggerFactory::getLogger()->error("write_request failture %s:%d\n", node->host, node->port);
+      close_connect(connfd);
+      // req_resp_pool.conns[i].fd = -1;
+    } else {
+      n_req++;
+    }
+  
   }
 
-// printf("n_conn_suc =%d\n", n_conn_suc);
+// printf(" req_resp_pool.maxi = %d\n",  req_resp_pool.maxi);
+// printf(" n_req = %d\n", n_req);
 
-  while(n_recv_suc < n_conn_suc)
+// int tmp = 0;
+  while(n_resp < n_req)
   {
+// printf(" while %d\n", tmp++);
     /* Wait for listening/connected descriptor(s) to become ready */
-    req_resp_pool.ready_read_set = req_resp_pool.read_set;
-    req_resp_pool.ready_write_set = req_resp_pool.write_set;
-    req_resp_pool.ready_except_set = req_resp_pool.except_set;
-    if( (req_resp_pool.nready = select(req_resp_pool.maxfd + 1, 
-      &req_resp_pool.ready_read_set, &req_resp_pool.ready_write_set, 
-      &req_resp_pool.ready_except_set, &timeout)) <= 0) {
-      // wirite_set 鍜� read_set 鍦ㄦ寚瀹氭椂闂村唴閮芥病鍑嗗濂�
+    if( (req_resp_pool.nready = poll(req_resp_pool.conns, req_resp_pool.maxi + 1, timeout) ) <= 0) {
+       // wirite_set 鍜� read_set 鍦ㄦ寚瀹氭椂闂村唴閮芥病鍑嗗濂�
       break;
     }
 // printf("req_resp_pool.nready =%d\n", req_resp_pool.nready);
     for (i = 0; (i <= req_resp_pool.maxi) && (req_resp_pool.nready > 0); i++) {
-      if ( (connfd = req_resp_pool.connfd[i]) > 0 ) {
+      if ( (connfd = req_resp_pool.conns[i].fd) > 0 ) {
         /* If the descriptor is ready, echo a text line from it */
-        node =  req_resp_pool.connfdNodeMap.find(connfd)->second;
-        if ( FD_ISSET(connfd, &req_resp_pool.ready_read_set))
+        if (req_resp_pool.conns[i].revents & POLLIN )
         {
           req_resp_pool.nready--;
+// printf("POLLIN %d\n", connfd);
           if( (n = read_response(connfd, ret_arr+n_recv_suc)) == 0) {
             
             // 鎴愬姛鏀跺埌杩斿洖娑堟伅锛屾竻绌鸿鍏ヤ綅
-            FD_CLR(connfd, &req_resp_pool.read_set);
-            req_resp_pool.connfd[i] = -1;  
+            req_resp_pool.conns[i].fd = -1;
             n_recv_suc++;
+            
           } else if(n == -1)  {
+            req_resp_pool.conns[i].fd = -1;
             close_connect(connfd);
+          } else {
+            req_resp_pool.conns[i].fd = -1;
+             
           }
+          n_resp++;
+// printf("read response %d\n", n);
           
         }
 
-        if (FD_ISSET(connfd, &req_resp_pool.ready_write_set))
-        {
-          req_resp_pool.nready--;
-// printf("write %d\n", connfd);
-          if(write_request(connfd, node->key, send_buf, send_size) != 0) {
-            close_connect(connfd);
-          } else{
-            // 涓�娆″啓鍏ュ畬鎴愬悗娓呯┖鍐欏叆浣�
-            FD_CLR(connfd, &req_resp_pool.write_set);
-          }
-         
+        if (req_resp_pool.conns[i].revents & POLLOUT ) {
+  // printf("poll POLLOUT %d\n", connfd);        
         }
-        if (FD_ISSET(connfd, &req_resp_pool.ready_except_set))
+
+        if (req_resp_pool.conns[i].revents & (POLLRDHUP | POLLHUP | POLLERR) )
         {
+// printf("poll POLLERR %d\n", connfd);
           req_resp_pool.nready--;
           close_connect(connfd);
+          req_resp_pool.conns[i].fd = -1;
         }
       }
     }
   }
 
-  FD_ZERO(&req_resp_pool.except_set);
   for (i = 0; i <= req_resp_pool.maxi; i++) {
-    if ( (connfd = req_resp_pool.connfd[i]) > 0 ) {
+    if ( (connfd = req_resp_pool.conns[i].fd) > 0 ) {
       // 鍏抽棴骞舵竻闄ゅ啓鍏ユ垨璇诲彇澶辫触鐨勮繛鎺�
       close_connect(connfd);
+      req_resp_pool.conns[i].fd = -1;
     }
   }
+
   req_resp_pool.maxi = -1;
 
   *recv_arr = ret_arr;
@@ -226,8 +238,7 @@
   int buf_size;
   char *buf;
   int  max_buf_size;
-  buf = (char *)malloc(MAXBUF);
-  if(buf == NULL) {
+  if((buf = (char *)malloc(MAXBUF)) == NULL) {
     LoggerFactory::getLogger()->error(errno, "NetModSocket::NetModSocket malloc");
     exit(1);
   } else {
@@ -236,11 +247,12 @@
 
   buf_size = send_size + NET_MODE_REQUEST_HEAD_LENGTH;
   if(max_buf_size < buf_size) {
-    buf = (char *)realloc(buf, buf_size);
-    max_buf_size = buf_size;
-    if(buf == NULL) {
-      LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request  realloc");
+    
+    if((buf = (char *)realloc(buf, buf_size)) == NULL) {
+      LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request  realloc buf");
       exit(1);
+    } else {
+      max_buf_size = buf_size;
     }
   }
 
@@ -274,9 +286,8 @@
   char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH];
  
   net_mod_response_head_t response_head;
-  net_node_t *node =  req_resp_pool.connfdNodeMap.find(connfd)->second;
   if ( rio_readn(connfd, response_head_bs, NET_MODE_RESPONSE_HEAD_LENGTH) !=  NET_MODE_RESPONSE_HEAD_LENGTH) {
-    LoggerFactory::getLogger()->error(errno, "NetModSocket::send  rio_readnb");
+    LoggerFactory::getLogger()->error(errno, "NetModSocket::read_response  rio_readnb response_head");
     
     return -1;
   }
@@ -293,15 +304,14 @@
     exit(1);
   }
   if ( (recv_size = rio_readn(connfd, recv_buf, response_head.content_length) ) !=  response_head.content_length) {
-    LoggerFactory::getLogger()->error(errno, "NetModSocket::send  rio_readnb");
+    LoggerFactory::getLogger()->error(errno, "NetModSocket::read_response  rio_readnb recv_buf");
     
     return -1;
   }
 
-  
-  strcpy( recv_msg->host, node->host);
-  recv_msg->port = node->port;
-  recv_msg->key = node->key;
+  strcpy( recv_msg->host, response_head.host);
+  recv_msg->port = response_head.port;
+  recv_msg->key = response_head.key;
   recv_msg->content = recv_buf;
   recv_msg->content_length = recv_size;
   return 0;
@@ -349,8 +359,12 @@
 
     buf_size = send_size + NET_MODE_REQUEST_HEAD_LENGTH;
     if(max_buf_size < buf_size) {
-      buf = (char *)realloc(buf, buf_size);
-      max_buf_size = buf_size;
+      if((buf = (char *)realloc(buf, buf_size)) == NULL) {
+        LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe realloc buf");
+      } else {
+        max_buf_size = buf_size;
+      }
+      
     }
     
 
@@ -366,7 +380,7 @@
 
 
     if(rio_writen(clientfd, buf, buf_size) != buf_size ) {
-      LoggerFactory::getLogger()->error(errno, "NetModSocket::send conent rio_writen");
+      LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe  rio_writen  buf");
      
       close(clientfd);
       continue;
@@ -375,7 +389,7 @@
     // setsockopt(clientfd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
 
     if ( rio_readn(clientfd, response_head_bs, NET_MODE_RESPONSE_HEAD_LENGTH) !=  NET_MODE_RESPONSE_HEAD_LENGTH) {
-      LoggerFactory::getLogger()->error(errno, "NetModSocket::send  rio_readnb");
+      LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe  rio_readnb response_head_bs");
      
       close(clientfd);
       continue;
@@ -392,7 +406,7 @@
       exit(1);
     }
     if ( (recv_size = rio_readn(clientfd, recv_buf, response_head.content_length) ) !=  response_head.content_length) {
-      LoggerFactory::getLogger()->error(errno, "NetModSocket::send  rio_readnb");
+      LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe  rio_readnb recv_buf");
       
       close(clientfd);
       continue;
@@ -465,8 +479,12 @@
       buf_size = NET_MODE_REQUEST_HEAD_LENGTH + content_size + request_head.topic_length;
 
       if(max_buf_size < buf_size) {
-        buf = (char *)realloc(buf, buf_size);
-        max_buf_size = buf_size;
+        if( ( buf = (char *)realloc(buf, buf_size)) == NULL) {
+           LoggerFactory::getLogger()->error(errno, "NetModSocket::pub realloc buf ");
+        } else {
+          max_buf_size = buf_size;
+        }
+        
       }
 
       memcpy(buf, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH);
@@ -495,42 +513,108 @@
   free(arr);
 }
 
-// ssize_t recv(void *buf, size_t len) {
-
-// 	return rio_readlineb(&rio, buf, MAXLINE);
-  
-// }
+/**
+  uint32_t mod;
+  char host[NI_MAXHOST];
+  uint32_t port;
+  uint32_t key;
+  uint32_t content_length;
+  uint32_t topic_length;
+*/
 
 void * NetModSocket::encode_request_head(net_mod_request_head_t & request) {
-  char * head = (char *)malloc(NET_MODE_REQUEST_HEAD_LENGTH);
-  PUT(head, htonl(request.mod));
-  PUT(head + 4, htonl(request.key));
-  PUT(head + 8, htonl(request.content_length));
-  PUT(head + 12, htonl(request.topic_length));
-  return head;
+  void * headbs = malloc(NET_MODE_REQUEST_HEAD_LENGTH);
+  char *tmp_ptr = (char *)headbs;
+  PUT(tmp_ptr, htonl(request.mod));
+
+  tmp_ptr += 4;
+  memcpy(tmp_ptr, request.host, NI_MAXHOST);
+
+  tmp_ptr += NI_MAXHOST;
+  PUT(tmp_ptr, htonl(request.port));
+
+  tmp_ptr += 4;
+  PUT(tmp_ptr, htonl(request.key));
+
+  tmp_ptr += 4;
+  PUT(tmp_ptr, htonl(request.content_length));
+
+  tmp_ptr += 4;
+  PUT(tmp_ptr, htonl(request.topic_length));
+  
+  
+  return headbs;
 }
 
-net_mod_request_head_t  NetModSocket::decode_request_head(void *_headbs) {
-  char *headbs = (char *)_headbs;
+net_mod_request_head_t  NetModSocket::decode_request_head(void *headbs) {
+  char *tmp_ptr = (char *)headbs;
   net_mod_request_head_t head;
-  head.mod = ntohl(GET(headbs));
-  head.key = ntohl(GET(headbs + 4));
-  head.content_length = ntohl(GET(headbs + 8));
-  head.topic_length = ntohl(GET(headbs + 12));
+
+  head.mod = ntohl(GET(tmp_ptr));
+
+  tmp_ptr += NI_MAXHOST;
+  memcpy(head.host, tmp_ptr, NI_MAXHOST);
+
+  tmp_ptr += 4;
+  head.port = ntohl(GET(tmp_ptr));
+
+  tmp_ptr += 4;
+  head.key = ntohl(GET(tmp_ptr));
+
+  tmp_ptr += 4;
+  head.content_length = ntohl(GET(tmp_ptr));
+
+  tmp_ptr += 4;
+  head.topic_length = ntohl(GET(tmp_ptr));
+ 
   return head;
 }
 
+/**
+ char host[NI_MAXHOST];
+  uint32_t port;
+  uint32_t key;
+  uint32_t content_length;
+  uint32_t code;
+  */
 void * NetModSocket::encode_response_head(net_mod_response_head_t & response) {
-  char * head = (char *)malloc(NET_MODE_RESPONSE_HEAD_LENGTH);
-  PUT(head, htonl(response.code));
-  PUT(head + 4, htonl(response.content_length));
-  return head;
+  void * headbs = malloc(NET_MODE_RESPONSE_HEAD_LENGTH);
+  char *tmp_ptr = (char *)headbs;
+
+  memcpy(tmp_ptr, response.host, NI_MAXHOST);
+
+  tmp_ptr += NI_MAXHOST;
+  PUT(tmp_ptr, htonl(response.port));
+
+  tmp_ptr += 4;
+  PUT(tmp_ptr , htonl(response.key));
+
+  tmp_ptr += 4;
+  PUT(tmp_ptr, htonl(response.content_length));
+
+  tmp_ptr += 4;
+  PUT(tmp_ptr, htonl(response.code));
+
+  return headbs;
 }
 
-net_mod_response_head_t  NetModSocket::decode_response_head(void *_headbs) {
-  char *headbs = (char *)_headbs;
+net_mod_response_head_t  NetModSocket::decode_response_head(void *headbs) {
+  char *tmp_ptr = (char *)headbs;
   net_mod_response_head_t head;
-  head.code = ntohl(GET(headbs));
-  head.content_length = ntohl(GET(headbs + 4));
+
+  memcpy(head.host, tmp_ptr, NI_MAXHOST);
+
+  tmp_ptr += NI_MAXHOST;
+  head.port = ntohl(GET(tmp_ptr));
+
+  tmp_ptr += 4;
+  head.key = ntohl(GET(tmp_ptr));
+
+  tmp_ptr += 4;
+  head.content_length = ntohl(GET(tmp_ptr));
+
+  tmp_ptr += 4;
+  head.code = ntohl(GET(tmp_ptr));
+
   return head;
 }

--
Gitblit v1.8.0