wangzhengquan
2020-10-19 af85260254bacac40a68d4f5f61950523beb3a27
update
12个文件已修改
360 ■■■■ 已修改文件
Makefile 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_server_socket.c 80 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_server_socket.h 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_server_socket_wrapper.c 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_server_socket_wrapper.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.c 98 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.h 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.c 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.h 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.c 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.h 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/net_mod_socket.c 103 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
Makefile
@@ -1,5 +1,5 @@
# debug "make --just-print"
DIRS = src test_net_socket
DIRS = src test_net_socket test_socket
TAR_NAME = shm_queue.tar.gz
all:
src/socket/net_mod_server_socket.c
@@ -4,37 +4,55 @@
#include "net_mod_socket_io.h"
#include "net_mod_socket.h"
 
NetModServerSocket::NetModServerSocket(int port):max_buf(1024), max_topic_buf(256)
NetModServerSocket::NetModServerSocket(int _port): listenfd(0), port(_port), max_buf(1024), max_topic_buf(256), max_response_buf(1024)
{
  char portstr[32];
  //shmModSocket = new ShmModSocket;
  sprintf(portstr, "%d", port);
  listenfd = Open_listenfd(portstr);
  init_pool(listenfd);
  buf = malloc(max_buf);
  if(buf == NULL) {
    err_exit(errno, "process_client malloc");
    err_exit(errno, "NetModServerSocket::NetModServerSocket malloc");
  }
  topic_buf = malloc(max_topic_buf);
  if(topic_buf == NULL) {
    err_exit(errno, "process_client malloc");
    err_exit(errno, "NetModServerSocket::NetModServerSocket malloc");
  }
  response_buf = (char *) malloc(max_response_buf);
  if(response_buf == NULL) {
    err_exit(errno, "NetModServerSocket::NetModServerSocket malloc");
  }
}
NetModServerSocket::~NetModServerSocket() {
   Close(listenfd);
   free(buf);
  if(listenfd != 0) {
    Close(listenfd);
  }
  if(buf != NULL)
    free(buf);
  if(topic_buf != NULL)
   free(topic_buf);
  if(response_buf != NULL)
   free(response_buf);
}
void NetModServerSocket::start() {
int NetModServerSocket::start() {
    int connfd;
    socklen_t clientlen;
  struct sockaddr_storage clientaddr;
  char portstr[32];
  //shmModSocket = new ShmModSocket;
  sprintf(portstr, "%d", port);
  listenfd = open_listenfd(portstr);
  if(listenfd < 0) {
    LoggerFactory::getLogger()->error(errno, "NetModServerSocket::start");
    return -1;
  }
  init_pool(listenfd);
    while (1)
  {
    /* Wait for listening/connected descriptor(s) to become ready */
@@ -52,6 +70,7 @@
    /* Echo a text line from each ready connected descriptor */
    check_clients();
  }
  return 0;
}
void  NetModServerSocket::init_pool(int listenfd)
@@ -105,8 +124,8 @@
  net_mod_response_head_t response_head;
  char request_head_bs[NET_MODE_REQUEST_HEAD_LENGTH];
  void  *recv_buf;
char tmp[8196];
  int recv_size;
// char tmp[8196];
  int recv_size, response_buf_size;
  if (rio_readn(connfd, request_head_bs, NET_MODE_REQUEST_HEAD_LENGTH) !=  NET_MODE_REQUEST_HEAD_LENGTH)
  {
@@ -129,11 +148,32 @@
  }
  if(request_head.mod == REQ_REP) {
    // TODO: shmModSocket.sendandrecv_unsafe
    shmModSocket.sendandrecv(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size);
    response_head.content_length = recv_size;
    Rio_writen(connfd, NetModSocket::encode_response_head(response_head), NET_MODE_RESPONSE_HEAD_LENGTH);
    Rio_writen(connfd, recv_buf, recv_size);
    if(shmModSocket.sendandrecv_unsafe(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size) != 0) {
      response_head.code = 1;
      response_head.content_length = 0;
      if( rio_writen(connfd, NetModSocket::encode_response_head(response_head), NET_MODE_RESPONSE_HEAD_LENGTH) != NET_MODE_RESPONSE_HEAD_LENGTH )
        return -1;
      //Rio_writen(connfd, recv_buf, recv_size);
    } else {
      response_head.code = 0;
      response_head.content_length = recv_size;
      response_buf_size = NET_MODE_RESPONSE_HEAD_LENGTH + recv_size;
      if(max_response_buf < response_buf_size) {
        buf = (char *)realloc(response_buf, response_buf_size);
        max_response_buf = response_buf_size;
      }
      memcpy(response_buf, NetModSocket::encode_response_head(response_head),  NET_MODE_RESPONSE_HEAD_LENGTH);
      memcpy(response_buf + NET_MODE_RESPONSE_HEAD_LENGTH, recv_buf,  recv_size);
      if(rio_writen(connfd, response_buf, response_buf_size) != response_buf_size) {
        return -1;
      }
    }
    return 0;
  } else if(request_head.mod == BUS) {
    if(request_head.topic_length > max_topic_buf) {
      topic_buf = realloc(topic_buf, request_head.topic_length);
@@ -147,7 +187,7 @@
    if (rio_readn(connfd, topic_buf, request_head.topic_length) != request_head.topic_length ) {
      return -1;
    }
 LoggerFactory::getLogger()->debug("====server pub %s===\n", buf);
LoggerFactory::getLogger()->debug("====server pub %s===\n", buf);
    shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, request_head.key);
  }
src/socket/net_mod_server_socket.h
@@ -28,13 +28,18 @@
private:
    int listenfd;
    int port;
    ShmModSocket shmModSocket;
    pool pool;
    void *buf;
    void *topic_buf;
    char *response_buf;
  size_t max_buf;
  size_t max_topic_buf;
  size_t max_response_buf;
    void init_pool(int listenfd);
    void add_client(int connfd);
@@ -47,8 +52,9 @@
    
    /*
     * 启动 server
     * @return 0 success, 其他 failture
    */
    void start();
    int start();
    ~NetModServerSocket();
};
src/socket/net_mod_server_socket_wrapper.c
@@ -14,7 +14,7 @@
}
void net_mod_server_socket_start(void *_sockt) {
int net_mod_server_socket_start(void *_sockt) {
    net_mod_server_socket_t *sockt = (net_mod_server_socket_t *)_sockt;
    sockt->sockt->start();
    return sockt->sockt->start();
}
src/socket/net_mod_server_socket_wrapper.h
@@ -25,7 +25,7 @@
/**
 * 启动
 */
void net_mod_server_socket_start(void *_sockt);
int net_mod_server_socket_start(void *_sockt);
src/socket/net_mod_socket.c
@@ -10,6 +10,8 @@
NetModSocket::NetModSocket() 
{
        init_req_rep_req_resp_pool();
}
@@ -20,6 +22,7 @@
    Close(clientfd);
     
  }
}
@@ -95,7 +98,9 @@
  net_node_t *node =  req_resp_pool.connfdNodeMap.find(connfd)->second;
  // std::map<std::string, int>::iterator mapIter;
  Close(connfd); //line:conc:echoservers:closeconnfd
  if(close(connfd) != 0) {
    LoggerFactory::getLogger()->error(errno, "NetModSocket::close_connect");
  }
  FD_CLR(connfd, &req_resp_pool.read_set);
  FD_CLR(connfd, &req_resp_pool.write_set);
  FD_CLR(connfd, &req_resp_pool.except_set);
