wangzhengquan
2021-02-02 cb85aa8a8d02a3d6dc16e3f32e78da9e70f9c7f5
src/net/net_mod_socket.cpp
@@ -17,7 +17,7 @@
{
  int s;
  if (Signal(SIGPIPE, SIG_IGN) == SIG_ERR)
      logger->error(errno, "NetModSocket::NetModSocket signal");
    logger->error(errno, "NetModSocket::NetModSocket signal");
  gpool = new NetConnPool();
@@ -165,11 +165,11 @@
      // 本地发送
     
      if(msec == 0) {
        ret = shmModSocket.sendandrecv_nowait(send_buf, send_size, node->key, &recv_buf, &recv_size);
        ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size, NULL, BUS_NOWAIT_FLAG);
      } else if(msec > 0){
        timeout.tv_sec = msec / 1000;
        timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
        ret = shmModSocket.sendandrecv_timeout(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout);
        ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout, BUS_TIMEOUT_FLAG);
      } else {
        ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
      }
@@ -334,17 +334,18 @@
  // 本地发送
  if(node_arr == NULL || arrlen == 0) {
    if(msec == 0) {
      ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
      ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY,  NULL, BUS_NOWAIT_FLAG);
    } else if(msec > 0) {
      timeout.tv_sec = msec / 1000;
      timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
      ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
      ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG);
    } else {
      ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
    }
    if(ret == 0 ) {
      n_pub_suc++;
    }
    return n_pub_suc;
  }
  for (i = 0; i < arrlen; i++) {
@@ -353,11 +354,11 @@
    if(node->host == NULL) {
      // 本地发送
      if(msec == 0) {
        ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
        ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG);
      } else if(msec > 0) {
        timeout.tv_sec = msec / 1000;
        timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
        ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
        ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG);
      } else {
        ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
      }
@@ -374,7 +375,7 @@
      }
      request_head.mod = BUS;
      memcpy(request_head.host, node->host, sizeof(request_head.host));
      request_head.key = node->key;
      request_head.key = SHM_BUS_KEY;
      request_head.content_length = content_size;
      request_head.topic_length = strlen(topic) + 1;
      request_head.timeout = msec;
@@ -456,55 +457,20 @@
 * @return 0 成功, 其他值 失败的错误码
 */
int NetModSocket::sendto(const void *buf, const int size, const int key){
  int rv = shmModSocket.sendto(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::sendto: %d sendto %d success.\n", get_key(), key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::sendto: %d sendto  %d failed %s", get_key(), key, bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::sendto : %d sendto  %d failed", get_key(), key);
  }
  return rv;
  return shmModSocket.sendto(buf, size, key);
}
// 发送信息超时返回。 @sec 秒 , @nsec 纳秒
int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  int rv = shmModSocket.sendto_timeout(buf, size, key, &timeout);
  if(rv == 0) {
    logger->debug("NetModSocket::sendto_timeout: %d sendto %d success.\n", get_key(), key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::sendto_timeout : %d sendto  %d failed %s", get_key(),  key, bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::sendto_timeout:  %d sendto  %d failed", get_key(),  key);
  }
  return rv;
  return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG);
}
// 发送信息立刻返回。
int NetModSocket::sendto_nowait(const void *buf, const int size, const int key){
  int rv = shmModSocket.sendto_nowait(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::sendto_nowait: %d sendto %d success.\n", get_key(), key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::sendto_nowait %d sendto  %d failed %s", get_key(), key, bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::sendto_nowait %d sendto  %d failed", get_key(), key);
  }
  return rv;
  return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG);
}
/**
@@ -513,53 +479,20 @@
 * @return 0 成功, 其他值 失败的错误码
*/
int NetModSocket::recvfrom(void **buf, int *size, int *key) {
  int rv = shmModSocket.recvfrom(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::recvfrom:  %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    logger->debug("NetModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::recvfrom: socket %d recvfrom failed",  get_key());
  }
  return rv;
  return shmModSocket.recvfrom(buf, size, key);
}
// 接受信息超时返回。 @sec 秒 , @nsec 纳秒
int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  int rv = shmModSocket.recvfrom_timeout(buf, size, key, &timeout);
  if(rv == 0) {
    logger->debug("NetModSocket::recvfrom:  %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::recvfrom_timeout:  %d recvfrom failed %s", get_key(), bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::recvfrom_timeout:  %d recvfrom failed",  get_key());
  }
  return rv;
  return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG);
}
int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){
  int rv = shmModSocket.recvfrom_nowait(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::recvfrom:  %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::recvfrom_nowait:  %d recvfrom failed %s", get_key(), bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::recvfrom_nowait:  %d recvfrom failed",  get_key());
  }
  return rv;
  return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG);
}
/**
@@ -573,10 +506,10 @@
// 超时返回。 @sec 秒 , @nsec 纳秒
int NetModSocket::sendandrecv_timeout( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  return shmModSocket.sendandrecv_timeout(send_buf, send_size, key, recv_buf, recv_size, &timeout);
  return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size, &timeout, BUS_TIMEOUT_FLAG);
}
int NetModSocket::sendandrecv_nowait( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) {
  return shmModSocket.sendandrecv_nowait(send_buf, send_size, key, recv_buf, recv_size);
  return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size, NULL, BUS_NOWAIT_FLAG);
}
@@ -592,10 +525,10 @@
// 超时返回。 @sec 秒 , @nsec 纳秒
int  NetModSocket::sub_timeout( void *topic, int size, int key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  return shmModSocket.sub_timeout((char *)topic,  size,  key, &timeout);
  return shmModSocket.sub((char *)topic,  size,  key, &timeout, BUS_TIMEOUT_FLAG);
}
int  NetModSocket::sub_nowait( void *topic, int size, int key){
  return shmModSocket.sub_nowait((char *)topic,  size,  key);
  return shmModSocket.sub((char *)topic,  size,  key, NULL, BUS_NOWAIT_FLAG);
}
@@ -612,10 +545,10 @@
// 超时返回。 @sec 秒 , @nsec 纳秒
int  NetModSocket::desub_timeout( void *topic, int size, int key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  return shmModSocket.desub_timeout((char *)topic,  size,  key, &timeout);
  return shmModSocket.desub((char *)topic,  size,  key, &timeout, BUS_TIMEOUT_FLAG);
}
int  NetModSocket::desub_nowait( void *topic, int size, int key){
  return shmModSocket.desub_nowait((char *)topic,  size,  key);
  return shmModSocket.desub((char *)topic,  size,  key, NULL, BUS_NOWAIT_FLAG);
}
@@ -632,10 +565,10 @@
//  超时返回。 @sec 秒 , @nsec 纳秒
int  NetModSocket::pub_timeout( char *topic, int topic_size, void *content, int content_size, int key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  return shmModSocket.pub_timeout(topic, topic_size, content, content_size, key, &timeout);
  return shmModSocket.pub(topic, topic_size, content, content_size, key, &timeout, BUS_TIMEOUT_FLAG);
}
int  NetModSocket::pub_nowait( char *topic, int topic_size, void *content, int content_size, int key){
  return shmModSocket.pub_nowait(topic, topic_size, content, content_size, key);
  return shmModSocket.pub(topic, topic_size, content, content_size, key, NULL, BUS_NOWAIT_FLAG);
}