From 42d41eafe863d5286251eb49c908074a7e015f37 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期一, 01 三月 2021 18:08:56 +0800 Subject: [PATCH] update --- src/net/net_mod_socket_wrapper.cpp | 15 + src/net/net_mod_socket_wrapper.h | 53 +++-- test_net_socket/shm_util.cpp | 204 ++++++++++++++++--------- src/net/net_mod_socket.h | 37 +--- src/CMakeLists.txt | 1 CMakeLists.txt | 2 src/net/net_mod_socket.cpp | 109 ++++++++---- 7 files changed, 248 insertions(+), 173 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 0b653e7..1ad483f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -29,6 +29,6 @@ add_subdirectory(${PROJECT_SOURCE_DIR}/src) add_subdirectory(${PROJECT_SOURCE_DIR}/test) add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket) - add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket) +# add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket) # add_subdirectory(${PROJECT_SOURCE_DIR}/shm_util) endif() diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 51184f4..407ac67 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -53,6 +53,7 @@ target_link_libraries(shm_queue PUBLIC ${EXTRA_LIBS} ) +# generate md5 if (BUILD_SHARED_LIBS) add_custom_command( OUTPUT ${PROJECT_BINARY_DIR}/lib/libshm_queue.so.md5 diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp index 2932d8c..6a541d6 100644 --- a/src/net/net_mod_socket.cpp +++ b/src/net/net_mod_socket.cpp @@ -46,19 +46,19 @@ return shmModSocket.force_bind(key); } -int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { - return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1); -} -int NetModSocket::sendandrecv_timeout(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int msec) { - return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, msec); -} -int NetModSocket::sendandrecv_nowait(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { - return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, 0); +// int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, +// net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { +// return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1); +// } +// int NetModSocket::sendandrecv_timeout(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, +// net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int msec) { +// return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, msec); +// } +// int NetModSocket::sendandrecv_nowait(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, +// net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { +// return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, 0); -} +// } /* Free thread-specific data buffer */ @@ -124,21 +124,22 @@ -int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int msec ) { +int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, + net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, + net_mod_err_t ** _err_arr, int *_err_arr_size, int msec ) { int i, recv_size, connfd; net_node_t *node; void *recv_buf = NULL; struct timespec timeout; int ret; - int n_req = 0, n_recv_suc = 0, n_recv_err = 0, n_resp =0; + int n_req = 0, n_recv_suc = 0, n_err = 0, n_resp =0; net_mod_request_head_t request_head = {}; net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t)); - net_mod_recv_err_t err_arr[arrlen]; + net_mod_err_t *err_arr = (net_mod_err_t *)calloc(arrlen, sizeof(net_mod_err_t)); NetConnPool *mpool = _get_pool(); @@ -166,11 +167,11 @@ ret_arr[n_recv_suc].content_length = recv_size; n_recv_suc++; } else { - err_arr[n_recv_err].port = 0; - err_arr[n_recv_err].key = node->key; - err_arr[n_recv_err].code = ret; - n_recv_err++; - logger->error("NetModSocket:: _sendandrecv_ to key %d failed. %s", node->key, bus_strerror(ret)); + err_arr[n_err].port = 0; + err_arr[n_err].key = node->key; + err_arr[n_err].code = ret; + n_err++; + // logger->error("NetModSocket:: _sendandrecv_ to key %d failed. %s", node->key, bus_strerror(ret)); } @@ -178,11 +179,11 @@ } if( (connfd = mpool->getConn(node->host, node->port)) < 0 ) { - memcpy(err_arr[n_recv_err].host, node->host, sizeof(err_arr[n_recv_err].host)); - err_arr[n_recv_err].port = node->port; - err_arr[n_recv_err].key = node->key; - err_arr[n_recv_err].code = EBUS_NET; - n_recv_err++; + memcpy(err_arr[n_err].host, node->host, sizeof(err_arr[n_err].host)); + err_arr[n_err].port = node->port; + err_arr[n_err].key = node->key; + err_arr[n_err].code = EBUS_NET; + n_err++; continue; } @@ -197,11 +198,11 @@ // printf("write_request %s:%d\n", request_head.host, request_head.port); if(write_request(connfd, request_head, send_buf, send_size, NULL, 0) != 0) { LoggerFactory::getLogger()->error("write_request failture %s:%d\n", node->host, node->port); - memcpy(err_arr[n_recv_err].host, node->host, sizeof(err_arr[n_recv_err].host)); - err_arr[n_recv_err].port = node->port; - err_arr[n_recv_err].key = node->key; - err_arr[n_recv_err].code = EBUS_NET; - n_recv_err++; + memcpy(err_arr[n_err].host, node->host, sizeof(err_arr[n_err].host)); + err_arr[n_err].port = node->port; + err_arr[n_err].key = node->key; + err_arr[n_err].code = EBUS_NET; + n_err++; mpool->closeConn( connfd); } else { n_req++; @@ -227,7 +228,7 @@ { mpool->nready--; // printf("POLLIN %d\n", connfd); - if( (ret = read_response(connfd, ret_arr+n_recv_suc, err_arr + n_recv_err)) == 0) { + if( (ret = read_response(connfd, ret_arr+n_recv_suc, err_arr + n_err)) == 0) { n_recv_suc++; // 鎴愬姛鏀跺埌杩斿洖娑堟伅锛屾竻绌鸿鍏ヤ綅 mpool->conns[i].fd = -1; @@ -238,14 +239,14 @@ logger->error("NetModSocket::_sendandrecv_ read_response key = %d , %s", get_key(), bus_strerror(ret)); mpool->closeConn( connfd); - n_recv_err++; + n_err++; // mpool->conns[i].fd = -1; } else { // 浠g悊鏈嶅姟娌℃湁杞彂鎴愬姛 logger->error("NetModSocket::_sendandrecv_ read_response key = %d , %s", get_key(), bus_strerror(ret)); mpool->conns[i].fd = -1; - n_recv_err++; + n_err++; } n_resp++; @@ -277,14 +278,43 @@ mpool->maxi = -1; + if(recv_arr != NULL) { - *recv_arr = ret_arr; + + if(n_recv_suc > 0) { + *recv_arr = ret_arr; + + } else { + free_recv_msg_arr(ret_arr, n_recv_suc); + } + } else { free_recv_msg_arr(ret_arr, n_recv_suc); } - + if(recv_arr_size != NULL) { - *recv_arr_size = n_recv_suc; + *recv_arr_size = n_recv_suc; + } + + + if(_err_arr != NULL) { + + if(n_err > 0) { + *_err_arr = err_arr; + + + } else { + *_err_arr = NULL; + *_err_arr_size = 0; + free(err_arr); + } + + } else { + free(err_arr); + } + + if(_err_arr_size != NULL) { + *_err_arr_size = n_err; } return n_recv_suc; @@ -330,7 +360,7 @@ int ret; NetConnPool *mpool = _get_pool(); - net_mod_recv_err_t err_msg; + net_mod_err_t err_msg; // 鏈湴鍙戦�� if(node_arr == NULL || arrlen == 0) { @@ -636,7 +666,7 @@ * @return 0 鎴愬姛, EBUS_NET 缃戠粶閿欒锛� 鍏朵粬鍊� 浠g悊鏈嶅姟娌℃湁杞彂鎴愬姛銆� * */ -int NetModSocket::read_response(int connfd, net_mod_recv_msg_t *recv_msg, net_mod_recv_err_t *err_arr) { +int NetModSocket::read_response(int connfd, net_mod_recv_msg_t *recv_msg, net_mod_err_t *err_arr) { int recv_size; void *recv_buf; char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH]; @@ -667,6 +697,7 @@ LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc recv_buf"); exit(1); } + if ( (recv_size = rio_readn(connfd, recv_buf, response_head.content_length) ) != response_head.content_length) { memcpy(err_arr->host, response_head.host, sizeof(err_arr->host)); diff --git a/src/net/net_mod_socket.h b/src/net/net_mod_socket.h index b02ea6e..6289fa6 100644 --- a/src/net/net_mod_socket.h +++ b/src/net/net_mod_socket.h @@ -55,7 +55,7 @@ }; -struct net_mod_recv_err_t +struct net_mod_err_t { char host[NI_MAXHOST]; int port; @@ -91,12 +91,11 @@ NetConnPool* _get_pool(); //璇诲彇杩斿洖淇℃伅 - int read_response(int clientfd, net_mod_recv_msg_t *recv_msg, net_mod_recv_err_t *err_arr); + int read_response(int clientfd, net_mod_recv_msg_t *recv_msg, net_mod_err_t *err_arr); // 鍙戦�佽姹備俊鎭� int write_request(int clientfd, net_mod_request_head_t &request_head, const void *send_buf, int send_size, const void *topic_buf, int topic_size); - int _sendandrecv_(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int timeout); + int _pub_(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size, int timeout) ; @@ -121,22 +120,7 @@ - /** - * @brief 濡傛灉寤虹珛杩炴帴鐨勮妭鐐规病鏈夋帴鍙楀埌娑堟伅绛夊緟timeout鐨勬椂闂村悗杩斿洖 - * - * 鍚憂ode_arr 涓殑鎵�鏈夌綉缁滆妭鐐瑰彂閫佽姹傛秷鎭紝鑺傜偣鐨勮繑鍥炰俊鎭眹鎬诲苟瀛樺偍鍦╮ecv_arr涓� - * @node_arr 缃戠粶鑺傜偣缁�, @node_arr_len璇ユ暟缁勯暱搴�.濡傛灉IP涓虹┖鍒欎负鏈湴鍙戦�併�� - * @send_buf 鍙戦�佺殑娑堟伅锛孈send_size 璇ユ秷鎭綋鐨勯暱搴� - * @recv_arr 杩斿洖鐨勫簲绛旀秷鎭粍锛孈recv_arr_size 璇ユ暟缁勯暱搴� - * @timeout 绛夊緟鏃堕棿锛屽崟浣嶆槸鍗冨垎涔嬩竴绉� - * @return 鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁� - * - * 浼樼偣锛�1鏌愪釜鑺傜偣鐨勬晠闅滀笉浼氶樆濉炲叾浠栬妭鐐广��2 鎬ц兘濂姐�� 3 閲囩敤thread local鎶�鏈嵆淇濊瘉浜嗙嚎绋嬪畨鍏紝鍙堝彲浠ヤ娇鐢ㄨ繛鎺ユ睜缂撳瓨杩炴帴 - */ - int sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) ; - - + /** * 濡傛灉寤虹珛杩炴帴鐨勮妭鐐规病鏈夋帴鍙楀埌娑堟伅绛夊緟timeout鐨勬椂闂村悗杩斿洖 * 鍚憂ode_arr 涓殑鎵�鏈夌綉缁滆妭鐐瑰彂閫佽姹傛秷鎭紝鑺傜偣鐨勮繑鍥炰俊鎭眹鎬诲苟瀛樺偍鍦╮ecv_arr涓� @@ -147,15 +131,10 @@ * @return 鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁� * 浼樼偣锛�1鏌愪釜鑺傜偣鐨勬晠闅滀笉浼氶樆濉炲叾浠栬妭鐐广��2 鎬ц兘濂姐�� 3 閲囩敤thread local鎶�鏈嵆淇濊瘉浜嗙嚎绋嬪畨鍏紝鍙堝彲浠ヤ娇鐢ㄨ繛鎺ユ睜缂撳瓨杩炴帴 */ - int sendandrecv_timeout(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int timeout); - - /** - * 涓嶇瓑寰呯珛鍗宠繑鍥� - * @timeout 绛夊緟鏃堕棿锛屽崟浣嶆槸鍗冨垎涔嬩竴绉� - */ - int sendandrecv_nowait(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) ; + int sendandrecv(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, + net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, + net_mod_err_t ** _err_arr, int *_err_arr_size, int timeout); + /** * 鍔熻兘鍚宻endandrecv diff --git a/src/net/net_mod_socket_wrapper.cpp b/src/net/net_mod_socket_wrapper.cpp index 6f9a66e..abdcbb7 100644 --- a/src/net/net_mod_socket_wrapper.cpp +++ b/src/net/net_mod_socket_wrapper.cpp @@ -96,9 +96,9 @@ } int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size){ + net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, net_mod_err_t ** err_arr, int *err_arr_size){ NetModSocket *sockt = (NetModSocket *)_socket; - return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); + return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size, err_arr, err_arr_size, -1); } /** @@ -106,16 +106,17 @@ * @timeout 绛夊緟鏃堕棿锛屽崟浣嶆槸鍗冨垎涔嬩竴绉� */ int net_mod_socket_sendandrecv_timeout(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int timeout){ + net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, + net_mod_err_t ** err_arr, int *err_arr_size, int timeout){ NetModSocket *sockt = (NetModSocket *)_socket; - // return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); - return sockt->sendandrecv_timeout(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size, timeout); + return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size, err_arr, err_arr_size, timeout); } int net_mod_socket_sendandrecv_nowait(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { + net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, net_mod_err_t ** err_arr, int *err_arr_size) { + NetModSocket *sockt = (NetModSocket *)_socket; - return sockt->sendandrecv_nowait(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); + return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size, err_arr, err_arr_size, 0); } diff --git a/src/net/net_mod_socket_wrapper.h b/src/net/net_mod_socket_wrapper.h index e823cc8..b4941c0 100644 --- a/src/net/net_mod_socket_wrapper.h +++ b/src/net/net_mod_socket_wrapper.h @@ -136,14 +136,17 @@ * @param node_arr_len璇ユ暟缁勯暱搴�.濡傛灉IP涓虹┖鍒欎负鏈湴鍙戦�併�� * @param send_buf 鍙戦�佺殑娑堟伅 * @param send_size 璇ユ秷鎭綋鐨勯暱搴� - * @param recv_arr 杩斿洖鐨勫簲绛旀秷鎭暟缁� + * @param recv_arr 杩斿洖鐨勫簲绛旀秷鎭暟缁勶紝浣跨敤瀹屽悗闇�瑕佽皟鐢╪et_mod_socket_free_recv_msg_arr閲婃斁鎺� * @param recv_arr_size 杩斿洖鐨勫簲绛旀秷鎭暟缁勯暱搴� + * @param err_arr 杩斿洖鍙戦�侀敊璇殑鑺傜偣鏁扮粍锛屼娇鐢ㄥ畬鍚庨渶瑕佽皟鐢╢ree鏂规硶閲婃斁鎺� + 脳 @param err_arr_size 杩斿洖鍙戦�侀敊璇殑鑺傜偣鏁扮粍鐨勯暱搴� * * @return 鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁� * */ int net_mod_socket_sendandrecv(void *_sockt, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) ; + net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, + net_mod_err_t ** err_arr, int *err_arr_size) ; /** @@ -154,15 +157,39 @@ * @param node_arr_len璇ユ暟缁勯暱搴�.濡傛灉IP涓虹┖鍒欎负鏈湴鍙戦�併�� * @param send_buf 鍙戦�佺殑娑堟伅 * @param send_size 璇ユ秷鎭綋鐨勯暱搴� - * @param recv_arr 杩斿洖鐨勫簲绛旀秷鎭暟缁� + * @param recv_arr 杩斿洖鐨勫簲绛旀秷鎭暟缁勶紝浣跨敤瀹屽悗闇�瑕佽皟鐢╪et_mod_socket_free_recv_msg_arr閲婃斁鎺� * @param recv_arr_size 杩斿洖鐨勫簲绛旀秷鎭暟缁勯暱搴� + * @param err_arr 杩斿洖鍙戦�侀敊璇殑鑺傜偣鏁扮粍锛屼娇鐢ㄥ畬鍚庨渶瑕佽皟鐢╢ree鏂规硶閲婃斁鎺� + 脳 @param err_arr_size 杩斿洖鍙戦�侀敊璇殑鑺傜偣鏁扮粍鐨勯暱搴� * @param timeout 绛夊緟鏃堕棿(璞锛屽嵆鍗冨垎涔嬩竴绉�) * * @return 鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁� * */ int net_mod_socket_sendandrecv_timeout(void *_sockt, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int timeout); + net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, + net_mod_err_t ** err_arr, int *err_arr_size, int timeout); + + +/** + * @brief 璺ㄦ満鍣ㄥ彂閫佹秷鎭苟鎺ュ彈杩斿洖鐨勫簲绛旀秷鎭紝涓嶇鏄惁鍙戦�佸畬鎴愮珛鍒昏繑鍥� + * + * 鍚憂ode_arr 涓殑鎵�鏈夌綉缁滆妭鐐瑰彂閫佽姹傛秷鎭紝鑺傜偣鐨勮繑鍥炰俊鎭眹鎬诲苟瀛樺偍鍦╮ecv_arr涓� + * @param node_arr 缃戠粶鑺傜偣缁�, + * @param node_arr_len璇ユ暟缁勯暱搴�.濡傛灉IP涓虹┖鍒欎负鏈湴鍙戦�併�� + * @param send_buf 鍙戦�佺殑娑堟伅 + * @param send_size 璇ユ秷鎭綋鐨勯暱搴� + * @param recv_arr 杩斿洖鐨勫簲绛旀秷鎭暟缁勶紝浣跨敤瀹屽悗闇�瑕佽皟鐢╪et_mod_socket_free_recv_msg_arr閲婃斁鎺� + * @param recv_arr_size 杩斿洖鐨勫簲绛旀秷鎭暟缁勯暱搴� + * @param err_arr 杩斿洖鍙戦�侀敊璇殑鑺傜偣鏁扮粍锛屼娇鐢ㄥ畬鍚庨渶瑕佽皟鐢╢ree鏂规硶閲婃斁鎺� + 脳 @param err_arr_size 杩斿洖鍙戦�侀敊璇殑鑺傜偣鏁扮粍鐨勯暱搴� + * + * @return 鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁� + * + */ +int net_mod_socket_sendandrecv_nowait(void *_sockt, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, + net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, + net_mod_err_t ** err_arr, int *err_arr_size) ; @@ -213,24 +240,6 @@ * @return 0鏄垚鍔燂紝 鍏朵粬鍊兼槸澶辫触鐨勯敊璇爜 */ int net_mod_socket_recvandsend_nowait(void *_socket, recvandsend_callback_wrapper_fn callback, void * user_data) ; - - -/** - * @brief 璺ㄦ満鍣ㄥ彂閫佹秷鎭苟鎺ュ彈杩斿洖鐨勫簲绛旀秷鎭紝涓嶇鏄惁鍙戦�佸畬鎴愮珛鍒昏繑鍥� - * - * 鍚憂ode_arr 涓殑鎵�鏈夌綉缁滆妭鐐瑰彂閫佽姹傛秷鎭紝鑺傜偣鐨勮繑鍥炰俊鎭眹鎬诲苟瀛樺偍鍦╮ecv_arr涓� - * @param node_arr 缃戠粶鑺傜偣缁�, - * @param node_arr_len璇ユ暟缁勯暱搴�.濡傛灉IP涓虹┖鍒欎负鏈湴鍙戦�併�� - * @param send_buf 鍙戦�佺殑娑堟伅 - * @param send_size 璇ユ秷鎭綋鐨勯暱搴� - * @param recv_arr 杩斿洖鐨勫簲绛旀秷鎭暟缁� - * @param recv_arr_size 杩斿洖鐨勫簲绛旀秷鎭暟缁勯暱搴� - * - * @return 鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁� - * - */ -int net_mod_socket_sendandrecv_nowait(void *_sockt, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) ; diff --git a/test_net_socket/shm_util.cpp b/test_net_socket/shm_util.cpp index 4eb03f8..1b3dca3 100644 --- a/test_net_socket/shm_util.cpp +++ b/test_net_socket/shm_util.cpp @@ -200,6 +200,8 @@ int recv_arr_size, i, n; net_mod_recv_msg_t *recv_arr; + net_mod_err_t *errarr; + int errarr_size = 0; pthread_t tid; // 鍒涘缓涓�涓嚎绋嬫帴鍙楄闃呮秷鎭� @@ -234,19 +236,31 @@ if (fgets(content, MAXLINE, stdin) != NULL) { // 鏀跺埌娑堟伅鐨勮妭鐐瑰嵆浣挎病鏈夊搴旂殑淇℃伅锛� 涔熻鍥炲涓�涓〃绀烘棤鐨勬秷鎭�,鍚﹀垯浼氫竴鐩寸瓑寰� // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size); - n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size, 1); + n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), + &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1000); printf(" %d nodes reply\n", n); - for(i=0; i<recv_arr_size; i++) { - printf("reply from (host:%s, port: %d, key:%d) >> %s\n", - recv_arr[i].host, - recv_arr[i].port, - recv_arr[i].key, - (char *)recv_arr[i].content - ); - } + + if(recv_arr_size > 0) { + for(i=0; i<recv_arr_size; i++) { + printf("reply from (host:%s, port: %d, key:%d) >> %s\n", + recv_arr[i].host, + recv_arr[i].port, + recv_arr[i].key, + (char *)recv_arr[i].content + ); + } + + // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 + net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + } - // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 - net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + + if(errarr_size > 0) { + for(i = 0; i < errarr_size; i++) { + printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code)); + } + free(errarr); + } } } else if(strcmp(action, "desub") == 0) { @@ -290,10 +304,12 @@ Targ *targ = (Targ *)arg; char sendbuf[128]; - int j, n; - int recv_arr_size; + int i, j, n; + int recv_arr_size = 0; net_mod_recv_msg_t *recv_arr; int total = 0; + net_mod_err_t *errarr; + int errarr_size = 0; int rkey, lkey; unsigned int l = 0 , rl; @@ -312,30 +328,42 @@ sprintf(sendbuf, hello_format, net_mod_socket_get_key(client), l); // fprintf(fp, "requst:%s\n", sendbuf); // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); - n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1); + n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1, + &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1); printf("%d: send %d nodes\n", l, n); - for(j=0; j < recv_arr_size; j++) { - fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n", - net_mod_socket_get_key(client), - sendbuf, - targ->node->key, - recv_arr[j].host, - recv_arr[j].port, - recv_arr[j].key, - (char *)recv_arr[j].content - ); + if(recv_arr_size > 0) { + for(j=0; j < recv_arr_size; j++) { + fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n", + net_mod_socket_get_key(client), + sendbuf, + targ->node->key, + recv_arr[j].host, + recv_arr[j].port, + recv_arr[j].key, + (char *)recv_arr[j].content + ); - printf("key == %d\n", net_mod_socket_get_key(client)); - assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3); - assert(targ->node->key == rkey); - assert(net_mod_socket_get_key(client) == lkey); - assert(rl == l); + printf("key == %d\n", net_mod_socket_get_key(client)); + assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3); + assert(targ->node->key == rkey); + assert(net_mod_socket_get_key(client) == lkey); + assert(rl == l); + } + // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 + net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); } - // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 - net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + + if(errarr_size > 0) { + for(i = 0; i < errarr_size; i++) { + printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code)); + } + free(errarr); + } + total += n; } + if(fp != NULL) fclose(fp); // net_mod_socket_close(client); @@ -384,7 +412,7 @@ double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); long diffsec = (long) (difftime/1000000); long diffusec = difftime - diffsec*1000000; - fprintf(stderr,"鍙戦�佹暟鐩�:%ld, 鎴愬姛鏁扮洰: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", + fprintf(stderr,"鍙戦�佹暟鐩�:%d, 鎴愬姛鏁扮洰: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", SCALE*node_arr_size, total, diffsec, diffusec, difftime/total ); // fflush(stdout); @@ -393,11 +421,14 @@ // 鏃犻檺寰幆send void test_net_sendandrecv(char *nodelist) { - int n, j; + int i, n, j; void * client; int recv_arr_size; net_mod_recv_msg_t *recv_arr; net_node_t *node_arr; + net_mod_err_t *errarr; + int errarr_size = 0; + int node_arr_size = parse_node_list(nodelist, &node_arr); char buf[128]; pid_t pid, retPid ; @@ -413,30 +444,35 @@ while(true) { sprintf(buf, hello_format, pid, l); n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, buf, strlen(buf)+1, - &recv_arr, &recv_arr_size, 1000); + &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1000); printf(" %d nodes reply\n", n); - for(j = 0; j < recv_arr_size; j++) { - printf("%ld send '%s' . received '%s' from (host:%s, port: %d, key:%d) \n", - (long)pid, - buf, - (char *)recv_arr[j].content, - recv_arr[j].host, - recv_arr[j].port, - recv_arr[j].key + if(recv_arr_size > 0) { + for(j = 0; j < recv_arr_size; j++) { + printf("%ld send '%s' . received '%s' from (host:%s, port: %d, key:%d) \n", + (long)pid, + buf, + (char *)recv_arr[j].content, + recv_arr[j].host, + recv_arr[j].port, + recv_arr[j].key + ); + assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3); + assert(retPid == pid); + assert(retl == l); + assert(remoteKey == recv_arr[j].key); + } + // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 + net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + } - ); - - - - assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3); - assert(retPid == pid); - assert(retl == l); - assert(remoteKey == recv_arr[j].key); + if(errarr_size > 0) { + for(i = 0; i < errarr_size; i++) { + printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code)); + } + free(errarr); } - // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 - net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); l++; } @@ -523,7 +559,7 @@ net_node_t *node_arr; int node_arr_size = parse_node_list(nodelist, &node_arr); - char *topic = "news"; + const char *topic = "news"; sprintf(sendbuf, "pid:%ld: Good news!!", (long)getpid()); void * client = net_mod_socket_open(); @@ -577,31 +613,45 @@ } } -void do_sendandrecv(int key, char *sendbuf) { - int n, j; +void do_sendandrecv(char *sendlist, char *sendbuf) { + int i, n, j; int recv_arr_size; net_mod_recv_msg_t *recv_arr; + net_mod_err_t *errarr; + int errarr_size = 0; - net_node_t node_arr[] = {NULL, 0, key}; + + net_node_t *node_arr; + int node_arr_size = parse_node_list(sendlist, &node_arr); + + print_node_list(node_arr, node_arr_size); void * client = net_mod_socket_open(); - n = net_mod_socket_sendandrecv_timeout(client, node_arr, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 5000); - if(n == 0) { - printf("send failed\n"); - return; - } - printf(" %d nodes reply\n", n); - for(j=0; j < recv_arr_size; j++) { + n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, + &recv_arr, &recv_arr_size, &errarr, &errarr_size, 5000); + + printf(" %d nodes reply\n", recv_arr_size); + if(recv_arr_size > 0) { + for(j=0; j < recv_arr_size; j++) { - fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n\n", - net_mod_socket_get_key(client), - sendbuf, - key, - recv_arr[j].host, - recv_arr[j].port, - recv_arr[j].key, - (char *)recv_arr[j].content - ); + fprintf(stdout, "===> suc: %d send '%s'. received from (host=%s, port= %d, key=%d), '%s'\n\n", + net_mod_socket_get_key(client), + sendbuf, + recv_arr[j].host, + recv_arr[j].port, + recv_arr[j].key, + (char *)recv_arr[j].content + ); + } + // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 + net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + } +// printf("errarr_size = %d\n", errarr_size); + if(errarr_size > 0) { + for(i = 0; i < errarr_size; i++) { + printf("===> error: (host:%s, port: %d, key:%d). %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code)); + } + free(errarr); } net_mod_socket_close(client); @@ -771,6 +821,10 @@ net_node_t *node_arr = (net_node_t *) calloc(entry_arr_len, sizeof(net_node_t)); for(i = 0; i < entry_arr_len; i++) { + if(strchr(entry_arr[i], ':') == NULL) { + node_arr[i]= {NULL, 0, atoi(entry_arr[i])}; + continue; + } property_arr_len = str_split(entry_arr[i], ":", &property_arr); printf("=====%s, %s, %s\n", property_arr[0], property_arr[1], property_arr[2]); @@ -854,9 +908,9 @@ usage(prog); exit(1); } - int key = atoi(argv[1]); + char *sendlist = argv[1]; char *content = argv[2]; - do_sendandrecv(key, content); + do_sendandrecv(sendlist, content); } else if (strcmp("start_bus_server", fun) == 0) { @@ -942,7 +996,7 @@ } else { - printf("%Invalid funciton name\n"); + printf("Invalid funciton name\n"); usage(argv[0]); exit(1); -- Gitblit v1.8.0