wangzhengquan
2021-02-01 dd0714b75b2e29087e3cd1184995bf38a453d833
src/net/net_mod_socket.cpp
@@ -17,7 +17,7 @@
{
  int s;
  if (Signal(SIGPIPE, SIG_IGN) == SIG_ERR)
      logger->error(errno, "NetModSocket::NetModSocket signal");
    logger->error(errno, "NetModSocket::NetModSocket signal");
  gpool = new NetConnPool();
@@ -165,11 +165,11 @@
      // 本地发送
     
      if(msec == 0) {
        ret = shmModSocket.sendandrecv_nowait(send_buf, send_size, node->key, &recv_buf, &recv_size);
        ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size, NULL, BUS_NOWAIT_FLAG);
      } else if(msec > 0){
        timeout.tv_sec = msec / 1000;
        timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
        ret = shmModSocket.sendandrecv_timeout(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout);
        ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout, BUS_TIMEOUT_FLAG);
      } else {
        ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
      }
@@ -334,11 +334,11 @@
  // 本地发送
  if(node_arr == NULL || arrlen == 0) {
    if(msec == 0) {
      ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
      ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY,  NULL, BUS_NOWAIT_FLAG);
    } else if(msec > 0) {
      timeout.tv_sec = msec / 1000;
      timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
      ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
      ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG);
    } else {
      ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
    }
@@ -354,11 +354,11 @@
    if(node->host == NULL) {
      // 本地发送
      if(msec == 0) {
        ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
        ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG);
      } else if(msec > 0) {
        timeout.tv_sec = msec / 1000;
        timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
        ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
        ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG);
      } else {
        ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
      }
@@ -457,55 +457,20 @@
 * @return 0 成功, 其他值 失败的错误码
 */
int NetModSocket::sendto(const void *buf, const int size, const int key){
  int rv = shmModSocket.sendto(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::sendto: %d sendto %d success.\n", get_key(), key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::sendto: %d sendto  %d failed %s", get_key(), key, bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::sendto : %d sendto  %d failed", get_key(), key);
  }
  return rv;
  return shmModSocket.sendto(buf, size, key);
}
// 发送信息超时返回。 @sec 秒 , @nsec 纳秒
int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  int rv = shmModSocket.sendto_timeout(buf, size, key, &timeout);
  if(rv == 0) {
    logger->debug("NetModSocket::sendto_timeout: %d sendto %d success.\n", get_key(), key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::sendto_timeout : %d sendto  %d failed %s", get_key(),  key, bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::sendto_timeout:  %d sendto  %d failed", get_key(),  key);
  }
  return rv;
  return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG);
}
// 发送信息立刻返回。
int NetModSocket::sendto_nowait(const void *buf, const int size, const int key){
  int rv = shmModSocket.sendto_nowait(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::sendto_nowait: %d sendto %d success.\n", get_key(), key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::sendto_nowait %d sendto  %d failed %s", get_key(), key, bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::sendto_nowait %d sendto  %d failed", get_key(), key);
  }
  return rv;
  return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG);
}
/**
@@ -515,54 +480,19 @@
*/
int NetModSocket::recvfrom(void **buf, int *size, int *key) {
  logger->debug(" %d NetModSocket::recvfrom before", get_key());
  int rv = shmModSocket.recvfrom(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::recvfrom: <<<< %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    logger->debug("NetModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::recvfrom: socket %d recvfrom failed",  get_key());
  }
  return rv;
  return shmModSocket.recvfrom(buf, size, key);
}
// 接受信息超时返回。 @sec 秒 , @nsec 纳秒
int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  int rv = shmModSocket.recvfrom_timeout(buf, size, key, &timeout);
  if(rv == 0) {
    logger->debug("NetModSocket::recvfrom_timeout:  %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::recvfrom_timeout:  %d recvfrom failed %s", get_key(), bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::recvfrom_timeout:  %d recvfrom failed",  get_key());
  }
  return rv;
  return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG);
}
int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){
  int rv = shmModSocket.recvfrom_nowait(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::recvfrom_nowait:  %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::recvfrom_nowait:  %d recvfrom failed %s", get_key(), bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::recvfrom_nowait:  %d recvfrom failed",  get_key());
  }
  return rv;
  return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG);
}
/**
@@ -576,10 +506,10 @@
// 超时返回。 @sec 秒 , @nsec 纳秒
int NetModSocket::sendandrecv_timeout( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  return shmModSocket.sendandrecv_timeout(send_buf, send_size, key, recv_buf, recv_size, &timeout);
  return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size, &timeout, BUS_TIMEOUT_FLAG);
}
int NetModSocket::sendandrecv_nowait( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) {
  return shmModSocket.sendandrecv_nowait(send_buf, send_size, key, recv_buf, recv_size);
  return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size, NULL, BUS_NOWAIT_FLAG);
}
@@ -595,10 +525,10 @@
// 超时返回。 @sec 秒 , @nsec 纳秒
int  NetModSocket::sub_timeout( void *topic, int size, int key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  return shmModSocket.sub_timeout((char *)topic,  size,  key, &timeout);
  return shmModSocket.sub((char *)topic,  size,  key, &timeout, BUS_TIMEOUT_FLAG);
}
int  NetModSocket::sub_nowait( void *topic, int size, int key){
  return shmModSocket.sub_nowait((char *)topic,  size,  key);
  return shmModSocket.sub((char *)topic,  size,  key, NULL, BUS_NOWAIT_FLAG);
}
@@ -615,10 +545,10 @@
// 超时返回。 @sec 秒 , @nsec 纳秒
int  NetModSocket::desub_timeout( void *topic, int size, int key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  return shmModSocket.desub_timeout((char *)topic,  size,  key, &timeout);
  return shmModSocket.desub((char *)topic,  size,  key, &timeout, BUS_TIMEOUT_FLAG);
}
int  NetModSocket::desub_nowait( void *topic, int size, int key){
  return shmModSocket.desub_nowait((char *)topic,  size,  key);
  return shmModSocket.desub((char *)topic,  size,  key, NULL, BUS_NOWAIT_FLAG);
}
@@ -635,10 +565,10 @@
//  超时返回。 @sec 秒 , @nsec 纳秒
int  NetModSocket::pub_timeout( char *topic, int topic_size, void *content, int content_size, int key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  return shmModSocket.pub_timeout(topic, topic_size, content, content_size, key, &timeout);
  return shmModSocket.pub(topic, topic_size, content, content_size, key, &timeout, BUS_TIMEOUT_FLAG);
}
int  NetModSocket::pub_nowait( char *topic, int topic_size, void *content, int content_size, int key){
  return shmModSocket.pub_nowait(topic, topic_size, content, content_size, key);
  return shmModSocket.pub(topic, topic_size, content, content_size, key, NULL, BUS_NOWAIT_FLAG);
}