wangzhengquan
2020-12-24 8284df1d749fa7adb334fe4f43da77bfc9c05a71
src/socket/net_mod_socket.c
@@ -85,16 +85,16 @@
  int i, n, recv_size, connfd;
  net_node_t *node;
  void *recv_buf;
  void *recv_buf = NULL;
  struct timespec timeout;
  int ret;
  int n_req = 0, n_recv_suc = 0, n_resp =0;
  
  net_mod_request_head_t request_head = {};
  int n_req = 0, n_recv_suc = 0, n_resp =0;
   
  net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
  int ret;
  NetConnPool *mpool;
  /* Make first caller allocate key for thread-specific data */
@@ -107,8 +107,8 @@
  mpool = (NetConnPool *)pthread_getspecific(poolKey);
  if (mpool == NULL)
  {
    /* If first call from this thread, allocate
                                   buffer for thread, and save its location */
    /* If first call from this thread, allocate buffer for thread, and save its location */
    logger->debug("Create connPool");
    mpool = new NetConnPool();
    if (mpool == NULL) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ malloc");
@@ -129,15 +129,27 @@
  for (i = 0; i< arrlen; i++) {
    node = &node_arr[i];
    if(node->host == NULL) {
    if(node->host == NULL || strcmp(node->host, "") == 0 ) {
      // 本地发送
      shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
      strcpy( ret_arr[n_recv_suc].host,"localshm");
      ret_arr[n_recv_suc].port = 0;
      ret_arr[n_recv_suc].key = node->key;
      ret_arr[n_recv_suc].content = recv_buf;
      ret_arr[n_recv_suc].content_length = recv_size;
      n_recv_suc++;
      if(msec == 0) {
        ret = shmModSocket.sendandrecv_nowait(send_buf, send_size, node->key, &recv_buf, &recv_size);
      } else if(msec > 0){
        timeout.tv_sec = msec / 1000;
        timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
        ret = shmModSocket.sendandrecv_timeout(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout);
      } else {
        ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
      }
      if( ret == 0) {
        strcpy( ret_arr[n_recv_suc].host,"");
        ret_arr[n_recv_suc].port = 0;
        ret_arr[n_recv_suc].key = node->key;
        ret_arr[n_recv_suc].content = recv_buf;
        ret_arr[n_recv_suc].content_length = recv_size;
        n_recv_suc++;
      }
      continue;
    }
@@ -227,13 +239,29 @@
  mpool->maxi = -1;
  *recv_arr = ret_arr;
  if(recv_arr != NULL) {
    *recv_arr = ret_arr;
  } else {
    free_recv_msg_arr(ret_arr, n_recv_suc);
  }
  if(recv_arr_size != NULL) {
    *recv_arr_size = n_recv_suc;
  }
  return n_recv_suc;
     
}
void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) {
  for(int i =0; i< size; i++) {
    if(arr[i].content != NULL)
      free(arr[i].content);
  }
  free(arr);
}
int NetModSocket::pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) {
  return _pub_(node_arr, arrlen, topic, topic_size, content,   content_size, -1);
@@ -251,9 +279,10 @@
// int  pub(char *topic, int topic_size, void *content, int content_size, int port);
int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content,
 int content_size, int  timeout) {
 int content_size, int  msec) {
  int i, connfd;
  net_node_t *node;
  struct timespec timeout;
 
  net_mod_request_head_t request_head;
  net_mod_recv_msg_t recv_msg;
@@ -287,15 +316,41 @@
    }
  }
  // 本地发送
  if(node_arr == NULL || arrlen == 0) {
    if(msec == 0) {
      ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key);
    } else if(msec > 0) {
      timeout.tv_sec = msec / 1000;
      timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
      ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout);
    } else {
      ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key);
    }
    if(ret == 0 ) {
      n_pub_suc++;
    }
  }
  for (i = 0; i < arrlen; i++) {
    node = &node_arr[i];
    if(node->host == NULL) {
      // 本地发送
      if(shmModSocket.pub(topic, topic_size, content, content_size, node->key) == 0 ) {
         n_pub_suc++;
      if(msec == 0) {
        ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key);
      } else if(msec > 0) {
        timeout.tv_sec = msec / 1000;
        timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
        ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout);
      } else {
        ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key);
      }
      if(ret == 0 ) {
        n_pub_suc++;
      }
     
    } else {
      sprintf(portstr, "%d", node->port);
@@ -307,7 +362,7 @@
      request_head.key = node->key;
      request_head.content_length = content_size;
      request_head.topic_length = strlen(topic) + 1;
      request_head.timeout = timeout;
      request_head.timeout = msec;
      if(write_request(connfd, request_head, content, content_size, topic, request_head.topic_length) != 0) {
        LoggerFactory::getLogger()->error(" NetModSocket::_pub_ write_request failture %s:%d\n", node->host, node->port);
@@ -322,7 +377,7 @@
  while(n_resp < n_req)
  {
    /* Wait for listening/connected descriptor(s) to become ready */
    if( (mpool->nready = poll(mpool->conns, mpool->maxi + 1, timeout) ) <= 0) {
    if( (mpool->nready = poll(mpool->conns, mpool->maxi + 1, msec) ) <= 0) {
       // wirite_set 和 read_set 在指定时间内都没准备好
      break;
    }
@@ -560,15 +615,6 @@
/**
 * 启动bus
 *
 * @return 0 成功, 其他值 失败的错误码
*/
int  NetModSocket::start_bus() {
  return shmModSocket.start_bus();
}
/**
 * 订阅指定主题
 * @topic 主题
 * @size 主题长度
@@ -634,14 +680,6 @@
  return shmModSocket.get_key();
}
void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) {
  for(int i =0; i< size; i++) {
    free(arr[i].content);
  }
  free(arr);
}