wangzhengquan
2020-10-20 95349b79a5a646736c706fe19645181146ee9486
src/socket/net_mod_socket.c
@@ -11,7 +11,7 @@
{
      init_req_rep_req_resp_pool();
   
    if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)    err_msg(errno, "signal");
}
@@ -31,13 +31,11 @@
  /* Initially, there are no connected descriptors */
  int i;
  req_resp_pool.maxi = -1;                   //line:conc:echoservers:beginempty
  for (i = 0; i < FD_SETSIZE; i++)
    req_resp_pool.connfd[i] = -1;        //line:conc:echoservers:endempty
  for (i = 0; i < OPEN_MAX; i++) {
    req_resp_pool.conns[i].fd = -1;
    req_resp_pool.conns[i].events = 0;
  }
  /* Initially, listenfd is only member of select read set */
  FD_ZERO(&req_resp_pool.read_set);
  FD_ZERO(&req_resp_pool.write_set);
  FD_ZERO(&req_resp_pool.except_set);
}
int NetModSocket::connect( net_node_t *node) {
@@ -48,68 +46,75 @@
  char portstr[32];
  sprintf(mapKey, "%s:%d", node->host, node->port);
  if( ( mapIter =  req_resp_pool.connectionMap.find(mapKey)) != req_resp_pool.connectionMap.end()) {
  mapIter =  req_resp_pool.connectionMap.find(mapKey);
  if( mapIter != req_resp_pool.connectionMap.end()) {
    connfd = mapIter->second;
// printf("hit: %s\n", mapKey);
  } else {
// printf("mis: %s\n", mapKey);
    sprintf(portstr, "%d", node->port);
// printf("open before: %s\n", mapKey);
    connfd = open_clientfd(node->host, portstr);
// printf("open after: %s\n", mapKey);
    if(connfd < 0) {
      LoggerFactory::getLogger()->error(errno, "connect %s:%d ", node->host, node->port);
      return -1;
    }
    req_resp_pool.connectionMap.insert({mapKey, connfd});
  }
  
  for (i = 0; i < FD_SETSIZE; i++) { /* Find an available slot */
    if (req_resp_pool.connfd[i] < 0)
  for (i = 0; i < OPEN_MAX; i++) { /* Find an available slot */
    if (req_resp_pool.conns[i].fd < 0)
    {
      /* Add connected descriptor to the req_resp_pool */
      req_resp_pool.connfd[i] = connfd;
      req_resp_pool.connfdNodeMap.insert({connfd, node});
     // Rio_readinitb(&req_resp_pool.clientrio[i], connfd); //line:conc:echoservers:endaddclient
      req_resp_pool.conns[i].fd = connfd;
      req_resp_pool.conns[i].events = POLLIN;
      /* Add the descriptor to descriptor set */
      FD_SET(connfd, &req_resp_pool.read_set); //line:conc:echoservers:addconnfd
      FD_SET(connfd, &req_resp_pool.write_set);
      FD_SET(connfd, &req_resp_pool.except_set);
      /* Update max descriptor and req_resp_pool highwater mark */
      if (connfd > req_resp_pool.maxfd)
        req_resp_pool.maxfd = connfd;
      if (i > req_resp_pool.maxi)
        req_resp_pool.maxi = i;
      break;
    }
  }
  if (i == FD_SETSIZE) {
  if (i > req_resp_pool.maxi)
      req_resp_pool.maxi = i;
  if (i == OPEN_MAX) {
    /* Couldn't find an empty slot */
    LoggerFactory::getLogger()->error(errno, "add_client error: Too many clients");
    return -1;
  }
  
  return connfd;
}
void NetModSocket::close_connect(int connfd) {
  net_node_t *node =  req_resp_pool.connfdNodeMap.find(connfd)->second;
  // std::map<std::string, int>::iterator mapIter;
  if(close(connfd) != 0) {
    LoggerFactory::getLogger()->error(errno, "NetModSocket::close_connect");
  }
  FD_CLR(connfd, &req_resp_pool.read_set);
  FD_CLR(connfd, &req_resp_pool.write_set);
  FD_CLR(connfd, &req_resp_pool.except_set);
  int i;
  char mapKey[256];
  // char portstr[32];
  sprintf(mapKey, "%s:%d", node->host, node->port);
  req_resp_pool.connectionMap.erase(mapKey);
// LoggerFactory::getLogger()->debug("close_connect");
  std::map<std::string, int>::iterator map_iter;
  if(close(connfd) != 0) {
    LoggerFactory::getLogger()->error(errno, "NetModSocket::close_connect close");
  }
  for (i = 0; i <= req_resp_pool.maxi; i++) {
    if(req_resp_pool.conns[i].fd == connfd) {
      req_resp_pool.conns[i].fd = -1;
    }
  }
  for ( map_iter = req_resp_pool.connectionMap.begin(); map_iter != req_resp_pool.connectionMap.end(); ) {
    if(connfd == map_iter->second) {
// std::cout << "map_iter->first==" << map_iter->first << std::endl;
     map_iter = req_resp_pool.connectionMap.erase(map_iter);
    } else {
      ++map_iter;
    }
  }
  LoggerFactory::getLogger()->debug( "closed %d\n", connfd);
}
@@ -119,9 +124,9 @@
  int i, n, recv_size,  connfd;
  net_node_t *node;
  void *recv_buf;
  struct timeval timeout = {5, 0};
  int  timeout = 5 * 1000;
 
  int n_conn_suc = 0, n_recv_suc = 0;
  int n_req = 0, n_recv_suc = 0, n_resp;
   
  net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
  
@@ -146,71 +151,78 @@
      continue;
    }
    n_conn_suc++;
// printf("write_request %s:%d\n", node->host, node->port);
    if(write_request(connfd, node->key, send_buf, send_size) != 0) {
      LoggerFactory::getLogger()->error("write_request failture %s:%d\n", node->host, node->port);
      close_connect(connfd);
      // req_resp_pool.conns[i].fd = -1;
    } else {
      n_req++;
    }
  }
// printf("n_conn_suc =%d\n", n_conn_suc);
// printf(" req_resp_pool.maxi = %d\n",  req_resp_pool.maxi);
// printf(" n_req = %d\n", n_req);
  while(n_recv_suc < n_conn_suc)
// int tmp = 0;
  while(n_resp < n_req)
  {
// printf(" while %d\n", tmp++);
    /* Wait for listening/connected descriptor(s) to become ready */
    req_resp_pool.ready_read_set = req_resp_pool.read_set;
    req_resp_pool.ready_write_set = req_resp_pool.write_set;
    req_resp_pool.ready_except_set = req_resp_pool.except_set;
    if( (req_resp_pool.nready = select(req_resp_pool.maxfd + 1,
      &req_resp_pool.ready_read_set, &req_resp_pool.ready_write_set,
      &req_resp_pool.ready_except_set, &timeout)) <= 0) {
      // wirite_set 和 read_set 在指定时间内都没准备好
    if( (req_resp_pool.nready = poll(req_resp_pool.conns, req_resp_pool.maxi + 1, timeout) ) <= 0) {
       // wirite_set 和 read_set 在指定时间内都没准备好
      break;
    }
// printf("req_resp_pool.nready =%d\n", req_resp_pool.nready);
    for (i = 0; (i <= req_resp_pool.maxi) && (req_resp_pool.nready > 0); i++) {
      if ( (connfd = req_resp_pool.connfd[i]) > 0 ) {
      if ( (connfd = req_resp_pool.conns[i].fd) > 0 ) {
        /* If the descriptor is ready, echo a text line from it */
        node =  req_resp_pool.connfdNodeMap.find(connfd)->second;
        if ( FD_ISSET(connfd, &req_resp_pool.ready_read_set))
        if (req_resp_pool.conns[i].revents & POLLIN )
        {
          req_resp_pool.nready--;
// printf("POLLIN %d\n", connfd);
          if( (n = read_response(connfd, ret_arr+n_recv_suc)) == 0) {
            
            // 成功收到返回消息,清空读入位
            FD_CLR(connfd, &req_resp_pool.read_set);
            req_resp_pool.connfd[i] = -1;
            req_resp_pool.conns[i].fd = -1;
            n_recv_suc++;
          } else if(n == -1)  {
            req_resp_pool.conns[i].fd = -1;
            close_connect(connfd);
          } else {
            req_resp_pool.conns[i].fd = -1;
          }
          n_resp++;
// printf("read response %d\n", n);
          
        }
        if (FD_ISSET(connfd, &req_resp_pool.ready_write_set))
        {
          req_resp_pool.nready--;
// printf("write %d\n", connfd);
          if(write_request(connfd, node->key, send_buf, send_size) != 0) {
            close_connect(connfd);
          } else{
            // 一次写入完成后清空写入位
            FD_CLR(connfd, &req_resp_pool.write_set);
          }
        if (req_resp_pool.conns[i].revents & POLLOUT ) {
  // printf("poll POLLOUT %d\n", connfd);
        }
        if (FD_ISSET(connfd, &req_resp_pool.ready_except_set))
        if (req_resp_pool.conns[i].revents & (POLLRDHUP | POLLHUP | POLLERR) )
        {
// printf("poll POLLERR %d\n", connfd);
          req_resp_pool.nready--;
          close_connect(connfd);
          req_resp_pool.conns[i].fd = -1;
        }
      }
    }
  }
  FD_ZERO(&req_resp_pool.except_set);
  for (i = 0; i <= req_resp_pool.maxi; i++) {
    if ( (connfd = req_resp_pool.connfd[i]) > 0 ) {
    if ( (connfd = req_resp_pool.conns[i].fd) > 0 ) {
      // 关闭并清除写入或读取失败的连接
      close_connect(connfd);
      req_resp_pool.conns[i].fd = -1;
    }
  }
  req_resp_pool.maxi = -1;
  *recv_arr = ret_arr;
@@ -226,8 +238,7 @@
  int buf_size;
  char *buf;
  int  max_buf_size;
  buf = (char *)malloc(MAXBUF);
  if(buf == NULL) {
  if((buf = (char *)malloc(MAXBUF)) == NULL) {
    LoggerFactory::getLogger()->error(errno, "NetModSocket::NetModSocket malloc");
    exit(1);
  } else {
@@ -236,11 +247,12 @@
  buf_size = send_size + NET_MODE_REQUEST_HEAD_LENGTH;
  if(max_buf_size < buf_size) {
    buf = (char *)realloc(buf, buf_size);
    max_buf_size = buf_size;
    if(buf == NULL) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request  realloc");
    if((buf = (char *)realloc(buf, buf_size)) == NULL) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request  realloc buf");
      exit(1);
    } else {
      max_buf_size = buf_size;
    }
  }
@@ -274,9 +286,8 @@
  char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH];
 
  net_mod_response_head_t response_head;
  net_node_t *node =  req_resp_pool.connfdNodeMap.find(connfd)->second;
  if ( rio_readn(connfd, response_head_bs, NET_MODE_RESPONSE_HEAD_LENGTH) !=  NET_MODE_RESPONSE_HEAD_LENGTH) {
    LoggerFactory::getLogger()->error(errno, "NetModSocket::send  rio_readnb");
    LoggerFactory::getLogger()->error(errno, "NetModSocket::read_response  rio_readnb response_head");
    
    return -1;
  }
@@ -293,15 +304,14 @@
    exit(1);
  }
  if ( (recv_size = rio_readn(connfd, recv_buf, response_head.content_length) ) !=  response_head.content_length) {
    LoggerFactory::getLogger()->error(errno, "NetModSocket::send  rio_readnb");
    LoggerFactory::getLogger()->error(errno, "NetModSocket::read_response  rio_readnb recv_buf");
    
    return -1;
  }
  strcpy( recv_msg->host, node->host);
  recv_msg->port = node->port;
  recv_msg->key = node->key;
  strcpy( recv_msg->host, response_head.host);
  recv_msg->port = response_head.port;
  recv_msg->key = response_head.key;
  recv_msg->content = recv_buf;
  recv_msg->content_length = recv_size;
  return 0;
@@ -349,8 +359,12 @@
    buf_size = send_size + NET_MODE_REQUEST_HEAD_LENGTH;
    if(max_buf_size < buf_size) {
      buf = (char *)realloc(buf, buf_size);
      max_buf_size = buf_size;
      if((buf = (char *)realloc(buf, buf_size)) == NULL) {
        LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe realloc buf");
      } else {
        max_buf_size = buf_size;
      }
    }
    
@@ -366,7 +380,7 @@
    if(rio_writen(clientfd, buf, buf_size) != buf_size ) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::send conent rio_writen");
      LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe  rio_writen  buf");
     
      close(clientfd);
      continue;
@@ -375,7 +389,7 @@
    // setsockopt(clientfd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
    if ( rio_readn(clientfd, response_head_bs, NET_MODE_RESPONSE_HEAD_LENGTH) !=  NET_MODE_RESPONSE_HEAD_LENGTH) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::send  rio_readnb");
      LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe  rio_readnb response_head_bs");
     
      close(clientfd);
      continue;
@@ -392,7 +406,7 @@
      exit(1);
    }
    if ( (recv_size = rio_readn(clientfd, recv_buf, response_head.content_length) ) !=  response_head.content_length) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::send  rio_readnb");
      LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe  rio_readnb recv_buf");
      
      close(clientfd);
      continue;
@@ -465,8 +479,12 @@
      buf_size = NET_MODE_REQUEST_HEAD_LENGTH + content_size + request_head.topic_length;
      if(max_buf_size < buf_size) {
        buf = (char *)realloc(buf, buf_size);
        max_buf_size = buf_size;
        if( ( buf = (char *)realloc(buf, buf_size)) == NULL) {
           LoggerFactory::getLogger()->error(errno, "NetModSocket::pub realloc buf ");
        } else {
          max_buf_size = buf_size;
        }
      }
      memcpy(buf, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH);
@@ -495,42 +513,108 @@
  free(arr);
}
// ssize_t recv(void *buf, size_t len) {
//    return rio_readlineb(&rio, buf, MAXLINE);
// }
/**
  uint32_t mod;
  char host[NI_MAXHOST];
  uint32_t port;
  uint32_t key;
  uint32_t content_length;
  uint32_t topic_length;
*/
void * NetModSocket::encode_request_head(net_mod_request_head_t & request) {
  char * head = (char *)malloc(NET_MODE_REQUEST_HEAD_LENGTH);
  PUT(head, htonl(request.mod));
  PUT(head + 4, htonl(request.key));
  PUT(head + 8, htonl(request.content_length));
  PUT(head + 12, htonl(request.topic_length));
  return head;
  void * headbs = malloc(NET_MODE_REQUEST_HEAD_LENGTH);
  char *tmp_ptr = (char *)headbs;
  PUT(tmp_ptr, htonl(request.mod));
  tmp_ptr += 4;
  memcpy(tmp_ptr, request.host, NI_MAXHOST);
  tmp_ptr += NI_MAXHOST;
  PUT(tmp_ptr, htonl(request.port));
  tmp_ptr += 4;
  PUT(tmp_ptr, htonl(request.key));
  tmp_ptr += 4;
  PUT(tmp_ptr, htonl(request.content_length));
  tmp_ptr += 4;
  PUT(tmp_ptr, htonl(request.topic_length));
  return headbs;
}
net_mod_request_head_t  NetModSocket::decode_request_head(void *_headbs) {
  char *headbs = (char *)_headbs;
net_mod_request_head_t  NetModSocket::decode_request_head(void *headbs) {
  char *tmp_ptr = (char *)headbs;
  net_mod_request_head_t head;
  head.mod = ntohl(GET(headbs));
  head.key = ntohl(GET(headbs + 4));
  head.content_length = ntohl(GET(headbs + 8));
  head.topic_length = ntohl(GET(headbs + 12));
  head.mod = ntohl(GET(tmp_ptr));
  tmp_ptr += NI_MAXHOST;
  memcpy(head.host, tmp_ptr, NI_MAXHOST);
  tmp_ptr += 4;
  head.port = ntohl(GET(tmp_ptr));
  tmp_ptr += 4;
  head.key = ntohl(GET(tmp_ptr));
  tmp_ptr += 4;
  head.content_length = ntohl(GET(tmp_ptr));
  tmp_ptr += 4;
  head.topic_length = ntohl(GET(tmp_ptr));
  return head;
}
/**
 char host[NI_MAXHOST];
  uint32_t port;
  uint32_t key;
  uint32_t content_length;
  uint32_t code;
  */
void * NetModSocket::encode_response_head(net_mod_response_head_t & response) {
  char * head = (char *)malloc(NET_MODE_RESPONSE_HEAD_LENGTH);
  PUT(head, htonl(response.code));
  PUT(head + 4, htonl(response.content_length));
  return head;
  void * headbs = malloc(NET_MODE_RESPONSE_HEAD_LENGTH);
  char *tmp_ptr = (char *)headbs;
  memcpy(tmp_ptr, response.host, NI_MAXHOST);
  tmp_ptr += NI_MAXHOST;
  PUT(tmp_ptr, htonl(response.port));
  tmp_ptr += 4;
  PUT(tmp_ptr , htonl(response.key));
  tmp_ptr += 4;
  PUT(tmp_ptr, htonl(response.content_length));
  tmp_ptr += 4;
  PUT(tmp_ptr, htonl(response.code));
  return headbs;
}
net_mod_response_head_t  NetModSocket::decode_response_head(void *_headbs) {
  char *headbs = (char *)_headbs;
net_mod_response_head_t  NetModSocket::decode_response_head(void *headbs) {
  char *tmp_ptr = (char *)headbs;
  net_mod_response_head_t head;
  head.code = ntohl(GET(headbs));
  head.content_length = ntohl(GET(headbs + 4));
  memcpy(head.host, tmp_ptr, NI_MAXHOST);
  tmp_ptr += NI_MAXHOST;
  head.port = ntohl(GET(tmp_ptr));
  tmp_ptr += 4;
  head.key = ntohl(GET(tmp_ptr));
  tmp_ptr += 4;
  head.content_length = ntohl(GET(tmp_ptr));
  tmp_ptr += 4;
  head.code = ntohl(GET(tmp_ptr));
  return head;
}