wangzhengquan
2021-02-04 83956b12d863924936a98c9dfbece37feb0cce9c
src/net/net_mod_socket.cpp
@@ -17,32 +17,32 @@
{
  int s;
  if (Signal(SIGPIPE, SIG_IGN) == SIG_ERR)
      logger->error(errno, "NetModSocket::NetModSocket signal");
    logger->error(errno, "NetModSocket::NetModSocket signal");
  gpool = new NetConnPool();
  // gpool = new NetConnPool();
  pthread_mutexattr_t mtxAttr;
  s = pthread_mutexattr_init(&mtxAttr);
  if (s != 0)
    err_exit(s, "pthread_mutexattr_init");
  s = pthread_mutexattr_settype(&mtxAttr, PTHREAD_MUTEX_ERRORCHECK);
  if (s != 0)
    err_exit(s, "pthread_mutexattr_settype");
  s = pthread_mutex_init(&sendMutex, &mtxAttr);
  if (s != 0)
    err_exit(s, "pthread_mutex_init");
  // pthread_mutexattr_t mtxAttr;
  // s = pthread_mutexattr_init(&mtxAttr);
  // if (s != 0)
  //   err_exit(s, "pthread_mutexattr_init");
  // s = pthread_mutexattr_settype(&mtxAttr, PTHREAD_MUTEX_ERRORCHECK);
  // if (s != 0)
  //   err_exit(s, "pthread_mutexattr_settype");
  // s = pthread_mutex_init(&sendMutex, &mtxAttr);
  // if (s != 0)
  //   err_exit(s, "pthread_mutex_init");
  s = pthread_mutexattr_destroy(&mtxAttr);
  if (s != 0)
    err_exit(s, "pthread_mutexattr_destroy");
  // s = pthread_mutexattr_destroy(&mtxAttr);
  // if (s != 0)
  //   err_exit(s, "pthread_mutexattr_destroy");
}
NetModSocket::~NetModSocket() {
  int s;
  delete gpool;
  s =  pthread_mutex_destroy(&sendMutex);
  // delete gpool;
  // s =  pthread_mutex_destroy(&sendMutex);
  if(s != 0) {
    err_exit(s, "shm_close_socket");
  }
@@ -141,6 +141,8 @@
}
int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  msec ) {
@@ -165,11 +167,11 @@
      // 本地发送
     
      if(msec == 0) {
        ret = shmModSocket.sendandrecv_nowait(send_buf, send_size, node->key, &recv_buf, &recv_size);
        ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size, NULL, BUS_NOWAIT_FLAG);
      } else if(msec > 0){
        timeout.tv_sec = msec / 1000;
        timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
        ret = shmModSocket.sendandrecv_timeout(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout);
        ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout, BUS_TIMEOUT_FLAG);
      } else {
        ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
      }
@@ -334,11 +336,11 @@
  // 本地发送
  if(node_arr == NULL || arrlen == 0) {
    if(msec == 0) {
      ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
      ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY,  NULL, BUS_NOWAIT_FLAG);
    } else if(msec > 0) {
      timeout.tv_sec = msec / 1000;
      timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
      ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
      ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG);
    } else {
      ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
    }
@@ -354,11 +356,11 @@
    if(node->host == NULL) {
      // 本地发送
      if(msec == 0) {
        ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
        ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG);
      } else if(msec > 0) {
        timeout.tv_sec = msec / 1000;
        timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
        ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
        ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG);
      } else {
        ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
      }
@@ -457,55 +459,20 @@
 * @return 0 成功, 其他值 失败的错误码
 */
