wangzhengquan
2021-03-01 42d41eafe863d5286251eb49c908074a7e015f37
update
7个文件已修改
421 ■■■■■ 已修改文件
CMakeLists.txt 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/CMakeLists.txt 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.cpp 109 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.h 37 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.cpp 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.h 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/shm_util.cpp 204 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
CMakeLists.txt
@@ -29,6 +29,6 @@
    add_subdirectory(${PROJECT_SOURCE_DIR}/src)
    add_subdirectory(${PROJECT_SOURCE_DIR}/test)
    add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket)
    add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket)
#    add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket)
#    add_subdirectory(${PROJECT_SOURCE_DIR}/shm_util)
endif()
src/CMakeLists.txt
@@ -53,6 +53,7 @@
target_link_libraries(shm_queue PUBLIC  ${EXTRA_LIBS} )
# generate md5
if (BUILD_SHARED_LIBS)
  add_custom_command(
    OUTPUT ${PROJECT_BINARY_DIR}/lib/libshm_queue.so.md5
src/net/net_mod_socket.cpp
@@ -46,19 +46,19 @@
  return shmModSocket.force_bind(key);
}
int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
  return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1);
}
int NetModSocket::sendandrecv_timeout(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  msec) {
  return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, msec);
}
int NetModSocket::sendandrecv_nowait(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
  return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, 0);
// int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
//   net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
//   return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1);
// }
// int NetModSocket::sendandrecv_timeout(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
//   net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  msec) {
//   return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, msec);
// }
// int NetModSocket::sendandrecv_nowait(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
//   net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
//   return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, 0);
}
// }
/* Free thread-specific data buffer */
@@ -124,21 +124,22 @@
int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  msec ) {
int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size,
  net_mod_err_t ** _err_arr, int *_err_arr_size, int  msec ) {
  int i,  recv_size, connfd;
  net_node_t *node;
  void *recv_buf = NULL;
  struct timespec timeout;
  int ret;
  int n_req = 0, n_recv_suc = 0, n_recv_err = 0, n_resp =0;
  int n_req = 0, n_recv_suc = 0, n_err = 0, n_resp =0;
  
  net_mod_request_head_t request_head = {};
   
  net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
  net_mod_recv_err_t err_arr[arrlen];
  net_mod_err_t *err_arr = (net_mod_err_t *)calloc(arrlen, sizeof(net_mod_err_t));
 
  NetConnPool *mpool = _get_pool();
@@ -166,11 +167,11 @@
        ret_arr[n_recv_suc].content_length = recv_size;
        n_recv_suc++;
      } else {
        err_arr[n_recv_err].port = 0;
        err_arr[n_recv_err].key = node->key;
        err_arr[n_recv_err].code = ret;
        n_recv_err++;
        logger->error("NetModSocket::  _sendandrecv_ to key %d failed. %s",  node->key, bus_strerror(ret));
        err_arr[n_err].port = 0;
        err_arr[n_err].key = node->key;
        err_arr[n_err].code = ret;
        n_err++;
        // logger->error("NetModSocket::  _sendandrecv_ to key %d failed. %s",  node->key, bus_strerror(ret));
      }
     
@@ -178,11 +179,11 @@
    }
    if( (connfd = mpool->getConn(node->host, node->port)) < 0 ) {
      memcpy(err_arr[n_recv_err].host, node->host, sizeof(err_arr[n_recv_err].host));
      err_arr[n_recv_err].port =  node->port;
      err_arr[n_recv_err].key = node->key;
      err_arr[n_recv_err].code = EBUS_NET;
      n_recv_err++;
      memcpy(err_arr[n_err].host, node->host, sizeof(err_arr[n_err].host));
      err_arr[n_err].port =  node->port;
      err_arr[n_err].key = node->key;
      err_arr[n_err].code = EBUS_NET;
      n_err++;
      continue;
    }
