wangzhengquan
2020-10-20 95349b79a5a646736c706fe19645181146ee9486
update
2个文件已添加
1 文件已重命名
9个文件已修改
454 ■■■■■ 已修改文件
.gdbinit 补丁 | 查看 | 原始文档 | blame | 历史
include/usgcommon/logger.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
lib/libusgcommon.a 补丁 | 查看 | 原始文档 | blame | 历史
src/Makefile 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/common 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_server_socket.c 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.c 318 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.h 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/Makefile 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/net_mod_socket.sh 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_net_mod_socket.c 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/Makefile 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gdbinit
include/usgcommon/logger.h
@@ -22,7 +22,7 @@
    struct timeval tv;
    struct tm *info;
    gettimeofday(&tv, NULL);
    info = localtime(&tv.tv_sec);
    info = localtime(&(tv.tv_sec));
    strftime(buf, MAXBUF - 1, "%Y-%d-%m %H:%M:%S", info);
    snprintf(buf + strlen(buf), MAXBUF - strlen(buf) - 1, ",%ld [%s] ",  tv.tv_usec, strlevel(level));
    vsnprintf(buf + strlen(buf), MAXBUF - strlen(buf) - 1, fmt, ap);
lib/libusgcommon.a
Binary files differ
src/Makefile
@@ -7,14 +7,16 @@
PREFIX = $(DEST)
LIBSQUEUE = libshm_queue.a
DLIBSQUEUE = libshm_queue.so
LIBSQUEUE = $(DEST)/lib/libshm_queue.a
DLIBSQUEUE = $(DEST)/lib/libshm_queue.so
# 开源工具包
LDLIBS += -lusgcommon
#LDLIBS += -lusgcommon
INCLUDES += -I./queue -I./socket  -I$(ROOT)/include/usgcommon
#-I$(ROOT)/include/usgcommon
INCLUDES += -I./queue -I./socket -I./common/include
SOURCES := $(wildcard *.c ./**/*.c)
OBJS   = $(patsubst %.c, $(DEST)/%.o, $(SOURCES)) 
@@ -38,15 +40,16 @@
.PHONY: build
build: prebuild $(MYLIBS)
    mkdir -p $(DEST)/lib
    cp $(MYLIBS) $(DEST)/lib
    mkdir -p $(DEST)/include/shmqueue
    cp  ./*.h ./queue/*.h ./socket/*.h  $(DEST)/include/shmqueue
    cp $(ROOT)/lib/* $(DEST)/lib
    cp $(ROOT)/.gdbinit $(DEST)
    # mkdir -p $(DEST)/lib
    # cp $(MYLIBS) $(DEST)/lib
    # mkdir -p $(DEST)/include/shmqueue
    # cp  ./*.h ./queue/*.h ./socket/*.h  $(DEST)/include/shmqueue
    # cp $(ROOT)/lib/* $(DEST)/lib
.PHONY: prebuild
prebuild:
    @test -d $(DEST) || mkdir $(DEST)
    @test -d $(DEST)/lib || mkdir -p $(DEST)/lib
#static lib
$(LIBSQUEUE): $(OBJS)
src/common
New file
@@ -0,0 +1 @@
/home/wzq/wk/Basic-Common/common
src/socket/net_mod_server_socket.c
@@ -43,6 +43,7 @@
    socklen_t clientlen;
  struct sockaddr_storage clientaddr;
  char portstr[32];
  if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)    err_msg(errno, "signal");
  //shmModSocket = new ShmModSocket;
  sprintf(portstr, "%d", port);
@@ -58,7 +59,7 @@
    /* Wait for listening/connected descriptor(s) to become ready */
    pool.ready_set = pool.read_set;
    pool.nready = select(pool.maxfd + 1, &pool.ready_set, NULL, NULL, NULL);
 // LoggerFactory::getLogger()->debug("select return \n");
    /* If listening descriptor ready, add new client to pool */
    if (FD_ISSET(listenfd, &pool.ready_set))
    {
@@ -135,11 +136,12 @@
  request_head = NetModSocket::decode_request_head(request_head_bs);
  if(request_head.content_length > max_buf) {
    buf = realloc(buf, request_head.content_length);
    max_buf = request_head.content_length;
    if(buf == NULL) {
      LoggerFactory::getLogger()->error(errno, "process_client realloc");
    if( (buf = realloc(buf, request_head.content_length)) == NULL) {
      LoggerFactory::getLogger()->error(errno, "NetModServerSocket::process_client realloc buf");
      exit(1);
    } else {
      max_buf = request_head.content_length;
    }
  }  
@@ -148,7 +150,12 @@
  }
  if(request_head.mod == REQ_REP) {
// printf("server response===========\n");
    memcpy(response_head.host, request_head.host, NI_MAXHOST);
    response_head.port = request_head.port;
    response_head.key = request_head.key;
    if(shmModSocket.sendandrecv_unsafe(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size) != 0) {
      response_head.code = 1;
      response_head.content_length = 0;
      if( rio_writen(connfd, NetModSocket::encode_response_head(response_head), NET_MODE_RESPONSE_HEAD_LENGTH) != NET_MODE_RESPONSE_HEAD_LENGTH )
@@ -160,8 +167,14 @@
      response_buf_size = NET_MODE_RESPONSE_HEAD_LENGTH + recv_size;
      if(max_response_buf < response_buf_size) {
        buf = (char *)realloc(response_buf, response_buf_size);
        max_response_buf = response_buf_size;
        if( (response_buf = (char *)realloc(response_buf, response_buf_size)) == NULL ) {
          LoggerFactory::getLogger()->error(errno, "NetModServerSocket::process_client realloc response_buf");
          exit(1);
        } else {
          max_response_buf = response_buf_size;
        }
      }
      memcpy(response_buf, NetModSocket::encode_response_head(response_head),  NET_MODE_RESPONSE_HEAD_LENGTH);
      memcpy(response_buf + NET_MODE_RESPONSE_HEAD_LENGTH, recv_buf,  recv_size);
@@ -176,11 +189,11 @@
   
  } else if(request_head.mod == BUS) {
    if(request_head.topic_length > max_topic_buf) {
      topic_buf = realloc(topic_buf, request_head.topic_length);
      max_topic_buf = request_head.topic_length;
      if(topic_buf == NULL) {
        LoggerFactory::getLogger()->error(errno, "process_client realloc");
      if( (topic_buf = realloc(topic_buf, request_head.topic_length)) == NULL ) {
         LoggerFactory::getLogger()->error(errno, "NetModServerSocket::process_client realloc topic_buf");
        exit(1);
      } else {
        max_topic_buf = request_head.topic_length;
      }
    }
@@ -200,7 +213,7 @@
{
  int i, connfd;
  //rio_t *rio;
  Logger * logger = LoggerFactory::getLogger();
  for (i = 0; (i <= pool.maxi) && (pool.nready > 0); i++)
  {
@@ -211,10 +224,13 @@
    {
      pool.nready--;
      if(process_client(connfd) != 0) {
        LoggerFactory::getLogger()->debug("===server close client %d\n", connfd);
        Close(connfd); //line:conc:echoservers:closeconnfd
        Close(connfd);
        FD_CLR(connfd, &pool.read_set); 
        pool.clientfd[i] = -1;
        logger->debug("===server close client %d\n", connfd);
    // printf("===server close client %d\n", connfd);
      }
    }
src/socket/net_mod_socket.c
@@ -11,7 +11,7 @@
{
        init_req_rep_req_resp_pool();
   
    if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)    err_msg(errno, "signal");
}
@@ -31,13 +31,11 @@
  /* Initially, there are no connected descriptors */
  int i;
  req_resp_pool.maxi = -1;                   //line:conc:echoservers:beginempty
  for (i = 0; i < FD_SETSIZE; i++)
    req_resp_pool.connfd[i] = -1;        //line:conc:echoservers:endempty
  for (i = 0; i < OPEN_MAX; i++) {
    req_resp_pool.conns[i].fd = -1;
    req_resp_pool.conns[i].events = 0;
  }
  /* Initially, listenfd is only member of select read set */
  FD_ZERO(&req_resp_pool.read_set);
  FD_ZERO(&req_resp_pool.write_set);
  FD_ZERO(&req_resp_pool.except_set);
}
int NetModSocket::connect( net_node_t *node) {
@@ -48,68 +46,75 @@
  char portstr[32];
  sprintf(mapKey, "%s:%d", node->host, node->port);
  if( ( mapIter =  req_resp_pool.connectionMap.find(mapKey)) != req_resp_pool.connectionMap.end()) {
  mapIter =  req_resp_pool.connectionMap.find(mapKey);
  if( mapIter != req_resp_pool.connectionMap.end()) {
    connfd = mapIter->second;
// printf("hit: %s\n", mapKey);
  } else {
// printf("mis: %s\n", mapKey);
    sprintf(portstr, "%d", node->port);
// printf("open before: %s\n", mapKey);
    connfd = open_clientfd(node->host, portstr);
// printf("open after: %s\n", mapKey);
    if(connfd < 0) {
      LoggerFactory::getLogger()->error(errno, "connect %s:%d ", node->host, node->port);
      return -1;
    }
    req_resp_pool.connectionMap.insert({mapKey, connfd});
  }
  
  for (i = 0; i < FD_SETSIZE; i++) { /* Find an available slot */
    if (req_resp_pool.connfd[i] < 0)
  for (i = 0; i < OPEN_MAX; i++) { /* Find an available slot */
    if (req_resp_pool.conns[i].fd < 0)
    {
      /* Add connected descriptor to the req_resp_pool */
      req_resp_pool.connfd[i] = connfd;
      req_resp_pool.connfdNodeMap.insert({connfd, node});
     // Rio_readinitb(&req_resp_pool.clientrio[i], connfd); //line:conc:echoservers:endaddclient
      req_resp_pool.conns[i].fd = connfd;
      req_resp_pool.conns[i].events = POLLIN;
      /* Add the descriptor to descriptor set */
      FD_SET(connfd, &req_resp_pool.read_set); //line:conc:echoservers:addconnfd
      FD_SET(connfd, &req_resp_pool.write_set);
      FD_SET(connfd, &req_resp_pool.except_set);
      /* Update max descriptor and req_resp_pool highwater mark */
      if (connfd > req_resp_pool.maxfd)
        req_resp_pool.maxfd = connfd;
      if (i > req_resp_pool.maxi)
        req_resp_pool.maxi = i;
      break;
    }
  }
  if (i == FD_SETSIZE) {
  if (i > req_resp_pool.maxi)
      req_resp_pool.maxi = i;
  if (i == OPEN_MAX) {
    /* Couldn't find an empty slot */
    LoggerFactory::getLogger()->error(errno, "add_client error: Too many clients");
    return -1;
  }
  
  return connfd;
}
void NetModSocket::close_connect(int connfd) {
  net_node_t *node =  req_resp_pool.connfdNodeMap.find(connfd)->second;
  // std::map<std::string, int>::iterator mapIter;
  if(close(connfd) != 0) {
    LoggerFactory::getLogger()->error(errno, "NetModSocket::close_connect");
  }
  FD_CLR(connfd, &req_resp_pool.read_set);
  FD_CLR(connfd, &req_resp_pool.write_set);
  FD_CLR(connfd, &req_resp_pool.except_set);
  int i;
  char mapKey[256];
  // char portstr[32];
  sprintf(mapKey, "%s:%d", node->host, node->port);
  req_resp_pool.connectionMap.erase(mapKey);
// LoggerFactory::getLogger()->debug("close_connect");
  std::map<std::string, int>::iterator map_iter;
  if(close(connfd) != 0) {
    LoggerFactory::getLogger()->error(errno, "NetModSocket::close_connect close");
  }
  for (i = 0; i <= req_resp_pool.maxi; i++) {
    if(req_resp_pool.conns[i].fd == connfd) {
      req_resp_pool.conns[i].fd = -1;
    }
  }
  for ( map_iter = req_resp_pool.connectionMap.begin(); map_iter != req_resp_pool.connectionMap.end(); ) {
    if(connfd == map_iter->second) {
// std::cout << "map_iter->first==" << map_iter->first << std::endl;
     map_iter = req_resp_pool.connectionMap.erase(map_iter);
    } else {
      ++map_iter;
    }
  }
  LoggerFactory::getLogger()->debug( "closed %d\n", connfd);
}
@@ -119,9 +124,9 @@
  int i, n, recv_size,  connfd;
  net_node_t *node;
  void *recv_buf;
  struct timeval timeout = {5, 0};
  int  timeout = 5 * 1000;
 
  int n_conn_suc = 0, n_recv_suc = 0;
  int n_req = 0, n_recv_suc = 0, n_resp;
   
  net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
  
@@ -146,71 +151,78 @@
      continue;
    }
    n_conn_suc++;
// printf("write_request %s:%d\n", node->host, node->port);
    if(write_request(connfd, node->key, send_buf, send_size) != 0) {
      LoggerFactory::getLogger()->error("write_request failture %s:%d\n", node->host, node->port);
      close_connect(connfd);
      // req_resp_pool.conns[i].fd = -1;
    } else {
      n_req++;
    }
  }
// printf("n_conn_suc =%d\n", n_conn_suc);
// printf(" req_resp_pool.maxi = %d\n",  req_resp_pool.maxi);
// printf(" n_req = %d\n", n_req);
  while(n_recv_suc < n_conn_suc)
// int tmp = 0;
  while(n_resp < n_req)
  {
// printf(" while %d\n", tmp++);
    /* Wait for listening/connected descriptor(s) to become ready */
    req_resp_pool.ready_read_set = req_resp_pool.read_set;
    req_resp_pool.ready_write_set = req_resp_pool.write_set;
    req_resp_pool.ready_except_set = req_resp_pool.except_set;
    if( (req_resp_pool.nready = select(req_resp_pool.maxfd + 1,
      &req_resp_pool.ready_read_set, &req_resp_pool.ready_write_set,
      &req_resp_pool.ready_except_set, &timeout)) <= 0) {
      // wirite_set 和 read_set 在指定时间内都没准备好
    if( (req_resp_pool.nready = poll(req_resp_pool.conns, req_resp_pool.maxi + 1, timeout) ) <= 0) {
       // wirite_set 和 read_set 在指定时间内都没准备好
      break;
    }
// printf("req_resp_pool.nready =%d\n", req_resp_pool.nready);
    for (i = 0; (i <= req_resp_pool.maxi) && (req_resp_pool.nready > 0); i++) {
      if ( (connfd = req_resp_pool.connfd[i]) > 0 ) {
      if ( (connfd = req_resp_pool.conns[i].fd) > 0 ) {
        /* If the descriptor is ready, echo a text line from it */
        node =  req_resp_pool.connfdNodeMap.find(connfd)->second;
        if ( FD_ISSET(connfd, &req_resp_pool.ready_read_set))
        if (req_resp_pool.conns[i].revents & POLLIN )
        {
          req_resp_pool.nready--;
// printf("POLLIN %d\n", connfd);
          if( (n = read_response(connfd, ret_arr+n_recv_suc)) == 0) {
            
            // 成功收到返回消息,清空读入位
            FD_CLR(connfd, &req_resp_pool.read_set);
            req_resp_pool.connfd[i] = -1;
            req_resp_pool.conns[i].fd = -1;
            n_recv_suc++;
          } else if(n == -1)  {
            req_resp_pool.conns[i].fd = -1;
            close_connect(connfd);
          } else {
            req_resp_pool.conns[i].fd = -1;
          }
          n_resp++;
// printf("read response %d\n", n);
          
        }
        if (FD_ISSET(connfd, &req_resp_pool.ready_write_set))
        {
          req_resp_pool.nready--;
// printf("write %d\n", connfd);
          if(write_request(connfd, node->key, send_buf, send_size) != 0) {
            close_connect(connfd);
          } else{
            // 一次写入完成后清空写入位
            FD_CLR(connfd, &req_resp_pool.write_set);
          }
        if (req_resp_pool.conns[i].revents & POLLOUT ) {
  // printf("poll POLLOUT %d\n", connfd);
        }
        if (FD_ISSET(connfd, &req_resp_pool.ready_except_set))
        if (req_resp_pool.conns[i].revents & (POLLRDHUP | POLLHUP | POLLERR) )
        {
// printf("poll POLLERR %d\n", connfd);
          req_resp_pool.nready--;
          close_connect(connfd);
          req_resp_pool.conns[i].fd = -1;
        }
      }
    }
  }
  FD_ZERO(&req_resp_pool.except_set);
  for (i = 0; i <= req_resp_pool.maxi; i++) {
    if ( (connfd = req_resp_pool.connfd[i]) > 0 ) {
    if ( (connfd = req_resp_pool.conns[i].fd) > 0 ) {
      // 关闭并清除写入或读取失败的连接
      close_connect(connfd);
      req_resp_pool.conns[i].fd = -1;
    }
  }
  req_resp_pool.maxi = -1;
  *recv_arr = ret_arr;
@@ -226,8 +238,7 @@
  int buf_size;
  char *buf;
  int  max_buf_size;
  buf = (char *)malloc(MAXBUF);
  if(buf == NULL) {
  if((buf = (char *)malloc(MAXBUF)) == NULL) {
    LoggerFactory::getLogger()->error(errno, "NetModSocket::NetModSocket malloc");
    exit(1);
  } else {
@@ -236,11 +247,12 @@
  buf_size = send_size + NET_MODE_REQUEST_HEAD_LENGTH;
  if(max_buf_size < buf_size) {
    buf = (char *)realloc(buf, buf_size);
    max_buf_size = buf_size;
    if(buf == NULL) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request  realloc");
    if((buf = (char *)realloc(buf, buf_size)) == NULL) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request  realloc buf");
      exit(1);
    } else {
      max_buf_size = buf_size;
    }
  }
@@ -274,9 +286,8 @@
  char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH];
 
  net_mod_response_head_t response_head;
  net_node_t *node =  req_resp_pool.connfdNodeMap.find(connfd)->second;
  if ( rio_readn(connfd, response_head_bs, NET_MODE_RESPONSE_HEAD_LENGTH) !=  NET_MODE_RESPONSE_HEAD_LENGTH) {
    LoggerFactory::getLogger()->error(errno, "NetModSocket::send  rio_readnb");
    LoggerFactory::getLogger()->error(errno, "NetModSocket::read_response  rio_readnb response_head");
    
    return -1;
  }
@@ -293,15 +304,14 @@
    exit(1);
  }
  if ( (recv_size = rio_readn(connfd, recv_buf, response_head.content_length) ) !=  response_head.content_length) {
    LoggerFactory::getLogger()->error(errno, "NetModSocket::send  rio_readnb");
    LoggerFactory::getLogger()->error(errno, "NetModSocket::read_response  rio_readnb recv_buf");
    
    return -1;
  }
  strcpy( recv_msg->host, node->host);
  recv_msg->port = node->port;
  recv_msg->key = node->key;
  strcpy( recv_msg->host, response_head.host);
  recv_msg->port = response_head.port;
  recv_msg->key = response_head.key;
  recv_msg->content = recv_buf;
  recv_msg->content_length = recv_size;
  return 0;
@@ -349,8 +359,12 @@
    buf_size = send_size + NET_MODE_REQUEST_HEAD_LENGTH;
    if(max_buf_size < buf_size) {
      buf = (char *)realloc(buf, buf_size);
      max_buf_size = buf_size;
      if((buf = (char *)realloc(buf, buf_size)) == NULL) {
        LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe realloc buf");
      } else {
        max_buf_size = buf_size;
      }
    }
    
@@ -366,7 +380,7 @@
    if(rio_writen(clientfd, buf, buf_size) != buf_size ) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::send conent rio_writen");
      LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe  rio_writen  buf");
     
      close(clientfd);
      continue;
@@ -375,7 +389,7 @@
    // setsockopt(clientfd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
    if ( rio_readn(clientfd, response_head_bs, NET_MODE_RESPONSE_HEAD_LENGTH) !=  NET_MODE_RESPONSE_HEAD_LENGTH) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::send  rio_readnb");
      LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe  rio_readnb response_head_bs");
     
      close(clientfd);
      continue;
@@ -392,7 +406,7 @@
      exit(1);
    }
    if ( (recv_size = rio_readn(clientfd, recv_buf, response_head.content_length) ) !=  response_head.content_length) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::send  rio_readnb");
      LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe  rio_readnb recv_buf");
      
      close(clientfd);
      continue;
@@ -465,8 +479,12 @@
      buf_size = NET_MODE_REQUEST_HEAD_LENGTH + content_size + request_head.topic_length;
      if(max_buf_size < buf_size) {
        buf = (char *)realloc(buf, buf_size);
        max_buf_size = buf_size;
        if( ( buf = (char *)realloc(buf, buf_size)) == NULL) {
           LoggerFactory::getLogger()->error(errno, "NetModSocket::pub realloc buf ");
        } else {
          max_buf_size = buf_size;
        }
      }
      memcpy(buf, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH);
@@ -495,42 +513,108 @@
  free(arr);
}
// ssize_t recv(void *buf, size_t len) {
//     return rio_readlineb(&rio, buf, MAXLINE);
// }
/**
  uint32_t mod;
  char host[NI_MAXHOST];
  uint32_t port;
  uint32_t key;
  uint32_t content_length;
  uint32_t topic_length;
*/
void * NetModSocket::encode_request_head(net_mod_request_head_t & request) {
  char * head = (char *)malloc(NET_MODE_REQUEST_HEAD_LENGTH);
  PUT(head, htonl(request.mod));
  PUT(head + 4, htonl(request.key));
  PUT(head + 8, htonl(request.content_length));
  PUT(head + 12, htonl(request.topic_length));
  return head;
  void * headbs = malloc(NET_MODE_REQUEST_HEAD_LENGTH);
  char *tmp_ptr = (char *)headbs;
  PUT(tmp_ptr, htonl(request.mod));
  tmp_ptr += 4;
  memcpy(tmp_ptr, request.host, NI_MAXHOST);
  tmp_ptr += NI_MAXHOST;
  PUT(tmp_ptr, htonl(request.port));
  tmp_ptr += 4;
  PUT(tmp_ptr, htonl(request.key));
  tmp_ptr += 4;
  PUT(tmp_ptr, htonl(request.content_length));
  tmp_ptr += 4;
  PUT(tmp_ptr, htonl(request.topic_length));
  return headbs;
}
net_mod_request_head_t  NetModSocket::decode_request_head(void *_headbs) {
  char *headbs = (char *)_headbs;
net_mod_request_head_t  NetModSocket::decode_request_head(void *headbs) {
  char *tmp_ptr = (char *)headbs;
  net_mod_request_head_t head;
  head.mod = ntohl(GET(headbs));
  head.key = ntohl(GET(headbs + 4));
  head.content_length = ntohl(GET(headbs + 8));
  head.topic_length = ntohl(GET(headbs + 12));
  head.mod = ntohl(GET(tmp_ptr));
  tmp_ptr += NI_MAXHOST;
  memcpy(head.host, tmp_ptr, NI_MAXHOST);
  tmp_ptr += 4;
  head.port = ntohl(GET(tmp_ptr));
  tmp_ptr += 4;
  head.key = ntohl(GET(tmp_ptr));
  tmp_ptr += 4;
  head.content_length = ntohl(GET(tmp_ptr));
  tmp_ptr += 4;
  head.topic_length = ntohl(GET(tmp_ptr));
  return head;
}
/**
 char host[NI_MAXHOST];
  uint32_t port;
  uint32_t key;
  uint32_t content_length;
  uint32_t code;
  */
void * NetModSocket::encode_response_head(net_mod_response_head_t & response) {
  char * head = (char *)malloc(NET_MODE_RESPONSE_HEAD_LENGTH);
  PUT(head, htonl(response.code));
  PUT(head + 4, htonl(response.content_length));
  return head;
  void * headbs = malloc(NET_MODE_RESPONSE_HEAD_LENGTH);
  char *tmp_ptr = (char *)headbs;
  memcpy(tmp_ptr, response.host, NI_MAXHOST);
  tmp_ptr += NI_MAXHOST;
  PUT(tmp_ptr, htonl(response.port));
  tmp_ptr += 4;
  PUT(tmp_ptr , htonl(response.key));
  tmp_ptr += 4;
  PUT(tmp_ptr, htonl(response.content_length));
  tmp_ptr += 4;
  PUT(tmp_ptr, htonl(response.code));
  return headbs;
}
net_mod_response_head_t  NetModSocket::decode_response_head(void *_headbs) {
  char *headbs = (char *)_headbs;
net_mod_response_head_t  NetModSocket::decode_response_head(void *headbs) {
  char *tmp_ptr = (char *)headbs;
  net_mod_response_head_t head;
  head.code = ntohl(GET(headbs));
  head.content_length = ntohl(GET(headbs + 4));
  memcpy(head.host, tmp_ptr, NI_MAXHOST);
  tmp_ptr += NI_MAXHOST;
  head.port = ntohl(GET(tmp_ptr));
  tmp_ptr += 4;
  head.key = ntohl(GET(tmp_ptr));
  tmp_ptr += 4;
  head.content_length = ntohl(GET(tmp_ptr));
  tmp_ptr += 4;
  head.code = ntohl(GET(tmp_ptr));
  return head;
}
src/socket/net_mod_socket.h
@@ -3,7 +3,9 @@
#include "usg_common.h"
#include "shm_mod_socket.h"
#include "socket_io.h"
#include <poll.h>
#define OPEN_MAX 1024
#define GET(p)       (*(uint32_t *)(p))
#define PUT(p, val)  (*(uint32_t *)(p) = (val))
@@ -20,20 +22,24 @@
    int key;
};
#define NET_MODE_REQUEST_HEAD_LENGTH 16
#define NET_MODE_REQUEST_HEAD_LENGTH (NI_MAXHOST + 5 * sizeof(uint32_t))
struct net_mod_request_head_t {
    uint32_t mod;
  char host[NI_MAXHOST];
  uint32_t port;
    uint32_t key;
    uint32_t content_length;
    uint32_t topic_length;
};
#define NET_MODE_RESPONSE_HEAD_LENGTH 8
#define NET_MODE_RESPONSE_HEAD_LENGTH (NI_MAXHOST + 4 * sizeof(uint32_t))
struct net_mod_response_head_t {
    // socket_mod_t mod;
    // int key;
  char host[NI_MAXHOST];
  uint32_t port;
    uint32_t key;
  uint32_t code;
    uint32_t content_length;
};
@@ -41,7 +47,7 @@
struct net_mod_recv_msg_t
{
  char host[128];
  char host[NI_MAXHOST];
  int port;
  int key;
  void *content;
@@ -51,22 +57,13 @@
class NetModSocket {
  struct pool{ /* Represents a pool of connected descriptors */ //line:conc:echoservers:beginpool
    int maxfd;        /* Largest descriptor in read_set */
    fd_set read_set;  /* Set of all active descriptors */
    fd_set ready_read_set; /* Subset of descriptors ready for reading  */
    fd_set write_set;
    fd_set ready_write_set;
    fd_set except_set;
    fd_set ready_except_set;
    int nready;       /* Number of ready descriptors from select */
    int maxi;         /* Highwater index into client array */
    int connfd[FD_SETSIZE];    /* Set of active descriptors */
    struct pollfd conns[OPEN_MAX];
    // net_node_t *nodes[FD_SETSIZE];
   // rio_t clientrio[FD_SETSIZE]; /* Set of active read buffers */
    std::map<int, net_node_t*> connfdNodeMap;
    // std::map<int, net_node_t*> connfdNodeMap;
    std::map<std::string, int> connectionMap;
  } ; 
test_net_socket/Makefile
@@ -7,13 +7,15 @@
# 开源工具包路径
LDDIR += -L${DEST}/lib
#-lusgcommon
# 开源工具包
LDLIBS += -lshm_queue -lusgcommon -lpthread
LDLIBS += -lshm_queue  -lpthread
INCLUDES += -I${DEST}/include/shmqueue -I$(ROOT)/include/usgcommon
#-I$(ROOT)/include/usgcommon
INCLUDES += -I${ROOT}/src -I${ROOT}/src/queue -I${ROOT}/src/socket -I${ROOT}/src/common/include
PROGS = ${DEST}/net_mod_socket
PROGS = ${DEST}/test_net_mod_socket
DEPENDENCES = $(patsubst %, %.d, $(PROGS)) 
test_net_socket/net_mod_socket.sh
@@ -9,12 +9,12 @@
    ./dgram_mod_bus server 8 & server_pid=$! &&  echo ${server_pid}
     
# 开启网络server
    ./net_mod_socket server 5000 & server_pid=$! &&  echo ${server_pid}
    ./test_net_mod_socket server 5000 & server_pid=$! &&  echo ${server_pid}
     
}
function client() {
    ./net_mod_socket client 5000
    ./test_net_mod_socket client 5000
}
function close() {
@@ -22,6 +22,12 @@
    ipcrm -a
}
function scp() {
    scp -P 100 -rp ../build basic@192.168.5.22:/data/disk2/test
    scp -rp ../build basic@192.168.20.10:/data3/workspace/wzq
}
case ${1} in
  "server")
    close
test_net_socket/test_net_mod_socket.c
File was renamed from test_net_socket/net_mod_socket.c
@@ -24,13 +24,14 @@
  char topic[512];
    
    int recv_arr_size, i, n;
    int node_arr_size = 3;
    net_mod_recv_msg_t *recv_arr;
    //192.168.20.104
  int node_arr_size = 2;
    net_node_t node_arr[] = {
        {"192.168.5.22", port, 11},
        {"192.168.20.104", port, 21},
        // {"192.168.5.22", port, 11},
        {"192.168.20.10", port, 11},
        {"192.168.20.104", port, 11}
    };
@@ -98,10 +99,12 @@
  int i,j, n, recv_arr_size;
  net_mod_recv_msg_t *recv_arr;
  int node_arr_size = 1;
  int node_arr_size = 2;
    //192.168.20.104
    net_node_t node_arr[] = {
        {NULL, port, 11}
        // {"192.168.5.22", port, 11},
    {"192.168.20.10", port, 11},
    {"192.168.20.104", port, 11}
    };
  void * client = net_mod_socket_open();
@@ -137,7 +140,7 @@
void mclient(int port) {
  int status, i = 0, processors = 4;
  int status, i = 0, processors = 2;
  void *res[processors];
  // Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
  Targ targs[processors];
@@ -146,6 +149,7 @@
  struct timeval start, end;
  long total = 0;
  
printf("开始测试...\n");
  gettimeofday(&start, NULL);
  for (i = 0; i < processors; i++) {
    targs[i].port = port;
test_socket/Makefile
@@ -8,9 +8,10 @@
LDDIR += -L${DEST}/lib
# 开源工具包
LDLIBS += -lshm_queue -lusgcommon -lpthread
#-lusgcommon
LDLIBS += -lshm_queue  -lpthread
INCLUDES += -I${DEST}/include/shmqueue -I$(ROOT)/include/usgcommon
INCLUDES += -I${ROOT}/src -I${ROOT}/src/queue -I${ROOT}/src/socket -I${ROOT}/src/common/include
PROGS = ${DEST}/dgram_mod_bus ${DEST}/dgram_mod_survey ${DEST}/dgram_mod_req_rep ${DEST}/test_timeout ${DEST}/test_open_close