wangzhengquan
2021-01-14 124011574583b980c1794a1de4e61b39f73f1f11
src/socket/net_mod_socket.cpp
@@ -1,7 +1,7 @@
#include "net_mod_socket.h"
#include "socket_io.h"
#include "net_mod_socket_io.h"
#include "net_conn_pool.h"
#include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>
#include <pthread.h>
@@ -15,13 +15,37 @@
NetModSocket::NetModSocket() 
{
  int s;
  if (Signal(SIGPIPE, SIG_IGN) == SIG_ERR)
      logger->error(errno, "NetModSocket::NetModSocket signal");
  gpool = new NetConnPool();
  pthread_mutexattr_t mtxAttr;
  s = pthread_mutexattr_init(&mtxAttr);
  if (s != 0)
    err_exit(s, "pthread_mutexattr_init");
  s = pthread_mutexattr_settype(&mtxAttr, PTHREAD_MUTEX_ERRORCHECK);
  if (s != 0)
    err_exit(s, "pthread_mutexattr_settype");
  s = pthread_mutex_init(&sendMutex, &mtxAttr);
  if (s != 0)
    err_exit(s, "pthread_mutex_init");
  s = pthread_mutexattr_destroy(&mtxAttr);
  if (s != 0)
    err_exit(s, "pthread_mutexattr_destroy");
}
NetModSocket::~NetModSocket() {
  int s;
  delete gpool;
  s =  pthread_mutex_destroy(&sendMutex);
  if(s != 0) {
    err_exit(s, "shm_close_socket");
  }
}
@@ -43,15 +67,15 @@
int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
  _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1);
  return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1);
}
int NetModSocket::sendandrecv_timeout(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  msec) {
  _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, msec);
  return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, msec);
}
int NetModSocket::sendandrecv_nowait(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
   _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, 0);
  return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, 0);
}
@@ -80,23 +104,10 @@
  }
}
int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  msec ) {
NetConnPool* NetModSocket::_get_threadlocal_pool() {
  int i, n, recv_size, connfd;
  net_node_t *node;
  void *recv_buf = NULL;
  struct timespec timeout;
  int ret;
  int n_req = 0, n_recv_suc = 0, n_resp =0;
  net_mod_request_head_t request_head = {};
  net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
  NetConnPool *mpool;
  /* Make first caller allocate key for thread-specific data */
  ret = pthread_once(&once, _createConnPoolKey_);
  if (ret != 0) {
@@ -115,16 +126,37 @@
      exit(1);
    }
    ret = pthread_setspecific(poolKey, mpool);
    if (ret != 0) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ pthread_setspecific");
      exit(1);
    }
  }
  return mpool;
}
NetConnPool* NetModSocket::_get_pool() {
  return _get_threadlocal_pool();
}
int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  msec ) {
  int i, n, recv_size, connfd;
  net_node_t *node;
  void *recv_buf = NULL;
  struct timespec timeout;
  int ret;
  int n_req = 0, n_recv_suc = 0, n_resp =0;
  
  net_mod_request_head_t request_head = {};
  net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
 
  NetConnPool *mpool = _get_pool();
  for (i = 0; i< arrlen; i++) {
@@ -142,7 +174,7 @@
        ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
      }
      if( ret == 0) {
        strcpy( ret_arr[n_recv_suc].host,"");
        strcpy( ret_arr[n_recv_suc].host, "");
        ret_arr[n_recv_suc].port = 0;
        ret_arr[n_recv_suc].key = node->key;
        ret_arr[n_recv_suc].content = recv_buf;
@@ -290,42 +322,18 @@
  int n_req = 0, n_pub_suc = 0, n_resp = 0;
  int ret;
  NetConnPool *mpool;
  /* Make first caller allocate key for thread-specific data */
  ret = pthread_once(&once, _createConnPoolKey_);
  if (ret != 0) {
    LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ pthread_once");
    exit(1);
  }
  mpool = (NetConnPool *)pthread_getspecific(poolKey);
  if (mpool == NULL)
  {
    /* If first call from this thread, allocte buffer for thread, and save its location */
    mpool = new NetConnPool();
    if (mpool == NULL) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ malloc");
      exit(1);
    }
    ret = pthread_setspecific(poolKey, mpool);
    if (ret != 0) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ pthread_setspecific");
      exit(1);
    }
  }
  NetConnPool *mpool = _get_pool();
  // 本地发送
  if(node_arr == NULL || arrlen == 0) {
    if(msec == 0) {
      ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key);
      ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
    } else if(msec > 0) {
      timeout.tv_sec = msec / 1000;
      timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
      ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout);
      ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
    } else {
      ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key);
      ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
    }
    if(ret == 0 ) {
      n_pub_suc++;
@@ -338,13 +346,13 @@
    if(node->host == NULL) {
      // 本地发送
      if(msec == 0) {
        ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key);
        ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
      } else if(msec > 0) {
        timeout.tv_sec = msec / 1000;
        timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
        ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout);
        ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
      } else {
        ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key);
        ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
      }
      if(ret == 0 ) {