@@ -197,11 +198,11 @@
 // printf("write_request %s:%d\n", request_head.host, request_head.port);
    if(write_request(connfd, request_head, send_buf, send_size, NULL, 0) != 0) {
      LoggerFactory::getLogger()->error("write_request failture %s:%d\n", node->host, node->port);
      memcpy(err_arr[n_recv_err].host, node->host, sizeof(err_arr[n_recv_err].host));
      err_arr[n_recv_err].port =  node->port;
      err_arr[n_recv_err].key = node->key;
      err_arr[n_recv_err].code = EBUS_NET;
      n_recv_err++;
      memcpy(err_arr[n_err].host, node->host, sizeof(err_arr[n_err].host));
      err_arr[n_err].port =  node->port;
      err_arr[n_err].key = node->key;
      err_arr[n_err].code = EBUS_NET;
      n_err++;
      mpool->closeConn( connfd);
    } else {
      n_req++;
@@ -227,7 +228,7 @@
        {
          mpool->nready--;
// printf("POLLIN %d\n", connfd);
          if( (ret = read_response(connfd, ret_arr+n_recv_suc, err_arr + n_recv_err)) == 0) {
          if( (ret = read_response(connfd, ret_arr+n_recv_suc, err_arr + n_err)) == 0) {
            n_recv_suc++;
            // 成功收到返回消息,清空读入位
            mpool->conns[i].fd = -1;
@@ -238,14 +239,14 @@
            logger->error("NetModSocket::_sendandrecv_ read_response key = %d , %s", get_key(),  bus_strerror(ret));
            mpool->closeConn( connfd);
            n_recv_err++;
            n_err++;
            // mpool->conns[i].fd = -1;
          } else {
            // 代理服务没有转发成功
             
            logger->error("NetModSocket::_sendandrecv_ read_response key = %d , %s", get_key(),  bus_strerror(ret));
            mpool->conns[i].fd = -1;
            n_recv_err++;
            n_err++;
          }
          n_resp++;
@@ -277,14 +278,43 @@
  mpool->maxi = -1;
  if(recv_arr != NULL) {
    *recv_arr = ret_arr;
    if(n_recv_suc > 0)  {
     *recv_arr = ret_arr;
    } else {
      free_recv_msg_arr(ret_arr, n_recv_suc);
    }
  } else {
    free_recv_msg_arr(ret_arr, n_recv_suc);
  }
  if(recv_arr_size != NULL) {
    *recv_arr_size = n_recv_suc;
     *recv_arr_size = n_recv_suc;
  }
  if(_err_arr != NULL) {
    if(n_err > 0) {
      *_err_arr = err_arr;
    } else {
      *_err_arr = NULL;
      *_err_arr_size = 0;
      free(err_arr);
    }
  } else {
    free(err_arr);
  }
  if(_err_arr_size != NULL) {
    *_err_arr_size = n_err;
  }
  return n_recv_suc;
     
@@ -330,7 +360,7 @@
  int ret;
  NetConnPool *mpool = _get_pool();
  net_mod_recv_err_t err_msg;
  net_mod_err_t err_msg;
  // 本地发送
  if(node_arr == NULL || arrlen == 0) {
@@ -636,7 +666,7 @@
 * @return 0 成功,   EBUS_NET 网络错误, 其他值 代理服务没有转发成功。
 *
 */
int NetModSocket::read_response(int connfd, net_mod_recv_msg_t *recv_msg, net_mod_recv_err_t *err_arr) {
int NetModSocket::read_response(int connfd, net_mod_recv_msg_t *recv_msg, net_mod_err_t *err_arr) {
  int recv_size;
  void *recv_buf;
  char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH];
@@ -667,6 +697,7 @@
    LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc recv_buf");
    exit(1);
  }
  if ( (recv_size = rio_readn(connfd, recv_buf, response_head.content_length) ) !=  response_head.content_length) {
    memcpy(err_arr->host, response_head.host, sizeof(err_arr->host));
src/net/net_mod_socket.h
@@ -55,7 +55,7 @@
  
};
struct net_mod_recv_err_t
struct net_mod_err_t
{
  char host[NI_MAXHOST];
  int port;
@@ -91,12 +91,11 @@
  NetConnPool* _get_pool();
  //读取返回信息
  int read_response(int clientfd, net_mod_recv_msg_t *recv_msg, net_mod_recv_err_t *err_arr);
  int read_response(int clientfd, net_mod_recv_msg_t *recv_msg, net_mod_err_t *err_arr);
  // 发送请求信息
  int write_request(int clientfd, net_mod_request_head_t &request_head, const void *send_buf, int send_size, const void *topic_buf, int topic_size);
  int _sendandrecv_(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size,
    net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int timeout);
  int _pub_(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size, int timeout) ;
  
@@ -121,22 +120,7 @@
 
  /**
   * @brief 如果建立连接的节点没有接受到消息等待timeout的时间后返回
   *
   * 向node_arr 中的所有网络节点发送请求消息,节点的返回信息汇总并存储在recv_arr中
   * @node_arr 网络节点组, @node_arr_len该数组长度.如果IP为空则为本地发送。
   * @send_buf 发送的消息,@send_size 该消息体的长度
   * @recv_arr 返回的应答消息组,@recv_arr_size 该数组长度
   * @timeout 等待时间,单位是千分之一秒
   * @return 成功发送的节点的个数
   *
   * 优点:1某个节点的故障不会阻塞其他节点。2 性能好。 3 采用thread local技术即保证了线程安全,又可以使用连接池缓存连接
   */
  int sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
    net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) ;
  /**
   * 如果建立连接的节点没有接受到消息等待timeout的时间后返回
   * 向node_arr 中的所有网络节点发送请求消息,节点的返回信息汇总并存储在recv_arr中
@@ -147,15 +131,10 @@
   * @return 成功发送的节点的个数
   * 优点:1某个节点的故障不会阻塞其他节点。2 性能好。 3 采用thread local技术即保证了线程安全,又可以使用连接池缓存连接
   */
  int sendandrecv_timeout(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
    net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  timeout);
  /**
   * 不等待立即返回
   * @timeout 等待时间,单位是千分之一秒
  */
  int sendandrecv_nowait(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
    net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) ;
  int sendandrecv(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size,
    net_mod_recv_msg_t ** recv_arr, int *recv_arr_size,
    net_mod_err_t ** _err_arr, int *_err_arr_size, int timeout);
  /**
   * 功能同sendandrecv
src/net/net_mod_socket_wrapper.cpp
@@ -96,9 +96,9 @@
}
int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size){
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size,  net_mod_err_t ** err_arr, int *err_arr_size){
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->sendandrecv(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size);
    return sockt->sendandrecv(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size, err_arr, err_arr_size, -1);
}
/**
@@ -106,16 +106,17 @@
 * @timeout 等待时间,单位是千分之一秒
*/
int net_mod_socket_sendandrecv_timeout(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size,  int timeout){
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size,
  net_mod_err_t ** err_arr, int *err_arr_size, int timeout){
    NetModSocket *sockt = (NetModSocket *)_socket;
    // return sockt->sendandrecv(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size);
    return sockt->sendandrecv_timeout(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size, timeout);
    return sockt->sendandrecv(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size, err_arr, err_arr_size, timeout);
}
int net_mod_socket_sendandrecv_nowait(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, net_mod_err_t ** err_arr, int *err_arr_size) {
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->sendandrecv_nowait(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size);
    return sockt->sendandrecv(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size, err_arr, err_arr_size, 0);
}
src/net/net_mod_socket_wrapper.h
@@ -136,14 +136,17 @@
 * @param node_arr_len该数组长度.如果IP为空则为本地发送。
 * @param send_buf 发送的消息
 * @param send_size 该消息体的长度
 * @param recv_arr 返回的应答消息数组
 * @param recv_arr 返回的应答消息数组,使用完后需要调用net_mod_socket_free_recv_msg_arr释放掉
 * @param recv_arr_size 返回的应答消息数组长度
 * @param err_arr 返回发送错误的节点数组,使用完后需要调用free方法释放掉
 × @param err_arr_size 返回发送错误的节点数组的长度
 *
 * @return 成功发送的节点的个数
 *
 */
int net_mod_socket_sendandrecv(void *_sockt, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) ;
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size,
  net_mod_err_t ** err_arr, int *err_arr_size) ;
 
/**
@@ -154,15 +157,39 @@
 * @param node_arr_len该数组长度.如果IP为空则为本地发送。
 * @param send_buf 发送的消息
 * @param send_size 该消息体的长度
 * @param recv_arr 返回的应答消息数组
 * @param recv_arr 返回的应答消息数组,使用完后需要调用net_mod_socket_free_recv_msg_arr释放掉
 * @param recv_arr_size 返回的应答消息数组长度
 * @param err_arr 返回发送错误的节点数组,使用完后需要调用free方法释放掉
 × @param err_arr_size 返回发送错误的节点数组的长度
 * @param timeout 等待时间(豪秒,即千分之一秒)
 *
 * @return 成功发送的节点的个数
 *
 */
int net_mod_socket_sendandrecv_timeout(void *_sockt, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int timeout);
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size,
  net_mod_err_t ** err_arr, int *err_arr_size, int timeout);
/**
 * @brief 跨机器发送消息并接受返回的应答消息,不管是否发送完成立刻返回
 *
 * 向node_arr 中的所有网络节点发送请求消息,节点的返回信息汇总并存储在recv_arr中
 * @param node_arr 网络节点组,
 * @param node_arr_len该数组长度.如果IP为空则为本地发送。
 * @param send_buf 发送的消息
 * @param send_size 该消息体的长度
 * @param recv_arr 返回的应答消息数组,使用完后需要调用net_mod_socket_free_recv_msg_arr释放掉
 * @param recv_arr_size 返回的应答消息数组长度
 * @param err_arr 返回发送错误的节点数组,使用完后需要调用free方法释放掉
 × @param err_arr_size 返回发送错误的节点数组的长度
 *
 * @return 成功发送的节点的个数
 *
 */
int net_mod_socket_sendandrecv_nowait(void *_sockt, net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size,
  net_mod_err_t ** err_arr, int *err_arr_size) ;
@@ -213,24 +240,6 @@
 * @return 0是成功, 其他值是失败的错误码
 */
int net_mod_socket_recvandsend_nowait(void *_socket, recvandsend_callback_wrapper_fn callback, void * user_data) ;
/**
 * @brief 跨机器发送消息并接受返回的应答消息,不管是否发送完成立刻返回
 *
 * 向node_arr 中的所有网络节点发送请求消息,节点的返回信息汇总并存储在recv_arr中
 * @param node_arr 网络节点组,
 * @param node_arr_len该数组长度.如果IP为空则为本地发送。
 * @param send_buf 发送的消息
 * @param send_size 该消息体的长度
 * @param recv_arr 返回的应答消息数组
 * @param recv_arr_size 返回的应答消息数组长度
 *
 * @return 成功发送的节点的个数
 *
 */
int net_mod_socket_sendandrecv_nowait(void *_sockt, net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) ;
test_net_socket/shm_util.cpp
@@ -200,6 +200,8 @@
    
    int recv_arr_size, i, n;
    net_mod_recv_msg_t *recv_arr;
  net_mod_err_t *errarr;
  int errarr_size = 0;
  pthread_t tid;
  // 创建一个线程接受订阅消息
@@ -234,19 +236,31 @@
          if (fgets(content, MAXLINE, stdin) != NULL) {
              // 收到消息的节点即使没有对应的信息, 也要回复一个表示无的消息,否则会一直等待
            // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size);
        n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size, 1);
        n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content),
         &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1000);
            printf(" %d nodes reply\n", n);
            for(i=0; i<recv_arr_size; i++) {
                printf("reply from (host:%s, port: %d, key:%d) >> %s\n",
                    recv_arr[i].host,
                    recv_arr[i].port,
                    recv_arr[i].key,
                    (char *)recv_arr[i].content
                );
            }
        if(recv_arr_size > 0) {
          for(i=0; i<recv_arr_size; i++) {
            printf("reply from (host:%s, port: %d, key:%d) >> %s\n",
              recv_arr[i].host,
              recv_arr[i].port,
              recv_arr[i].key,
              (char *)recv_arr[i].content
            );
          }
          // 使用完后,不要忘记释放掉
          net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
        }
            
                // 使用完后,不要忘记释放掉
            net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
        if(errarr_size > 0) {
          for(i = 0; i < errarr_size; i++) {
            printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code));
          }
          free(errarr);
        }
          }
    }
    else if(strcmp(action, "desub") == 0) {
@@ -290,10 +304,12 @@
  Targ *targ = (Targ *)arg;
  char sendbuf[128];
 
  int j, n;
  int recv_arr_size;
  int i, j, n;
  int recv_arr_size = 0;
  net_mod_recv_msg_t *recv_arr;
  int total = 0;
  net_mod_err_t *errarr;
  int errarr_size = 0;
 
  int rkey, lkey;
  unsigned int l = 0 , rl;
@@ -312,30 +328,42 @@
    sprintf(sendbuf, hello_format, net_mod_socket_get_key(client), l);
    // fprintf(fp, "requst:%s\n", sendbuf);
    // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
    n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1);
    n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1,
      &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1);
    printf("%d: send %d nodes\n", l, n);
    for(j=0; j < recv_arr_size; j++) {
      fprintf(stdout, "%d send '%s' to %d. received  from (host=%s, port= %d, key=%d) '%s'\n",
        net_mod_socket_get_key(client),
        sendbuf,
        targ->node->key,
        recv_arr[j].host,
        recv_arr[j].port,
        recv_arr[j].key,
        (char *)recv_arr[j].content
      );
    if(recv_arr_size > 0) {
      for(j=0; j < recv_arr_size; j++) {
        fprintf(stdout, "%d send '%s' to %d. received  from (host=%s, port= %d, key=%d) '%s'\n",
          net_mod_socket_get_key(client),
          sendbuf,
          targ->node->key,
          recv_arr[j].host,
          recv_arr[j].port,
          recv_arr[j].key,
          (char *)recv_arr[j].content
        );
      printf("key == %d\n", net_mod_socket_get_key(client));
      assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3);
      assert(targ->node->key == rkey);
      assert(net_mod_socket_get_key(client) == lkey);
      assert(rl == l);
        printf("key == %d\n", net_mod_socket_get_key(client));
        assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3);
        assert(targ->node->key == rkey);
        assert(net_mod_socket_get_key(client) == lkey);
        assert(rl == l);
      }
      // 使用完后,不要忘记释放掉
      net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
    }
        // 使用完后,不要忘记释放掉
        net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
    if(errarr_size > 0) {
      for(i = 0; i < errarr_size; i++) {
        printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code));
      }
      free(errarr);
    }
    total += n;
  }
  if(fp != NULL)
    fclose(fp);
  // net_mod_socket_close(client);