int NetModSocket::sendto(const void *buf, const int size, const int key){
  int rv = shmModSocket.sendto(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::sendto: %d sendto %d success.\n", get_key(), key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::sendto: %d sendto  %d failed %s", get_key(), key, bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::sendto : %d sendto  %d failed", get_key(), key);
  }
  return rv;
  return shmModSocket.sendto(buf, size, key);
}
// 发送信息超时返回。 @sec 秒 , @nsec 纳秒
int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  int rv = shmModSocket.sendto_timeout(buf, size, key, &timeout);
  if(rv == 0) {
    logger->debug("NetModSocket::sendto_timeout: %d sendto %d success.\n", get_key(), key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::sendto_timeout : %d sendto  %d failed %s", get_key(),  key, bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::sendto_timeout:  %d sendto  %d failed", get_key(),  key);
  }
  return rv;
  return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG);
}
// 发送信息立刻返回。
int NetModSocket::sendto_nowait(const void *buf, const int size, const int key){
  int rv = shmModSocket.sendto_nowait(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::sendto_nowait: %d sendto %d success.\n", get_key(), key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::sendto_nowait %d sendto  %d failed %s", get_key(), key, bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::sendto_nowait %d sendto  %d failed", get_key(), key);
  }
  return rv;
  return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG);
}
/**
@@ -515,55 +482,28 @@
*/
int NetModSocket::recvfrom(void **buf, int *size, int *key) {
  logger->debug(" %d NetModSocket::recvfrom before", get_key());
  int rv = shmModSocket.recvfrom(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::recvfrom: <<<< %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    logger->debug("NetModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::recvfrom: socket %d recvfrom failed",  get_key());
  }
  return rv;
  return shmModSocket.recvfrom(buf, size, key);
}
// 接受信息超时返回。 @sec 秒 , @nsec 纳秒
int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  int rv = shmModSocket.recvfrom_timeout(buf, size, key, &timeout);
  if(rv == 0) {
    logger->debug("NetModSocket::recvfrom_timeout:  %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::recvfrom_timeout:  %d recvfrom failed %s", get_key(), bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::recvfrom_timeout:  %d recvfrom failed",  get_key());
  }
  return rv;
  return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG);
}
int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){
  int rv = shmModSocket.recvfrom_nowait(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::recvfrom_nowait:  %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::recvfrom_nowait:  %d recvfrom failed %s", get_key(), bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::recvfrom_nowait:  %d recvfrom failed",  get_key());
  }
  return rv;
  return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG);
}
int NetModSocket::recvandsend(recvandsend_callback_fn callback,
                              const struct timespec *timeout , int flag, void * user_data ) {
  return shmModSocket.recvandsend(callback, timeout, flag, user_data);
}
/**
 * 发送请求信息并等待接收应答
@@ -576,10 +516,10 @@
// 超时返回。 @sec 秒 , @nsec 纳秒
int NetModSocket::sendandrecv_timeout( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  return shmModSocket.sendandrecv_timeout(send_buf, send_size, key, recv_buf, recv_size, &timeout);
  return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size, &timeout, BUS_TIMEOUT_FLAG);
}
int NetModSocket::sendandrecv_nowait( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) {
  return shmModSocket.sendandrecv_nowait(send_buf, send_size, key, recv_buf, recv_size);
  return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size, NULL, BUS_NOWAIT_FLAG);
}
@@ -595,10 +535,10 @@
// 超时返回。 @sec 秒 , @nsec 纳秒
int  NetModSocket::sub_timeout( void *topic, int size, int key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  return shmModSocket.sub_timeout((char *)topic,  size,  key, &timeout);
  return shmModSocket.sub((char *)topic,  size,  key, &timeout, BUS_TIMEOUT_FLAG);
}
int  NetModSocket::sub_nowait( void *topic, int size, int key){
  return shmModSocket.sub_nowait((char *)topic,  size,  key);
  return shmModSocket.sub((char *)topic,  size,  key, NULL, BUS_NOWAIT_FLAG);
}
@@ -615,10 +555,10 @@
// 超时返回。 @sec 秒 , @nsec 纳秒
int  NetModSocket::desub_timeout( void *topic, int size, int key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  return shmModSocket.desub_timeout((char *)topic,  size,  key, &timeout);
  return shmModSocket.desub((char *)topic,  size,  key, &timeout, BUS_TIMEOUT_FLAG);
}
int  NetModSocket::desub_nowait( void *topic, int size, int key){
  return shmModSocket.desub_nowait((char *)topic,  size,  key);
  return shmModSocket.desub((char *)topic,  size,  key, NULL, BUS_NOWAIT_FLAG);
}
@@ -635,10 +575,10 @@
//  超时返回。 @sec 秒 , @nsec 纳秒
int  NetModSocket::pub_timeout( char *topic, int topic_size, void *content, int content_size, int key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  return shmModSocket.pub_timeout(topic, topic_size, content, content_size, key, &timeout);
  return shmModSocket.pub(topic, topic_size, content, content_size, key, &timeout, BUS_TIMEOUT_FLAG);
}
int  NetModSocket::pub_nowait( char *topic, int topic_size, void *content, int content_size, int key){
  return shmModSocket.pub_nowait(topic, topic_size, content, content_size, key);
  return shmModSocket.pub(topic, topic_size, content, content_size, key, NULL, BUS_NOWAIT_FLAG);
}