From 2561a007b8d8999a4750046d0cfb3b1ad5af50ac Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 09 四月 2024 15:29:32 +0800 Subject: [PATCH] test for perf --- src/net/net_mod_socket.cpp | 267 +++++++++++++++++++++++++++++++++------------------- 1 files changed, 169 insertions(+), 98 deletions(-) diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp index f939c67..d10fbdc 100644 --- a/src/net/net_mod_socket.cpp +++ b/src/net/net_mod_socket.cpp @@ -15,39 +15,20 @@ NetModSocket::NetModSocket() { - int s; if (Signal(SIGPIPE, SIG_IGN) == SIG_ERR) logger->error(errno, "NetModSocket::NetModSocket signal"); - - gpool = new NetConnPool(); - - pthread_mutexattr_t mtxAttr; - s = pthread_mutexattr_init(&mtxAttr); - if (s != 0) - err_exit(s, "pthread_mutexattr_init"); - s = pthread_mutexattr_settype(&mtxAttr, PTHREAD_MUTEX_ERRORCHECK); - if (s != 0) - err_exit(s, "pthread_mutexattr_settype"); - s = pthread_mutex_init(&sendMutex, &mtxAttr); - if (s != 0) - err_exit(s, "pthread_mutex_init"); - - s = pthread_mutexattr_destroy(&mtxAttr); - if (s != 0) - err_exit(s, "pthread_mutexattr_destroy"); } NetModSocket::~NetModSocket() { - int s; - delete gpool; - s = pthread_mutex_destroy(&sendMutex); - if(s != 0) { - err_exit(s, "shm_close_socket"); - } + } + +int NetModSocket::stop() { + return shmModSocket.stop(); +} /** * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓� @@ -65,19 +46,40 @@ return shmModSocket.force_bind(key); } -int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { - return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1); +int NetModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag) { + + return shmModSocket.reg(pData, len, buf, size, timeout_ms, flag); } -int NetModSocket::sendandrecv_timeout(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int msec) { - return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, msec); -} -int NetModSocket::sendandrecv_nowait(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { - return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, 0); +void NetModSocket::int_set(int data) { + int_val = data; } + +void NetModSocket::svr_set(int data) { + svr_val = data; +} + +int NetModSocket::int_get(void) { + return int_val; +} + +int NetModSocket::svr_get(void) { + return svr_val; +} + +// int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, +// net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { +// return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1); +// } +// int NetModSocket::sendandrecv_timeout(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, +// net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int msec) { +// return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, msec); +// } +// int NetModSocket::sendandrecv_nowait(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, +// net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { +// return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, 0); + +// } /* Free thread-specific data buffer */ @@ -86,7 +88,7 @@ NetConnPool *mpool = (NetConnPool *)_pool; delete mpool; - logger->debug("destory connPool"); + } /* One-time key creation function */ @@ -119,7 +121,6 @@ if (mpool == NULL) { /* If first call from this thread, allocate buffer for thread, and save its location */ - logger->debug("Create connPool"); mpool = new NetConnPool(); if (mpool == NULL) { LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ malloc"); @@ -143,20 +144,22 @@ -int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, - net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int msec ) { +int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, + net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, + net_mod_err_t ** _err_arr, int *_err_arr_size, int msec ) { - int i, n, recv_size, connfd; + int i, recv_size, connfd; net_node_t *node; void *recv_buf = NULL; struct timespec timeout; int ret; - int n_req = 0, n_recv_suc = 0, n_resp =0; + int n_req = 0, n_recv_suc = 0, n_err = 0, n_resp =0; net_mod_request_head_t request_head = {}; net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t)); + net_mod_err_t *err_arr = (net_mod_err_t *)calloc(arrlen, sizeof(net_mod_err_t)); NetConnPool *mpool = _get_pool(); @@ -175,6 +178,7 @@ } else { ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size); } + if( ret == 0) { strcpy( ret_arr[n_recv_suc].host, ""); ret_arr[n_recv_suc].port = 0; @@ -183,13 +187,11 @@ ret_arr[n_recv_suc].content_length = recv_size; n_recv_suc++; } else { - if(ret > EBUS_BASE) { - // bus_errno = EBUS_TIMEOUT; - logger->debug("NetModSocket:: %d _sendandrecv_ to key %d failed, %s", get_key(), node->key, bus_strerror(ret)); - - } else { - logger->error(ret, "NetModSocket:: %d _sendandrecv_ to key %d failed", get_key(), node->key); - } + err_arr[n_err].port = 0; + err_arr[n_err].key = node->key; + err_arr[n_err].code = ret; + n_err++; + // logger->error("NetModSocket:: _sendandrecv_ to key %d failed. %s", node->key, bus_strerror(ret)); } @@ -197,6 +199,11 @@ } if( (connfd = mpool->getConn(node->host, node->port)) < 0 ) { + memcpy(err_arr[n_err].host, node->host, sizeof(err_arr[n_err].host)); + err_arr[n_err].port = node->port; + err_arr[n_err].key = node->key; + err_arr[n_err].code = EBUS_NET; + n_err++; continue; } @@ -211,6 +218,11 @@ // printf("write_request %s:%d\n", request_head.host, request_head.port); if(write_request(connfd, request_head, send_buf, send_size, NULL, 0) != 0) { LoggerFactory::getLogger()->error("write_request failture %s:%d\n", node->host, node->port); + memcpy(err_arr[n_err].host, node->host, sizeof(err_arr[n_err].host)); + err_arr[n_err].port = node->port; + err_arr[n_err].key = node->key; + err_arr[n_err].code = EBUS_NET; + n_err++; mpool->closeConn( connfd); } else { n_req++; @@ -236,19 +248,25 @@ { mpool->nready--; // printf("POLLIN %d\n", connfd); - if( (n = read_response(connfd, ret_arr+n_recv_suc)) == 0) { + if( (ret = read_response(connfd, ret_arr+n_recv_suc, err_arr + n_err)) == 0) { n_recv_suc++; // 鎴愬姛鏀跺埌杩斿洖娑堟伅锛屾竻绌鸿鍏ヤ綅 mpool->conns[i].fd = -1; } - else if(n == EBUS_NET) { + else if(ret == EBUS_NET) { // 缃戠粶閿欒 + + logger->error("NetModSocket::_sendandrecv_ read_response key = %d , %s", get_key(), bus_strerror(ret)); mpool->closeConn( connfd); + n_err++; // mpool->conns[i].fd = -1; } else { // 浠g悊鏈嶅姟娌℃湁杞彂鎴愬姛 - mpool->conns[i].fd = -1; + + logger->error("NetModSocket::_sendandrecv_ read_response key = %d , %s", get_key(), bus_strerror(ret)); + mpool->conns[i].fd = -1; + n_err++; } n_resp++; @@ -280,19 +298,71 @@ mpool->maxi = -1; + if(recv_arr != NULL) { - *recv_arr = ret_arr; + + if(n_recv_suc > 0) { + *recv_arr = ret_arr; + + } else { + free_recv_msg_arr(ret_arr, n_recv_suc); + } + } else { free_recv_msg_arr(ret_arr, n_recv_suc); } - + if(recv_arr_size != NULL) { - *recv_arr_size = n_recv_suc; + *recv_arr_size = n_recv_suc; + } + + + if(_err_arr != NULL) { + + if(n_err > 0) { + *_err_arr = err_arr; + + + } else { + *_err_arr = NULL; + *_err_arr_size = 0; + free(err_arr); + } + + } else { + free(err_arr); + } + + if(_err_arr_size != NULL) { + *_err_arr_size = n_err; } return n_recv_suc; } +void NetModSocket::buf_data_set(std::string str, int val) { + recvbuf.insert({str, val}); +} + +int NetModSocket::buf_data_get(std::string str) { + + int i; + int val = 0; + std::map<std::string, int>::iterator recvIter; + + recvIter = recvbuf.find(str); + if(recvIter != recvbuf.end()) { + + val = recvIter->second; + + } + + return val; +} + +void NetModSocket::buf_data_del(std::string str) { + recvbuf.erase(str); +} void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) { @@ -304,22 +374,19 @@ } -int NetModSocket::pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) { +int NetModSocket::pub(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size) { return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, -1); } -int NetModSocket::pub_nowait(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) { +int NetModSocket::pub_nowait(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size) { return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, 0); } -int NetModSocket::pub_timeout(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size, int msec ) { +int NetModSocket::pub_timeout(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size, int msec ) { return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, msec); } - -// int pub(char *topic, int topic_size, void *content, int content_size, int port); - -int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, +int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size, int msec) { int i, connfd; net_node_t *node; @@ -333,8 +400,10 @@ int ret; NetConnPool *mpool = _get_pool(); + net_mod_err_t err_msg; + // 鏈湴鍙戦�� - if(node_arr == NULL || arrlen == 0) { + if ((node_arr == NULL) || (arrlen == 0)) { if(msec == 0) { ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG); } else if(msec > 0) { @@ -406,7 +475,7 @@ { mpool->nready--; // printf("POLLIN %d\n", connfd); - if( (ret = read_response(connfd, &recv_msg)) == 0) { + if( (ret = read_response(connfd, &recv_msg, &err_msg)) == 0) { // 鎴愬姛鏀跺埌杩斿洖娑堟伅锛屾竻绌鸿鍏ヤ綅 mpool->conns[i].fd = -1; @@ -458,20 +527,20 @@ * @key 鍙戦�佺粰璋� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ -int NetModSocket::sendto(const void *buf, const int size, const int key){ - return shmModSocket.sendto(buf, size, key); +int NetModSocket::sendto(const void *buf, const int size, const int key, int reset, int data_set){ + return shmModSocket.sendto(buf, size, key, 0, 0, reset, data_set); } // 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇 -int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec){ +int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec, int reset, int data_set){ struct timespec timeout = {sec, nsec}; - return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG); + return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG, reset, data_set); } // 鍙戦�佷俊鎭珛鍒昏繑鍥炪�� -int NetModSocket::sendto_nowait(const void *buf, const int size, const int key){ - return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG); +int NetModSocket::sendto_nowait(const void *buf, const int size, const int key, int reset, int data_set){ + return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG, reset, data_set); } @@ -480,40 +549,29 @@ * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ -int NetModSocket::recvfrom(void **buf, int *size, int *key) { +int NetModSocket::recvfrom(void **buf, int *size, int *key, int reset, int data_set) { - return shmModSocket.recvfrom(buf, size, key); + return shmModSocket.recvfrom(buf, size, key, 0, 0, reset, data_set); } // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec){ +int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec, int reset, int data_set){ struct timespec timeout = {sec, nsec}; - return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG); + return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG, reset, data_set); } -int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){ - return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG); +int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key, int reset, int data_set){ + return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG, reset, data_set); } +int NetModSocket::recvandsend(recvandsend_callback_fn callback, + const struct timespec *timeout , int flag, void * user_data ) { - -int NetModSocket::recvandsend(void **recvbuf, int *recvsize, int *key, recv_callback_fn callback) { - return shmModSocket.recvandsend( recvbuf, recvsize, key, callback); - + return shmModSocket.recvandsend(callback, timeout, flag, user_data); } - -int NetModSocket::recvandsend_timeout(void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, - const struct timespec *timeout ) { - return shmModSocket.recvandsend( recvbuf, recvsize, key, callback, timeout, BUS_TIMEOUT_FLAG); - -} - -int NetModSocket::recvandsend_nowait(void **recvbuf, int *recvsize, int *key, recv_callback_fn callback) { - return shmModSocket.recvandsend( recvbuf, recvsize, key, callback, NULL, BUS_NOWAIT_FLAG); - -} + /** * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟 @@ -606,7 +664,7 @@ //====================================================================================== int NetModSocket::write_request(int clientfd, net_mod_request_head_t &request_head, - void *content_buf, int content_size, void *topic_buf, int topic_size) { + const void *content_buf, int content_size, const void *topic_buf, int topic_size) { int buf_size; char *buf; @@ -647,7 +705,7 @@ * @return 0 鎴愬姛, EBUS_NET 缃戠粶閿欒锛� 鍏朵粬鍊� 浠g悊鏈嶅姟娌℃湁杞彂鎴愬姛銆� * */ -int NetModSocket::read_response(int connfd, net_mod_recv_msg_t *recv_msg) { +int NetModSocket::read_response(int connfd, net_mod_recv_msg_t *recv_msg, net_mod_err_t *err_arr) { int recv_size; void *recv_buf; char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH]; @@ -655,14 +713,21 @@ net_mod_response_head_t response_head; if ( rio_readn(connfd, response_head_bs, NET_MODE_RESPONSE_HEAD_LENGTH) != NET_MODE_RESPONSE_HEAD_LENGTH) { LoggerFactory::getLogger()->error(errno, "NetModSocket::read_response rio_readnb response_head"); - - return -1; + memcpy(err_arr->host, "unkown", sizeof(err_arr->host)); + err_arr->port = 0; + err_arr->key = 0; + err_arr->code = EBUS_NET; + return EBUS_NET; } response_head = NetModSocket::decode_response_head(response_head_bs); // printf(">>>> read_response %s\n", response_head.host); if(response_head.code != 0) { // 浠g悊鏈嶅姟娌¤兘鎴愬姛鍙戦�佺粰瀵瑰簲鐨刱ey + memcpy(err_arr->host, response_head.host, sizeof(err_arr->host)); + err_arr->port = response_head.port; + err_arr->key = response_head.key; + err_arr->code = response_head.code; return response_head.code; } @@ -671,7 +736,13 @@ LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc recv_buf"); exit(1); } + if ( (recv_size = rio_readn(connfd, recv_buf, response_head.content_length) ) != response_head.content_length) { + + memcpy(err_arr->host, response_head.host, sizeof(err_arr->host)); + err_arr->port = response_head.port; + err_arr->key = response_head.key; + err_arr->code = EBUS_NET; LoggerFactory::getLogger()->error(errno, "NetModSocket::read_response rio_readnb recv_buf"); //缃戠粶閿欒 return EBUS_NET; @@ -732,23 +803,23 @@ head.mod = ntohl(GET(tmp_ptr)); - tmp_ptr += 4; + tmp_ptr += sizeof(uint32_t); memcpy(head.host, tmp_ptr, sizeof(head.host)); tmp_ptr += sizeof(head.host); head.port = ntohl(GET(tmp_ptr)); - tmp_ptr += 4; + tmp_ptr += sizeof(uint32_t); head.key = ntohl(GET(tmp_ptr)); - tmp_ptr += 4; + tmp_ptr += sizeof(uint32_t); head.content_length = ntohl(GET(tmp_ptr)); - tmp_ptr += 4; + tmp_ptr += sizeof(uint32_t); head.topic_length = ntohl(GET(tmp_ptr)); - tmp_ptr += 4; + tmp_ptr += sizeof(uint32_t); head.timeout = ntohl(GET_INT32(tmp_ptr)); return head; -- Gitblit v1.8.0