wangzhengquan
2021-01-27 5410446ade40493d17f7e2d7f0d687b0998acc6a
timeout wait 合一
7个文件已修改
486 ■■■■■ 已修改文件
src/net/net_mod_server_socket.cpp 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.cpp 120 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.cpp 190 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.h 52 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/net_mod_socket.sh 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_net_mod_socket.cpp 93 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_server_socket.cpp
@@ -171,10 +171,10 @@
      timeout.tv_sec = request_head.timeout / 1000;
      timeout.tv_nsec = (request_head.timeout - timeout.tv_sec * 1000) * 10e6;
      // printf(" timeout.tv_sec = %d,  timeout.tv_nsec=%ld\n",  timeout.tv_sec,  timeout.tv_nsec );
      ret = shmModSocket.sendandrecv_unsafe_timeout(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size, &timeout);
      ret = shmModSocket.sendandrecv_unsafe(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size, &timeout, BUS_TIMEOUT_FLAG);
    }
    else if(request_head.timeout == 0) {
      ret = shmModSocket.sendandrecv_unsafe_nowait(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size);
      ret = shmModSocket.sendandrecv_unsafe(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size, NULL, BUS_NOWAIT_FLAG);
    }
    else if(request_head.timeout == -1) {
      ret = shmModSocket.sendandrecv_unsafe(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size);
@@ -236,10 +236,10 @@
    if(request_head.timeout > 0) {
      timeout.tv_sec = request_head.timeout / 1000;
      timeout.tv_nsec = (request_head.timeout - timeout.tv_sec * 1000) * 10e6;
      ret = shmModSocket.pub_timeout((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY, &timeout);
      ret = shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG);
    }
    else if(request_head.timeout == 0) {
      ret = shmModSocket.pub_nowait((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY);
      ret = shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG);
    }
    else if(request_head.timeout == -1) {
      ret = shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY);
src/net/net_mod_socket.cpp
@@ -17,7 +17,7 @@
{
  int s;
  if (Signal(SIGPIPE, SIG_IGN) == SIG_ERR)
      logger->error(errno, "NetModSocket::NetModSocket signal");
    logger->error(errno, "NetModSocket::NetModSocket signal");
  gpool = new NetConnPool();
@@ -165,11 +165,11 @@
      // 本地发送
     
      if(msec == 0) {
        ret = shmModSocket.sendandrecv_nowait(send_buf, send_size, node->key, &recv_buf, &recv_size);
        ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size, NULL, BUS_NOWAIT_FLAG);
      } else if(msec > 0){
        timeout.tv_sec = msec / 1000;
        timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
        ret = shmModSocket.sendandrecv_timeout(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout);
        ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout, BUS_TIMEOUT_FLAG);
      } else {
        ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
      }
@@ -334,11 +334,11 @@
  // 本地发送
  if(node_arr == NULL || arrlen == 0) {
    if(msec == 0) {
      ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
      ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY,  NULL, BUS_NOWAIT_FLAG);
    } else if(msec > 0) {
      timeout.tv_sec = msec / 1000;
      timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
      ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
      ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG);
    } else {
      ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
    }
@@ -354,11 +354,11 @@
    if(node->host == NULL) {
      // 本地发送
      if(msec == 0) {
        ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
        ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG);
      } else if(msec > 0) {
        timeout.tv_sec = msec / 1000;
        timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
        ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
        ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG);
      } else {
        ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
      }
@@ -457,55 +457,20 @@
 * @return 0 成功, 其他值 失败的错误码
 */