@@ -104,23 +109,23 @@
  // char portstr[32];
  sprintf(mapKey, "%s:%d", node->host, node->port);
  req_resp_pool.connectionMap.erase(mapKey);
LoggerFactory::getLogger()->debug("close_connect");
// LoggerFactory::getLogger()->debug("close_connect");
}
int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
  int i,  recv_size,  connfd;
  int i, n, recv_size,  connfd;
  net_node_t *node;
  void *recv_buf;
  struct timeval timeout = {5, 0};
 
  int n_conn_suc = 0, n_recv_suc = 0;
   
  net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
  
  init_req_rep_req_resp_pool();
  //init_req_rep_req_resp_pool();
  for (i = 0; i< arrlen; i++) {
@@ -134,23 +139,17 @@
      ret_arr[n_recv_suc].content = recv_buf;
      ret_arr[n_recv_suc].content_length = recv_size;
      n_recv_suc++;
      continue;
    }
    if( (connfd = connect(node)) < 0 ) {
      continue;
    }
    // if(write_request(connfd, node->key, send_buf, send_size) != 0) {
    //   close_connect(connfd);
    // }
    n_conn_suc++;
    // optval = 0;
    // setsockopt(clientfd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
  }
printf("n_conn_suc =%d\n", n_conn_suc);
// printf("n_conn_suc =%d\n", n_conn_suc);
  while(n_recv_suc < n_conn_suc)
  {
@@ -158,8 +157,13 @@
    req_resp_pool.ready_read_set = req_resp_pool.read_set;
    req_resp_pool.ready_write_set = req_resp_pool.write_set;
    req_resp_pool.ready_except_set = req_resp_pool.except_set;
    req_resp_pool.nready = select(req_resp_pool.maxfd + 1, &req_resp_pool.ready_read_set, &req_resp_pool.ready_write_set, &req_resp_pool.ready_except_set, NULL);
printf("req_resp_pool.nready =%d\n", req_resp_pool.nready);
    if( (req_resp_pool.nready = select(req_resp_pool.maxfd + 1,
      &req_resp_pool.ready_read_set, &req_resp_pool.ready_write_set,
      &req_resp_pool.ready_except_set, &timeout)) <= 0) {
      // wirite_set 和 read_set 在指定时间内都没准备好
      break;
    }
// printf("req_resp_pool.nready =%d\n", req_resp_pool.nready);
    for (i = 0; (i <= req_resp_pool.maxi) && (req_resp_pool.nready > 0); i++) {
      if ( (connfd = req_resp_pool.connfd[i]) > 0 ) {
        /* If the descriptor is ready, echo a text line from it */
@@ -167,17 +171,22 @@
        if ( FD_ISSET(connfd, &req_resp_pool.ready_read_set))
        {
          req_resp_pool.nready--;
          if(read_response(connfd, ret_arr+n_recv_suc) == 0) {
          if( (n = read_response(connfd, ret_arr+n_recv_suc)) == 0) {
            // 成功收到返回消息,清空读入位
            FD_CLR(connfd, &req_resp_pool.read_set);
            req_resp_pool.connfd[i] = -1;
            n_recv_suc++;
          } else {
          } else if(n == -1)  {
            close_connect(connfd);
          }
          
        }
        if (FD_ISSET(connfd, &req_resp_pool.ready_write_set))
        {
          req_resp_pool.nready--;
printf("write %d\n", connfd);
// printf("write %d\n", connfd);
          if(write_request(connfd, node->key, send_buf, send_size) != 0) {
            close_connect(connfd);
          } else{
@@ -195,6 +204,15 @@
    }
  }
  FD_ZERO(&req_resp_pool.except_set);
  for (i = 0; i <= req_resp_pool.maxi; i++) {
    if ( (connfd = req_resp_pool.connfd[i]) > 0 ) {
      // 关闭并清除写入或读取失败的连接
      close_connect(connfd);
    }
  }
  req_resp_pool.maxi = -1;
  *recv_arr = ret_arr;
  if(recv_arr_size != NULL) {
    *recv_arr_size = n_recv_suc;
@@ -205,21 +223,25 @@
int NetModSocket::write_request(int clientfd, int key, void *send_buf, int send_size) {
  net_mod_request_head_t request_head = {};
  static char *buf;
  static int buf_size, max_buf_size;
  int buf_size;
  char *buf;
  int  max_buf_size;
  buf = (char *)malloc(MAXBUF);
  if(buf == NULL) {
    buf = (char *)malloc(MAXBUF);
    LoggerFactory::getLogger()->error(errno, "NetModSocket::NetModSocket malloc");
    exit(1);
  } else {
    max_buf_size = MAXBUF;
    LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv malloc");
  }
  buf_size = send_size + NET_MODE_REQUEST_HEAD_LENGTH;
  if(max_buf_size < buf_size) {
    buf = (char *)realloc(buf, buf_size);
    max_buf_size = buf_size;
    if(buf == NULL) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request  realloc");
      exit(1);
    }
  }
  request_head.mod = REQ_REP;
@@ -234,13 +256,18 @@
  if(rio_writen(clientfd, buf, buf_size) != buf_size ) {
    LoggerFactory::getLogger()->error(errno, "NetModSocket::send conent rio_writen");
    LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request  rio_writen");
    free(buf);
    return -1;
  }
  free(buf);
  return 0;
}
/**
 * @return 0 成功, 1 对方没有对应的key, -1 网络错误
 *
 */
int NetModSocket::read_response(int connfd, net_mod_recv_msg_t *recv_msg) {
  int recv_size;
  void *recv_buf;
@@ -255,10 +282,14 @@
  }
  response_head =  NetModSocket::decode_response_head(response_head_bs);
  if(response_head.code != 0) {
    // 对方没有对应的key
    return 1;
  }
  recv_buf = malloc(response_head.content_length);
  if(recv_buf == NULL) {
    LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc");
    LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc recv_buf");
    exit(1);
  }
  if ( (recv_size = rio_readn(connfd, recv_buf, response_head.content_length) ) !=  response_head.content_length) {
@@ -351,6 +382,9 @@
    }
    response_head =  NetModSocket::decode_response_head(response_head_bs);
    if(response_head.code != 0) {
      continue;
    }
    recv_buf = malloc(response_head.content_length);
    if(recv_buf == NULL) {
@@ -452,7 +486,7 @@
  free(buf);
  return nsuc;
}
void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) {
  for(int i =0; i< size; i++) {
@@ -488,13 +522,15 @@
void * NetModSocket::encode_response_head(net_mod_response_head_t & response) {
  char * head = (char *)malloc(NET_MODE_RESPONSE_HEAD_LENGTH);
  PUT(head, htonl(response.content_length));
  PUT(head, htonl(response.code));
  PUT(head + 4, htonl(response.content_length));
  return head;
}
net_mod_response_head_t  NetModSocket::decode_response_head(void *_headbs) {
  char *headbs = (char *)_headbs;
  net_mod_response_head_t head;
  head.content_length = ntohl(GET(headbs));
  head.code = ntohl(GET(headbs));
  head.content_length = ntohl(GET(headbs + 4));
  return head;
}
src/socket/net_mod_socket.h
@@ -7,8 +7,7 @@
#define GET(p)       (*(uint32_t *)(p))
#define PUT(p, val)  (*(uint32_t *)(p) = (val))
#define NET_MODE_REQUEST_HEAD_LENGTH 16
#define NET_MODE_RESPONSE_HEAD_LENGTH 4
@@ -21,6 +20,7 @@
    int key;
};
#define NET_MODE_REQUEST_HEAD_LENGTH 16
struct net_mod_request_head_t {
    uint32_t mod;
@@ -29,9 +29,12 @@
    uint32_t topic_length;
};
#define NET_MODE_RESPONSE_HEAD_LENGTH 8
struct net_mod_response_head_t {
    // socket_mod_t mod;
    // int key;
  uint32_t code;
    uint32_t content_length;
};
@@ -72,6 +75,8 @@
   
  ShmModSocket shmModSocket;
  pool req_resp_pool;
  static void * encode_request_head(net_mod_request_head_t & request);
  static net_mod_request_head_t  decode_request_head(void *headbs);
@@ -97,11 +102,18 @@
   * @send_buf 发送的消息,@send_size 该消息体的长度
   * @recv_arr 返回的应答消息组,@recv_arr_size 该数组长度
   * @return 成功发送的节点的个数
   * 优点:无阻塞,性能好
   * 缺点:不是线程安全的
   */
  int sendandrecv(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, 
      net_mod_recv_msg_t ** recv_arr, int *recv_arr_size);
  /**
   * 功能同sendandrecv
   * 优点:线程安全
   * 缺点:阻塞的,性能不如sendandrecv
   *
   */
  int sendandrecv_safe(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, 
    net_mod_recv_msg_t ** recv_arr, int *recv_arr_size);
src/socket/shm_mod_socket.c
@@ -182,6 +182,18 @@
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
}
int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, struct timespec *timeout){
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, timeout, 0);
}
int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
}
/**
 * 启动bus
src/socket/shm_mod_socket.h
@@ -104,6 +104,11 @@
    int sendandrecv_timeout(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size,  struct timespec *timeout) ;
    int sendandrecv_nowait(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
    int sendandrecv_unsafe(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size,  struct timespec *timeout) ;
    int sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
    /**
     * 启动bus
src/socket/shm_socket.c
@@ -386,6 +386,27 @@
  return -1;
}
int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf,
                    const int send_size, const int send_port, void **recv_buf,
                    int *recv_size,  struct timespec *timeout,  int flags) {
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket  "
                "which is not a SHM_SOCKET_DGRAM socket ",
             socket->socket_type);
  }
  int recv_port;
  int rv;
  if ((rv = shm_sendto(socket, send_buf, send_size, send_port, timeout, flags)) == 0) {
    rv = shm_recvfrom(socket, recv_buf, recv_size, &recv_port, timeout, flags);
    return rv;
  } else {
    return rv;
  }
  return -1;
}
// ============================================================================================================
/**
src/socket/shm_socket.h
@@ -100,6 +100,13 @@
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size,  
    struct timespec * timeout = NULL,  int flags=0);
/**
 * 功能同shm_sendandrecv, 但是不是线程安全的
 */