@@ -384,7 +412,7 @@
  double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec);
  long diffsec = (long) (difftime/1000000);
  long diffusec = difftime - diffsec*1000000;
  fprintf(stderr,"发送数目:%ld, 成功数目: %ld, 用时: (%ld sec %ld usec), 平均: %f\n",
  fprintf(stderr,"发送数目:%d, 成功数目: %ld, 用时: (%ld sec %ld usec), 平均: %f\n",
    SCALE*node_arr_size, total, diffsec, diffusec, difftime/total );
  // fflush(stdout);
 
@@ -393,11 +421,14 @@
// 无限循环send
void test_net_sendandrecv(char *nodelist) {
  int n, j;
  int i, n, j;
  void * client;
  int recv_arr_size;
  net_mod_recv_msg_t *recv_arr;
  net_node_t *node_arr;
  net_mod_err_t *errarr;
  int errarr_size = 0;
  int node_arr_size = parse_node_list(nodelist, &node_arr);
  char buf[128];
  pid_t pid, retPid ;
@@ -413,30 +444,35 @@
  while(true) {
    sprintf(buf, hello_format, pid, l);
    n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, buf, strlen(buf)+1,
      &recv_arr, &recv_arr_size, 1000);
      &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1000);
    printf(" %d nodes reply\n", n);
    for(j = 0; j < recv_arr_size; j++) {
      printf("%ld send '%s' . received '%s' from (host:%s, port: %d, key:%d) \n",
        (long)pid,
        buf,
        (char *)recv_arr[j].content,
        recv_arr[j].host,
        recv_arr[j].port,
        recv_arr[j].key
    if(recv_arr_size > 0) {
      for(j = 0; j < recv_arr_size; j++) {
        printf("%ld send '%s' . received '%s' from (host:%s, port: %d, key:%d) \n",
          (long)pid,
          buf,
          (char *)recv_arr[j].content,
          recv_arr[j].host,
          recv_arr[j].port,
          recv_arr[j].key
        );
        assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3);
        assert(retPid == pid);
        assert(retl == l);
        assert(remoteKey == recv_arr[j].key);
      }
      // 使用完后,不要忘记释放掉
      net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
    }
      );
      assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3);
      assert(retPid == pid);
      assert(retl == l);
      assert(remoteKey == recv_arr[j].key);
    if(errarr_size > 0) {
      for(i = 0; i < errarr_size; i++) {
        printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code));
      }
      free(errarr);
    }
    
    // 使用完后,不要忘记释放掉
    net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
    l++;
  }