int NetModSocket::sendto(const void *buf, const int size, const int key){
  int rv = shmModSocket.sendto(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::sendto: %d sendto %d success.\n", get_key(), key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::sendto: %d sendto  %d failed %s", get_key(), key, bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::sendto : %d sendto  %d failed", get_key(), key);
  }
  return rv;
  return shmModSocket.sendto(buf, size, key);
}
// 发送信息超时返回。 @sec 秒 , @nsec 纳秒
int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  int rv = shmModSocket.sendto_timeout(buf, size, key, &timeout);
  if(rv == 0) {
    logger->debug("NetModSocket::sendto_timeout: %d sendto %d success.\n", get_key(), key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::sendto_timeout : %d sendto  %d failed %s", get_key(),  key, bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::sendto_timeout:  %d sendto  %d failed", get_key(),  key);
  }
  return rv;
  return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG);
}
// 发送信息立刻返回。
int NetModSocket::sendto_nowait(const void *buf, const int size, const int key){
  int rv = shmModSocket.sendto_nowait(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::sendto_nowait: %d sendto %d success.\n", get_key(), key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::sendto_nowait %d sendto  %d failed %s", get_key(), key, bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::sendto_nowait %d sendto  %d failed", get_key(), key);
  }
  return rv;
  return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG);
}
/**
@@ -515,54 +480,19 @@
*/
int NetModSocket::recvfrom(void **buf, int *size, int *key) {
  logger->debug(" %d NetModSocket::recvfrom before", get_key());
  int rv = shmModSocket.recvfrom(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::recvfrom: <<<< %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    logger->debug("NetModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::recvfrom: socket %d recvfrom failed",  get_key());
  }
  return rv;
  return shmModSocket.recvfrom(buf, size, key);
}
// 接受信息超时返回。 @sec 秒 , @nsec 纳秒
int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  int rv = shmModSocket.recvfrom_timeout(buf, size, key, &timeout);
  if(rv == 0) {
    logger->debug("NetModSocket::recvfrom_timeout:  %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::recvfrom_timeout:  %d recvfrom failed %s", get_key(), bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::recvfrom_timeout:  %d recvfrom failed",  get_key());
  }
  return rv;
  return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG);
}
int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){
  int rv = shmModSocket.recvfrom_nowait(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::recvfrom_nowait:  %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::recvfrom_nowait:  %d recvfrom failed %s", get_key(), bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::recvfrom_nowait:  %d recvfrom failed",  get_key());
  }
  return rv;
  return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG);
}
/**
@@ -576,10 +506,10 @@
// 超时返回。 @sec 秒 , @nsec 纳秒
int NetModSocket::sendandrecv_timeout( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  return shmModSocket.sendandrecv_timeout(send_buf, send_size, key, recv_buf, recv_size, &timeout);
  return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size, &timeout, BUS_TIMEOUT_FLAG);
}
int NetModSocket::sendandrecv_nowait( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) {
  return shmModSocket.sendandrecv_nowait(send_buf, send_size, key, recv_buf, recv_size);
  return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size, NULL, BUS_NOWAIT_FLAG);
}
@@ -595,10 +525,10 @@
// 超时返回。 @sec 秒 , @nsec 纳秒
int  NetModSocket::sub_timeout( void *topic, int size, int key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  return shmModSocket.sub_timeout((char *)topic,  size,  key, &timeout);
  return shmModSocket.sub((char *)topic,  size,  key, &timeout, BUS_TIMEOUT_FLAG);
}
int  NetModSocket::sub_nowait( void *topic, int size, int key){
  return shmModSocket.sub_nowait((char *)topic,  size,  key);
  return shmModSocket.sub((char *)topic,  size,  key, NULL, BUS_NOWAIT_FLAG);
}
@@ -615,10 +545,10 @@
// 超时返回。 @sec 秒 , @nsec 纳秒
int  NetModSocket::desub_timeout( void *topic, int size, int key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  return shmModSocket.desub_timeout((char *)topic,  size,  key, &timeout);
  return shmModSocket.desub((char *)topic,  size,  key, &timeout, BUS_TIMEOUT_FLAG);
}
int  NetModSocket::desub_nowait( void *topic, int size, int key){
  return shmModSocket.desub_nowait((char *)topic,  size,  key);
  return shmModSocket.desub((char *)topic,  size,  key, NULL, BUS_NOWAIT_FLAG);
}
@@ -635,10 +565,10 @@
//  超时返回。 @sec 秒 , @nsec 纳秒
int  NetModSocket::pub_timeout( char *topic, int topic_size, void *content, int content_size, int key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  return shmModSocket.pub_timeout(topic, topic_size, content, content_size, key, &timeout);
  return shmModSocket.pub(topic, topic_size, content, content_size, key, &timeout, BUS_TIMEOUT_FLAG);
}
int  NetModSocket::pub_nowait( char *topic, int topic_size, void *content, int content_size, int key){
  return shmModSocket.pub_nowait(topic, topic_size, content, content_size, key);
  return shmModSocket.pub(topic, topic_size, content, content_size, key, NULL, BUS_NOWAIT_FLAG);
}
src/net/net_mod_socket_wrapper.h
@@ -95,6 +95,7 @@
 * @return 0是成功, 其他值是失败的错误码
 */
