From 8284df1d749fa7adb334fe4f43da77bfc9c05a71 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 24 十二月 2020 11:35:02 +0800
Subject: [PATCH] add error message method

---
 src/socket/net_mod_socket.c |  101 +++++++++++++++++++++++++++++++++++++-------------
 1 files changed, 74 insertions(+), 27 deletions(-)

diff --git a/src/socket/net_mod_socket.c b/src/socket/net_mod_socket.c
index 675c80b..fb5003e 100644
--- a/src/socket/net_mod_socket.c
+++ b/src/socket/net_mod_socket.c
@@ -85,16 +85,16 @@
 
   int i, n, recv_size, connfd;
   net_node_t *node;
-  void *recv_buf;
+  void *recv_buf = NULL;
+  struct timespec timeout;
+  int ret;
+  int n_req = 0, n_recv_suc = 0, n_resp =0;
   
   net_mod_request_head_t request_head = {};
- 
-  int n_req = 0, n_recv_suc = 0, n_resp =0;
-
    
   net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
 
-  int ret;
+ 
   NetConnPool *mpool;
 
   /* Make first caller allocate key for thread-specific data */
@@ -131,13 +131,25 @@
     node = &node_arr[i];
     if(node->host == NULL || strcmp(node->host, "") == 0 ) {
       // 鏈湴鍙戦��
-      shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
-      strcpy( ret_arr[n_recv_suc].host,"");
-      ret_arr[n_recv_suc].port = 0;
-      ret_arr[n_recv_suc].key = node->key;
-      ret_arr[n_recv_suc].content = recv_buf;
-      ret_arr[n_recv_suc].content_length = recv_size;
-      n_recv_suc++;
+     
+      if(msec == 0) {
+        ret = shmModSocket.sendandrecv_nowait(send_buf, send_size, node->key, &recv_buf, &recv_size);
+      } else if(msec > 0){
+        timeout.tv_sec = msec / 1000;
+        timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
+        ret = shmModSocket.sendandrecv_timeout(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout);
+      } else {
+        ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
+      }
+      if( ret == 0) {
+        strcpy( ret_arr[n_recv_suc].host,"");
+        ret_arr[n_recv_suc].port = 0;
+        ret_arr[n_recv_suc].key = node->key;
+        ret_arr[n_recv_suc].content = recv_buf;
+        ret_arr[n_recv_suc].content_length = recv_size;
+        n_recv_suc++;
+      }
+     
       continue;
     }
 
@@ -227,13 +239,29 @@
 
   mpool->maxi = -1;
 
-  *recv_arr = ret_arr;
+  if(recv_arr != NULL) {
+    *recv_arr = ret_arr;
+  } else {
+    free_recv_msg_arr(ret_arr, n_recv_suc);
+  }
+  
   if(recv_arr_size != NULL) {
     *recv_arr_size = n_recv_suc;
   }
   return n_recv_suc;
      
 }
+
+
+void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) {
+
+  for(int i =0; i< size; i++) {
+    if(arr[i].content != NULL)
+      free(arr[i].content);
+  }
+  free(arr);
+}
+
 
 int NetModSocket::pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) {
   return _pub_(node_arr, arrlen, topic, topic_size, content,   content_size, -1);
@@ -251,9 +279,10 @@
 // int  pub(char *topic, int topic_size, void *content, int content_size, int port);
 
 int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content,
- int content_size, int  timeout) {
+ int content_size, int  msec) {
   int i, connfd;
   net_node_t *node;
+  struct timespec timeout;
  
   net_mod_request_head_t request_head;
   net_mod_recv_msg_t recv_msg;
@@ -287,15 +316,41 @@
     }
   }
 
-  
+  // 鏈湴鍙戦��
+  if(node_arr == NULL || arrlen == 0) {
+    if(msec == 0) {
+      ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key);
+    } else if(msec > 0) {
+      timeout.tv_sec = msec / 1000;
+      timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
+      ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout);
+    } else {
+      ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key);
+    }
+    if(ret == 0 ) {
+      n_pub_suc++;
+    }
+  }
+
   for (i = 0; i < arrlen; i++) {
 
     node = &node_arr[i];
     if(node->host == NULL) {
       // 鏈湴鍙戦��
-      if(shmModSocket.pub(topic, topic_size, content, content_size, node->key) == 0 ) {
-         n_pub_suc++;
+      if(msec == 0) {
+        ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key);
+      } else if(msec > 0) {
+        timeout.tv_sec = msec / 1000;
+        timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
+        ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout);
+      } else {
+        ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key);
       }
+
+      if(ret == 0 ) {
+        n_pub_suc++;
+      }
+      
      
     } else {
       sprintf(portstr, "%d", node->port);
@@ -307,7 +362,7 @@
       request_head.key = node->key;
       request_head.content_length = content_size;
       request_head.topic_length = strlen(topic) + 1;
-      request_head.timeout = timeout;
+      request_head.timeout = msec;
 
       if(write_request(connfd, request_head, content, content_size, topic, request_head.topic_length) != 0) {
         LoggerFactory::getLogger()->error(" NetModSocket::_pub_ write_request failture %s:%d\n", node->host, node->port);
@@ -322,7 +377,7 @@
   while(n_resp < n_req)
   {
     /* Wait for listening/connected descriptor(s) to become ready */
-    if( (mpool->nready = poll(mpool->conns, mpool->maxi + 1, timeout) ) <= 0) {
+    if( (mpool->nready = poll(mpool->conns, mpool->maxi + 1, msec) ) <= 0) {
        // wirite_set 鍜� read_set 鍦ㄦ寚瀹氭椂闂村唴閮芥病鍑嗗濂�
       break;
     }
@@ -625,14 +680,6 @@
   return shmModSocket.get_key();
 }
 
-
-void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) {
-
-  for(int i =0; i< size; i++) {
-    free(arr[i].content);
-  }
-  free(arr);
-}
 
 
 

--
Gitblit v1.8.0