From 42d41eafe863d5286251eb49c908074a7e015f37 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期一, 01 三月 2021 18:08:56 +0800 Subject: [PATCH] update --- src/net/net_mod_socket.cpp | 109 +++++++++++++++++++++++++++++++++++------------------- 1 files changed, 70 insertions(+), 39 deletions(-) diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp index 2932d8c..6a541d6 100644 --- a/src/net/net_mod_socket.cpp +++ b/src/net/net_mod_socket.cpp @@ -46,19 +46,19 @@ return shmModSocket.force_bind(key); } -int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { - return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1); -} -int NetModSocket::sendandrecv_timeout(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int msec) { - return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, msec); -} -int NetModSocket::sendandrecv_nowait(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { - return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, 0); +// int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, +// net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { +// return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1); +// } +// int NetModSocket::sendandrecv_timeout(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, +// net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int msec) { +// return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, msec); +// } +// int NetModSocket::sendandrecv_nowait(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, +// net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { +// return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, 0); -} +// } /* Free thread-specific data buffer */ @@ -124,21 +124,22 @@ -int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int msec ) { +int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, + net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, + net_mod_err_t ** _err_arr, int *_err_arr_size, int msec ) { int i, recv_size, connfd; net_node_t *node; void *recv_buf = NULL; struct timespec timeout; int ret; - int n_req = 0, n_recv_suc = 0, n_recv_err = 0, n_resp =0; + int n_req = 0, n_recv_suc = 0, n_err = 0, n_resp =0; net_mod_request_head_t request_head = {}; net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t)); - net_mod_recv_err_t err_arr[arrlen]; + net_mod_err_t *err_arr = (net_mod_err_t *)calloc(arrlen, sizeof(net_mod_err_t)); NetConnPool *mpool = _get_pool(); @@ -166,11 +167,11 @@ ret_arr[n_recv_suc].content_length = recv_size; n_recv_suc++; } else { - err_arr[n_recv_err].port = 0; - err_arr[n_recv_err].key = node->key; - err_arr[n_recv_err].code = ret; - n_recv_err++; - logger->error("NetModSocket:: _sendandrecv_ to key %d failed. %s", node->key, bus_strerror(ret)); + err_arr[n_err].port = 0; + err_arr[n_err].key = node->key; + err_arr[n_err].code = ret; + n_err++; + // logger->error("NetModSocket:: _sendandrecv_ to key %d failed. %s", node->key, bus_strerror(ret)); } @@ -178,11 +179,11 @@ } if( (connfd = mpool->getConn(node->host, node->port)) < 0 ) { - memcpy(err_arr[n_recv_err].host, node->host, sizeof(err_arr[n_recv_err].host)); - err_arr[n_recv_err].port = node->port; - err_arr[n_recv_err].key = node->key; - err_arr[n_recv_err].code = EBUS_NET; - n_recv_err++; + memcpy(err_arr[n_err].host, node->host, sizeof(err_arr[n_err].host)); + err_arr[n_err].port = node->port; + err_arr[n_err].key = node->key; + err_arr[n_err].code = EBUS_NET; + n_err++; continue; } @@ -197,11 +198,11 @@ // printf("write_request %s:%d\n", request_head.host, request_head.port); if(write_request(connfd, request_head, send_buf, send_size, NULL, 0) != 0) { LoggerFactory::getLogger()->error("write_request failture %s:%d\n", node->host, node->port); - memcpy(err_arr[n_recv_err].host, node->host, sizeof(err_arr[n_recv_err].host)); - err_arr[n_recv_err].port = node->port; - err_arr[n_recv_err].key = node->key; - err_arr[n_recv_err].code = EBUS_NET; - n_recv_err++; + memcpy(err_arr[n_err].host, node->host, sizeof(err_arr[n_err].host)); + err_arr[n_err].port = node->port; + err_arr[n_err].key = node->key; + err_arr[n_err].code = EBUS_NET; + n_err++; mpool->closeConn( connfd); } else { n_req++; @@ -227,7 +228,7 @@ { mpool->nready--; // printf("POLLIN %d\n", connfd); - if( (ret = read_response(connfd, ret_arr+n_recv_suc, err_arr + n_recv_err)) == 0) { + if( (ret = read_response(connfd, ret_arr+n_recv_suc, err_arr + n_err)) == 0) { n_recv_suc++; // 鎴愬姛鏀跺埌杩斿洖娑堟伅锛屾竻绌鸿鍏ヤ綅 mpool->conns[i].fd = -1; @@ -238,14 +239,14 @@ logger->error("NetModSocket::_sendandrecv_ read_response key = %d , %s", get_key(), bus_strerror(ret)); mpool->closeConn( connfd); - n_recv_err++; + n_err++; // mpool->conns[i].fd = -1; } else { // 浠g悊鏈嶅姟娌℃湁杞彂鎴愬姛 logger->error("NetModSocket::_sendandrecv_ read_response key = %d , %s", get_key(), bus_strerror(ret)); mpool->conns[i].fd = -1; - n_recv_err++; + n_err++; } n_resp++; @@ -277,14 +278,43 @@ mpool->maxi = -1; + if(recv_arr != NULL) { - *recv_arr = ret_arr; + + if(n_recv_suc > 0) { + *recv_arr = ret_arr; + + } else { + free_recv_msg_arr(ret_arr, n_recv_suc); + } + } else { free_recv_msg_arr(ret_arr, n_recv_suc); } - + if(recv_arr_size != NULL) { - *recv_arr_size = n_recv_suc; + *recv_arr_size = n_recv_suc; + } + + + if(_err_arr != NULL) { + + if(n_err > 0) { + *_err_arr = err_arr; + + + } else { + *_err_arr = NULL; + *_err_arr_size = 0; + free(err_arr); + } + + } else { + free(err_arr); + } + + if(_err_arr_size != NULL) { + *_err_arr_size = n_err; } return n_recv_suc; @@ -330,7 +360,7 @@ int ret; NetConnPool *mpool = _get_pool(); - net_mod_recv_err_t err_msg; + net_mod_err_t err_msg; // 鏈湴鍙戦�� if(node_arr == NULL || arrlen == 0) { @@ -636,7 +666,7 @@ * @return 0 鎴愬姛, EBUS_NET 缃戠粶閿欒锛� 鍏朵粬鍊� 浠g悊鏈嶅姟娌℃湁杞彂鎴愬姛銆� * */ -int NetModSocket::read_response(int connfd, net_mod_recv_msg_t *recv_msg, net_mod_recv_err_t *err_arr) { +int NetModSocket::read_response(int connfd, net_mod_recv_msg_t *recv_msg, net_mod_err_t *err_arr) { int recv_size; void *recv_buf; char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH]; @@ -667,6 +697,7 @@ LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc recv_buf"); exit(1); } + if ( (recv_size = rio_readn(connfd, recv_buf, response_head.content_length) ) != response_head.content_length) { memcpy(err_arr->host, response_head.host, sizeof(err_arr->host)); -- Gitblit v1.8.0