int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key);
/**
 * @brief 等待接收信息,在指定的时间内即使没有接受到消息也要返回
 *
src/socket/shm_mod_socket.cpp
@@ -18,7 +18,7 @@
    struct timespec timeout = {1, 0};
    if(bus_set != NULL) {
        for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) {
            desub_timeout(NULL, 0, *bus_iter, &timeout);
            desub(NULL, 0, *bus_iter, &timeout, BUS_TIMEOUT_FLAG);
        }
        delete bus_set;
    }
@@ -37,154 +37,65 @@
int ShmModSocket::force_bind(int key) {
    return shm_socket_force_bind(shm_socket, key);
}
/**
 * 发送信息
 * @key 发送给谁
 * @return 0 成功, 其他值 失败的错误码
 */
int ShmModSocket::sendto(const void *buf, const int size, const int key) {
        return shm_sendto(shm_socket, buf, size, key, NULL, 0);
int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag) {
    int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag);
  if(rv == 0) {
      logger->debug("ShmModSocket::sendto: %d sendto %d success.\n", get_key(), key);
      return 0;
  }
  logger->debug("ShmModSocket::sendto : %d sendto  %d failed %s", get_key(),  key, bus_strerror(rv));
  return rv;
}
// 发送信息超时返回。 @sec 秒 , @nsec 纳秒
int ShmModSocket::sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout) {
    return shm_sendto(shm_socket, buf, size, key, timeout, BUS_TIMEOUT_FLAG);
}
// 发送信息立刻返回。
int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){
    return shm_sendto(shm_socket, buf, size, key, NULL, BUS_NOWAIT_FLAG);
}
/**
 * 接收信息
 * @key 从谁哪里收到的信息
 * @return 0 成功, 其他值 失败的错误码
*/
int ShmModSocket::recvfrom(void **buf, int *size, int *key) {
    int rv =  shm_recvfrom(shm_socket, buf, size, key, NULL, 0);
int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) {
    int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, flag);
    if(rv == 0) {
    logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
  logger->debug("ShmModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv));
  return rv;
}
int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, const struct timespec *timeout) {
    int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, BUS_TIMEOUT_FLAG);
     return rv;
}
int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){
    int rv =  shm_recvfrom(shm_socket, buf, size, key, NULL, BUS_NOWAIT_FLAG);
    // logger->error(rv, "ShmModSocket::recvfrom_nowait failed!");
  return rv;
}
/**
 * 发送请求信息并等待接收应答
 * @key 发送给谁
 * @return 0 成功, 其他值 失败的错误码
*/
int ShmModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key,
    void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec *timeout){
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, BUS_TIMEOUT_FLAG);
int ShmModSocket::sendandrecv_unsafe(const void *send_buf, const int send_size, const int send_key,
    void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
}
int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, BUS_NOWAIT_FLAG);
}
int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec *timeout){
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, BUS_TIMEOUT_FLAG);
}
int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, BUS_NOWAIT_FLAG);
}
/**
 * 订阅指定主题
 * @topic 主题
 * @size 主题长度
 * @key 总线端口
 */