@@ -523,7 +559,7 @@
  net_node_t *node_arr;
  int node_arr_size = parse_node_list(nodelist, &node_arr);
 
  char *topic = "news";
  const char *topic = "news";
  sprintf(sendbuf, "pid:%ld: Good news!!", (long)getpid());
  void * client = net_mod_socket_open();
@@ -577,31 +613,45 @@
  }
}
void do_sendandrecv(int key, char *sendbuf) {
  int n, j;
void do_sendandrecv(char *sendlist, char *sendbuf) {
  int i, n, j;
  int recv_arr_size;
  net_mod_recv_msg_t *recv_arr;
  net_mod_err_t *errarr;
  int errarr_size = 0;
  net_node_t node_arr[] = {NULL, 0, key};
  net_node_t *node_arr;
  int node_arr_size = parse_node_list(sendlist, &node_arr);
  print_node_list(node_arr, node_arr_size);
  void * client = net_mod_socket_open();
  n = net_mod_socket_sendandrecv_timeout(client, node_arr, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 5000);
  if(n == 0) {
    printf("send failed\n");
    return;
  }
  printf(" %d nodes reply\n", n);
  for(j=0; j < recv_arr_size; j++) {
  n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1,
    &recv_arr, &recv_arr_size, &errarr, &errarr_size, 5000);
  printf(" %d nodes reply\n", recv_arr_size);
  if(recv_arr_size > 0) {
    for(j=0; j < recv_arr_size; j++) {
    fprintf(stdout, "%d send '%s' to %d. received  from (host=%s, port= %d, key=%d) '%s'\n\n",
      net_mod_socket_get_key(client),
      sendbuf,
      key,
      recv_arr[j].host,
      recv_arr[j].port,
      recv_arr[j].key,
      (char *)recv_arr[j].content
    );
      fprintf(stdout, "===> suc: %d send '%s'. received  from (host=%s, port= %d, key=%d), '%s'\n\n",
        net_mod_socket_get_key(client),
        sendbuf,
        recv_arr[j].host,
        recv_arr[j].port,
        recv_arr[j].key,
        (char *)recv_arr[j].content
      );
    }
    // 使用完后,不要忘记释放掉
    net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
  }
// printf("errarr_size = %d\n", errarr_size);
  if(errarr_size > 0) {
    for(i = 0; i < errarr_size; i++) {
      printf("===> error: (host:%s, port: %d, key:%d). %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code));
    }
    free(errarr);
  }
  net_mod_socket_close(client);
@@ -771,6 +821,10 @@
  net_node_t *node_arr = (net_node_t *) calloc(entry_arr_len, sizeof(net_node_t));
  for(i = 0; i < entry_arr_len; i++) {
    if(strchr(entry_arr[i], ':') == NULL) {
      node_arr[i]= {NULL, 0, atoi(entry_arr[i])};
      continue;
    }
    property_arr_len = str_split(entry_arr[i], ":", &property_arr);
   printf("=====%s, %s, %s\n", property_arr[0], property_arr[1], property_arr[2]);
@@ -854,9 +908,9 @@
      usage(prog);
      exit(1);
    }
    int key = atoi(argv[1]);
    char *sendlist = argv[1];
    char *content = argv[2];
    do_sendandrecv(key, content);
    do_sendandrecv(sendlist, content);
  }
  else if (strcmp("start_bus_server", fun) == 0) {
   
@@ -942,7 +996,7 @@
  }
  else {
    printf("%Invalid funciton name\n");
    printf("Invalid funciton name\n");
    usage(argv[0]);
    exit(1);