From 8284df1d749fa7adb334fe4f43da77bfc9c05a71 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期四, 24 十二月 2020 11:35:02 +0800 Subject: [PATCH] add error message method --- src/socket/net_mod_socket.c | 95 ++++++++++++++++++++++++++++++++++------------- 1 files changed, 68 insertions(+), 27 deletions(-) diff --git a/src/socket/net_mod_socket.c b/src/socket/net_mod_socket.c index 1e1fc27..fb5003e 100644 --- a/src/socket/net_mod_socket.c +++ b/src/socket/net_mod_socket.c @@ -85,16 +85,16 @@ int i, n, recv_size, connfd; net_node_t *node; - void *recv_buf; + void *recv_buf = NULL; + struct timespec timeout; + int ret; + int n_req = 0, n_recv_suc = 0, n_resp =0; net_mod_request_head_t request_head = {}; - - int n_req = 0, n_recv_suc = 0, n_resp =0; - net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t)); - int ret; + NetConnPool *mpool; /* Make first caller allocate key for thread-specific data */ @@ -131,13 +131,25 @@ node = &node_arr[i]; if(node->host == NULL || strcmp(node->host, "") == 0 ) { // 鏈湴鍙戦�� - shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size); - strcpy( ret_arr[n_recv_suc].host,""); - ret_arr[n_recv_suc].port = 0; - ret_arr[n_recv_suc].key = node->key; - ret_arr[n_recv_suc].content = recv_buf; - ret_arr[n_recv_suc].content_length = recv_size; - n_recv_suc++; + + if(msec == 0) { + ret = shmModSocket.sendandrecv_nowait(send_buf, send_size, node->key, &recv_buf, &recv_size); + } else if(msec > 0){ + timeout.tv_sec = msec / 1000; + timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; + ret = shmModSocket.sendandrecv_timeout(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout); + } else { + ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size); + } + if( ret == 0) { + strcpy( ret_arr[n_recv_suc].host,""); + ret_arr[n_recv_suc].port = 0; + ret_arr[n_recv_suc].key = node->key; + ret_arr[n_recv_suc].content = recv_buf; + ret_arr[n_recv_suc].content_length = recv_size; + n_recv_suc++; + } + continue; } @@ -227,13 +239,29 @@ mpool->maxi = -1; - *recv_arr = ret_arr; + if(recv_arr != NULL) { + *recv_arr = ret_arr; + } else { + free_recv_msg_arr(ret_arr, n_recv_suc); + } + if(recv_arr_size != NULL) { *recv_arr_size = n_recv_suc; } return n_recv_suc; } + + +void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) { + + for(int i =0; i< size; i++) { + if(arr[i].content != NULL) + free(arr[i].content); + } + free(arr); +} + int NetModSocket::pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) { return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, -1); @@ -251,9 +279,10 @@ // int pub(char *topic, int topic_size, void *content, int content_size, int port); int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, - int content_size, int timeout) { + int content_size, int msec) { int i, connfd; net_node_t *node; + struct timespec timeout; net_mod_request_head_t request_head; net_mod_recv_msg_t recv_msg; @@ -289,7 +318,16 @@ // 鏈湴鍙戦�� if(node_arr == NULL || arrlen == 0) { - if(shmModSocket.pub(topic, topic_size, content, content_size, node->key) == 0 ) { + if(msec == 0) { + ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key); + } else if(msec > 0) { + timeout.tv_sec = msec / 1000; + timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; + ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout); + } else { + ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key); + } + if(ret == 0 ) { n_pub_suc++; } } @@ -299,9 +337,20 @@ node = &node_arr[i]; if(node->host == NULL) { // 鏈湴鍙戦�� - if(shmModSocket.pub(topic, topic_size, content, content_size, node->key) == 0 ) { - n_pub_suc++; + if(msec == 0) { + ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key); + } else if(msec > 0) { + timeout.tv_sec = msec / 1000; + timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; + ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout); + } else { + ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key); } + + if(ret == 0 ) { + n_pub_suc++; + } + } else { sprintf(portstr, "%d", node->port); @@ -313,7 +362,7 @@ request_head.key = node->key; request_head.content_length = content_size; request_head.topic_length = strlen(topic) + 1; - request_head.timeout = timeout; + request_head.timeout = msec; if(write_request(connfd, request_head, content, content_size, topic, request_head.topic_length) != 0) { LoggerFactory::getLogger()->error(" NetModSocket::_pub_ write_request failture %s:%d\n", node->host, node->port); @@ -328,7 +377,7 @@ while(n_resp < n_req) { /* Wait for listening/connected descriptor(s) to become ready */ - if( (mpool->nready = poll(mpool->conns, mpool->maxi + 1, timeout) ) <= 0) { + if( (mpool->nready = poll(mpool->conns, mpool->maxi + 1, msec) ) <= 0) { // wirite_set 鍜� read_set 鍦ㄦ寚瀹氭椂闂村唴閮芥病鍑嗗濂� break; } @@ -631,14 +680,6 @@ return shmModSocket.get_key(); } - -void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) { - - for(int i =0; i< size; i++) { - free(arr[i].content); - } - free(arr); -} -- Gitblit v1.8.0