int  ShmModSocket::sub(char *topic, int size, int key){
    return _sub_( topic, size, key, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int  ShmModSocket::sub_timeout(char *topic, int size, int key, const struct timespec *timeout){
    return _sub_(topic, size, key, timeout, BUS_TIMEOUT_FLAG);
}
int  ShmModSocket::sub_nowait(char *topic, int size, int key) {
    return _sub_(topic, size, key, NULL,  BUS_NOWAIT_FLAG);
}
/**
 * 取消订阅指定主题
 * @topic 主题
 * @size 主题长度
 * @key 总线端口
 */
int  ShmModSocket::desub(char *topic, int size, int key){
    return _desub_( topic, size, key, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int  ShmModSocket::desub_timeout(char *topic, int size, int key, const struct timespec *timeout){
    return _desub_(topic, size, key, timeout, BUS_TIMEOUT_FLAG);
}
int  ShmModSocket::desub_nowait(char *topic, int size, int key) {
    return _desub_(topic, size, key, NULL,  BUS_NOWAIT_FLAG);
}
/**
 * 发布主题
 * @topic 主题
 * @content 主题内容
 * @key 总线端口
 */
int  ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key){
        return _pub_(topic, topic_size, content, content_size, key, NULL, 0);
}
//  超时返回。 @sec 秒 , @nsec 纳秒
int  ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec * timeout){
    return _pub_( topic, topic_size, content, content_size, key, timeout, BUS_TIMEOUT_FLAG);
}
int  ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){
    return _pub_(topic, topic_size, content, content_size, key, NULL, BUS_NOWAIT_FLAG);
}
/**
 * 获取soket key
 */
int ShmModSocket::get_key(){
    return shm_socket->key;
}
// =============================================================================
/**
 * @key 总线端口
 */
int  ShmModSocket::_sub_(char *topic, int topic_size, int key,
int  ShmModSocket::sub(char *topic, int topic_size, int key,
    const struct timespec *timeout, int flags) {
    int ret;
    bus_head_t head = {};
    memcpy(head.action, "sub", sizeof(head.action));
@@ -206,10 +117,15 @@
}
/**
 * 取消订阅指定主题
 * @topic 主题
 * @size 主题长度
 * @key 总线端口
 */
int  ShmModSocket::_desub_(char *topic, int topic_size, int key, const struct timespec *timeout, int flags) {
int  ShmModSocket::desub(char *topic, int topic_size, int key, const struct timespec *timeout, int flags) {
    // char buf[8192];
    int ret;
    if(topic == NULL) {
@@ -239,18 +155,15 @@
}
/**
 * @key 总线端口
 * @str "<**pub**>{经济}"
 */
int  ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout, int flags) {
    // int head_len;
    // char buf[8192+content_size];
    // snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
    // head_len = strlen(buf);
    // memcpy(buf+head_len, content, content_size);
/**
 * 发布主题
 * @topic 主题
 * @content 主题内容
 * @key 总线端口
 */
int  ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout, int flags) {
    int ret;
    bus_head_t head = {};
    memcpy(head.action, "pub", sizeof(head.action));
@@ -267,9 +180,20 @@
        return -1;
    }
}
/**
 * 获取soket key
 */
int ShmModSocket::get_key(){
    return shm_socket->key;
}
// =============================================================================
int ShmModSocket::get_bus_sendbuf(bus_head_t &request_head, 
  void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf) {
@@ -277,7 +201,7 @@
  int buf_size;
  char *buf;
  int  max_buf_size;
  if((buf = (char *)malloc(MAXBUF)) == NULL) {
  if((buf = (char *) malloc(MAXBUF)) == NULL) {
    LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc");
    exit(1);
  } else {
@@ -287,7 +211,7 @@
  buf_size = BUS_HEAD_SIZE + content_size + topic_size  ;
  if(max_buf_size < buf_size) {
    
    if((buf = (char *)realloc(buf, buf_size)) == NULL) {
    if((buf = (char *) realloc(buf, buf_size)) == NULL) {
      LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf  realloc buf");
      exit(1);
    } else {
src/socket/shm_mod_socket.h
@@ -31,11 +31,7 @@
private:
     
    int _sub_( char *topic, int size, int key, const struct timespec *timeouts,  int flags);
    int _pub_( char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeouts,  int flags);
    int  _desub_( char *topic, int size, int key, const struct timespec *timeouts, int flags);
    static int get_bus_sendbuf(bus_head_t &request_head, void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf);
@@ -65,50 +61,44 @@
    /**
     * 发送信息
     * @key 发送给谁
     * @flag BUS_TIMEOUT_FLAG  BUS_NOWAIT_FLAG
     * @return 0 成功, 其他值 失败的错误码
     */
    int sendto(const void *buf, const int size, const int key);
    // 发送信息超时返回。 @sec 秒 , @nsec 纳秒
    int sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout);
    // 发送信息立刻返回。
    int sendto_nowait(const void *buf, const int size, const int key);
    int sendto(const void *buf, const int size, const int key, const struct timespec *timeout = NULL, int flag = 0);
    /**
     * 接收信息
     * @key 从谁哪里收到的信息
     * @return 0 成功, 其他值 失败的错误码
    */
    int recvfrom(void **buf, int *size, int *key);
    // 接受信息超时返回。 @sec 秒 , @nsec 纳秒
    int recvfrom_timeout(void **buf, int *size, int *key,  const struct timespec *timeout);
    int recvfrom_nowait(void **buf, int *size, int *key);
    int recvfrom(void **buf, int *size, int *key,  const struct timespec *timeout = NULL, int flag = 0);
    /**
     * 发送请求信息并等待接收应答
     * @key 发送给谁
     * @flag BUS_TIMEOUT_FLAG  BUS_NOWAIT_FLAG
     * @return 0 成功, 其他值 失败的错误码
    */
    int sendandrecv(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ;
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int sendandrecv_timeout(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, const struct timespec *timeout) ;
    int sendandrecv_nowait(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ;
    int sendandrecv(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size,
     const struct timespec *timeout = NULL, int flag = 0);
    int sendandrecv_unsafe(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ;
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, const  struct timespec *timeout) ;
    int sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ;
    int sendandrecv_unsafe(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size,
     const  struct timespec *timeout = NULL, int flag = 0) ;
    /**
     * 订阅指定主题
     * @topic 主题
     * @size 主题长度
     * @key 总线端口
     * @flag BUS_TIMEOUT_FLAG  BUS_NOWAIT_FLAG
     */
    int  sub(char *topic, int size, int key);
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int  sub_timeout(char *topic, int size, int key,  const struct timespec *timeout);
    int  sub_nowait(char *topic, int size, int key);
    int  sub(char *topic, int size, int key,  const struct timespec *timeout = NULL, int flag = 0);
     /**
@@ -116,22 +106,18 @@
      * @topic 主题,主题为空时取消全部订阅
     * @size 主题长度
     * @key 总线端口
     * @flag BUS_TIMEOUT_FLAG  BUS_NOWAIT_FLAG
     */
    int desub( char *topic, int size, int key);
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int desub_timeout(char *topic, int size, int key, const struct timespec *timeout);
    int desub_nowait(char *topic, int size, int key) ;
    int desub(char *topic, int size, int key, const struct timespec *timeout = NULL, int flag = 0);
    /**
     * 发布主题
     * @topic 主题
     * @content 主题内容
     * @key 总线端口
     * @flag BUS_TIMEOUT_FLAG  BUS_NOWAIT_FLAG
     */
    int  pub(char *topic, int topic_size, void *content, int content_size, int key);
    //  超时返回。 @sec 秒 , @nsec 纳秒
    int  pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, const  struct timespec *timeout);
    int  pub_nowait(char *topic, int topic_size, void *content, int content_size, int key);
    int  pub(char *topic, int topic_size, void *content, int content_size, int key, const  struct timespec *timeout = NULL, int flag = 0);
    /**
test_net_socket/net_mod_socket.sh
@@ -9,6 +9,7 @@
    ./test_net_mod_socket --fun="start_reply" --key=100 & server_pid=$! &&  echo "pid: ${server_pid}" 
}
# 交互式客户端
function client() {
    # ./test_net_mod_socket --fun="start_net_client" \
@@ -23,12 +24,25 @@
     
}
# 无限循环send
function send() {
    ./test_net_mod_socket --fun="test_net_sendandrecv" \
     --sendlist="localhost:5000:100, localhost:5000:100"
}
# 多线程send
function msend() {
    ./test_net_mod_socket --fun="test_net_sendandrecv_threads" \
     --sendlist="localhost:5000:100, localhost:5000:100"
     
}
# 无限循环 pub
function pub() {
    ./test_net_mod_socket --fun="test_net_pub" \
     --publist="localhost:5000, localhost:5000"
}
# 多线程pub
function mpub() {
    ./test_net_mod_socket --fun="test_net_pub_threads" \
     --publist="localhost:5000, localhost:5000"
@@ -56,9 +70,15 @@
  "msend")
    msend
  ;;
  "send")
    send
  ;;
  "mpub")
    mpub
  ;;
  "pub")
    pub
  ;;
  "close")
     close
  ;;
test_net_socket/test_net_mod_socket.cpp
@@ -5,6 +5,7 @@
#include "shm_mm_wrapper.h"
#include "usg_common.h"
#include <getopt.h>
#include "logger_factory.h"
#define  SCALE  100000
@@ -141,7 +142,7 @@
  int remote_port;
  while ( (rv = net_mod_socket_recvfrom(client, &recvbuf, &size, &remote_port) ) == 0) {
   // printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf);
    sprintf(sendbuf, "RECEIVED  PORT %d NAME %s", remote_port, recvbuf);
    sprintf(sendbuf, "RECEIVED:  %s", recvbuf);
    net_mod_socket_sendto(client, sendbuf, strlen(sendbuf) + 1, remote_port);
    free(recvbuf);
  }
@@ -194,7 +195,7 @@
        n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size, 1000);
            printf(" %d nodes reply\n", n);
            for(i=0; i<recv_arr_size; i++) {
                printf("host:%s, port: %d, key:%d, content: %s\n",
                printf("reply from (host:%s, port: %d, key:%d) >> %s\n",
                    recv_arr[i].host,
                    recv_arr[i].port,
                    recv_arr[i].key,
@@ -247,7 +248,8 @@
  Targ *targ = (Targ *)arg;
  char sendbuf[512];
 
  int i,j, n, recv_arr_size;
  int i,j, n;
  int recv_arr_size;
  net_mod_recv_msg_t *recv_arr;
  int total = 0;
 
@@ -271,7 +273,7 @@
    n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1000);
    printf("%d: send %d nodes\n", i, n);
    for(j=0; j < recv_arr_size; j++) {
        fprintf(fp, "reply: host:%s, port: %d, key:%d, content: %s\n",
        fprintf(fp, "reply from (host:%s, port: %d, key:%d) >> %s\n",
            recv_arr[j].host,
            recv_arr[j].port,
            recv_arr[j].key,
@@ -287,9 +289,10 @@
  return (void *)total;
}
//多线程send
void test_net_sendandrecv_threads(char *nodelist) {
  int status, i = 0, processors = 1;
  int status, i = 0, processors = 4;
  void *res[processors];
  // Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
  Targ targs[processors];
@@ -326,10 +329,42 @@
 
}
// 无限循环send
void test_net_sendandrecv(char *nodelist) {
  int n, i;
  void * client;
  int recv_arr_size;
  net_mod_recv_msg_t *recv_arr;
  net_node_t *node_arr;
  int node_arr_size = parse_node_list(nodelist, &node_arr);
  char content[128];
  sprintf(content, "pid:%ld say Hello!!", (long)getpid());
  client = net_mod_socket_open();
  while(true) {
    n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size, 1000);
    printf(" %d nodes reply\n", n);
    for(i=0; i<recv_arr_size; i++) {
      LoggerFactory::getLogger()->debug("reply from (host:%s, port: %d, key:%d) >> %s\n",
        recv_arr[i].host,
        recv_arr[i].port,
        recv_arr[i].key,
        recv_arr[i].content
      );
    }
    // 使用完后,不要忘记释放掉
    net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
  }
  net_mod_socket_close(client);
}
void *_run_pub_(void *arg) {
  Targ *targ = (Targ *)arg;
  char sendbuf[512];
  char sendbuf[128];
 
  int i,j, n;
  int total = 0;
@@ -338,9 +373,6 @@
  int node_arr_size = parse_node_list(targ->nodelist, &node_arr);
 
  char *topic = "news";
  // char filename[512];
  // sprintf(filename, "test%d.tmp", targ->id);
  // FILE *fp = NULL;
@@ -353,7 +385,7 @@
   
    n = net_mod_socket_pub(client, node_arr, node_arr_size, topic, strlen(topic)+1, sendbuf, strlen(sendbuf)+1);
    // n = net_mod_socket_pub(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
    printf( "pub:%s to  %d nodes\n", sendbuf, n);
    LoggerFactory::getLogger()->debug( "pub:%s to  %d nodes\n", sendbuf, n);
    total += n;
  }
  // fclose(fp);
@@ -361,6 +393,7 @@
  return (void *)total;
}
//多线程pub
void test_net_pub_threads(char *nodelist) {
  int status, i = 0, processors = 4;
@@ -399,6 +432,28 @@
  // fflush(stdout);
  net_mod_socket_close(client);
}
// 无限循环pub
void test_net_pub(char *nodelist) {
  int n;
  char sendbuf[512];
  net_node_t *node_arr;
  int node_arr_size = parse_node_list(nodelist, &node_arr);
  char *topic = "news";
  sprintf(sendbuf, "pid:%ld: Good news!!", (long)getpid());
  void * client = net_mod_socket_open();
  while (true) {
    n = net_mod_socket_pub(client, node_arr, node_arr_size, topic, strlen(topic)+1, sendbuf, strlen(sendbuf)+1);
    // n = net_mod_socket_pub(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
    LoggerFactory::getLogger()->debug( "pub to  %d nodes\n", n);
  }
  net_mod_socket_close(client);
}
@@ -456,6 +511,15 @@
     
    test_net_sendandrecv_threads(opt.sendlist);
  }
  else if (strcmp("test_net_sendandrecv", opt.fun) == 0) {
    if(opt.sendlist == 0) {
      fprintf(stderr, "Missing sendlist .\n");
      usage(argv[0]);
      exit(1);
    }
    test_net_sendandrecv(opt.sendlist);
  }
  else if (strcmp("test_net_pub_threads", opt.fun) == 0) {
    if(opt.publist == 0) {
      fprintf(stderr, "Missing publist .\n");
@@ -465,6 +529,15 @@
     
    test_net_pub_threads(opt.publist);
  }
  else if (strcmp("test_net_pub", opt.fun) == 0) {
    if(opt.publist == 0) {
      fprintf(stderr, "Missing publist .\n");
      usage(argv[0]);
      exit(1);
    }
    test_net_pub(opt.publist);
  }
  
  else {
    usage(argv[0]);