wangzhengquan
2021-01-14 543e82effca8836d0730ed69fcf46fbc09b6e512
src/socket/net_mod_socket.cpp
@@ -67,15 +67,15 @@
int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
  _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1);
  return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1);
}
int NetModSocket::sendandrecv_timeout(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  msec) {
  _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, msec);
  return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, msec);
}
int NetModSocket::sendandrecv_nowait(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
   _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, 0);
  return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, 0);
}
@@ -137,24 +137,11 @@
}
NetConnPool* NetModSocket::_get_pool() {
  return gpool;
  return _get_threadlocal_pool();
}
int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  msec ) {
  int s, rv;
  if ((s = pthread_mutex_lock(&sendMutex)) != 0)
    err_exit(s, "NetModSocket : pthread_mutex_lock");
  rv = _sendandrecv_unsafe(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size,   msec );
  if ((s = pthread_mutex_unlock(&sendMutex)) != 0)
    err_exit(s, "NetModSocket : pthread_mutex_lock");
  return rv;
}
int NetModSocket::_sendandrecv_unsafe(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  msec ) {
  int i, n, recv_size, connfd;
@@ -340,13 +327,13 @@
  // 本地发送
  if(node_arr == NULL || arrlen == 0) {
    if(msec == 0) {
      ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key);
      ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
    } else if(msec > 0) {
      timeout.tv_sec = msec / 1000;
      timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
      ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout);
      ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
    } else {
      ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key);
      ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
    }
    if(ret == 0 ) {
      n_pub_suc++;
@@ -359,13 +346,13 @@
    if(node->host == NULL) {
      // 本地发送
      if(msec == 0) {
        ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key);
        ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
      } else if(msec > 0) {
        timeout.tv_sec = msec / 1000;
        timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
        ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout);
        ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
      } else {
        ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key);
        ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
      }
      if(ret == 0 ) {