int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size,
    struct timespec * timeout = NULL,  int flags=0);
#endif
test_net_socket/net_mod_socket.c
@@ -4,9 +4,17 @@
#include "dgram_mod_socket.h"
#include "usg_common.h"
typedef struct Targ {
    int port;
    int id;
}Targ;
void server(int port) {
    void *serverSocket  = net_mod_server_socket_open(port);
    net_mod_server_socket_start(serverSocket);
    if(net_mod_server_socket_start(serverSocket) != 0) {
        err_exit(errno, "net_mod_server_socket_start");
    }
}
void client(int port ){
@@ -14,13 +22,15 @@
    char content[MAXLINE];
    char action[512];
  char topic[512];
    net_mod_recv_msg_t *recv_arr;
    int recv_arr_size, i, n;
    int node_arr_size = 3;
    net_mod_recv_msg_t *recv_arr;
    //192.168.20.104
    net_node_t node_arr[] = {
        {"192.168.5.22", port, 11},
        {"192.168.20.10", port, 11},
        {"192.168.20.104", port, 21},
        {"192.168.20.104", port, 11}
    };
@@ -76,6 +86,90 @@
  
}
#define  SCALE  100000
void *runclient(void *arg) {
  Targ *targ = (Targ *)arg;
  int port = targ->port;
  char sendbuf[512];
  int i,j, n, recv_arr_size;
  net_mod_recv_msg_t *recv_arr;
  int node_arr_size = 1;
    //192.168.20.104
    net_node_t node_arr[] = {
        {NULL, port, 11}
    };
  void * client = net_mod_socket_open();
    char filename[512];
    sprintf(filename, "test%d.tmp", targ->id);
    FILE *fp = NULL;
    fp = fopen(filename, "w+");
    // fp = stdout;
    int recvsize;
    void *recvbuf;
  for (i = 0; i < SCALE; i++) {
    sprintf(sendbuf, "thread(%d) %d", targ->id, i);
    fprintf(fp, "requst:%s\n", sendbuf);
    n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
    //printf("send %d nodes\n", n);
    for(j=0; j < recv_arr_size; j++) {
        fprintf(fp, "reply: host:%s, port: %d, key:%d, content: %s\n",
            recv_arr[j].host,
            recv_arr[j].port,
            recv_arr[j].key,
            recv_arr[j].content
        );
    }
        // 使用完后,不要忘记释放掉
        net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
  }
  fclose(fp);
  net_mod_socket_close(client);
  return (void *)i;
}
void mclient(int port) {
  int status, i = 0, processors = 4;
  void *res[processors];
  // Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
  Targ targs[processors];
  pthread_t tids[processors];
  char sendbuf[512];
  struct timeval start, end;
  long total = 0;
  gettimeofday(&start, NULL);
  for (i = 0; i < processors; i++) {
    targs[i].port = port;
    targs[i].id = i;
    pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]);
  }
  for (i = 0; i < processors; i++) {
    if (pthread_join(tids[i], &res[i]) != 0) {
      perror("multyThreadClient pthread_join");
    } else {
        total += (long)res[i];
      //fprintf(stderr, "client(%d) 写入 %ld 条数据\n", i, (long)res[i]);
    }
  }
  gettimeofday(&end, NULL);
  double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec);
  long diffsec = (long) (difftime/1000000);
  long diffusec = difftime - diffsec*1000000;
  fprintf(stderr,"发送数目: %ld, 用时: (%ld sec %ld usec), 平均: %f\n", total, diffsec, diffusec, difftime/total );
  // fflush(stdout);
}
int main(int argc, char *argv[]) {
    shm_init(512);
@@ -95,6 +189,9 @@
  if (strcmp("client", argv[1]) == 0)
     client(port);
  if (strcmp("mclient", argv[1]) == 0)
    mclient(port);
}