From 83956b12d863924936a98c9dfbece37feb0cce9c Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期四, 04 二月 2021 14:48:04 +0800 Subject: [PATCH] update --- src/net/net_mod_socket.cpp | 165 +++++++++++++++++------------------------------------- 1 files changed, 53 insertions(+), 112 deletions(-) diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp index 3ef15b9..8c1465d 100644 --- a/src/net/net_mod_socket.cpp +++ b/src/net/net_mod_socket.cpp @@ -17,32 +17,32 @@ { int s; if (Signal(SIGPIPE, SIG_IGN) == SIG_ERR) - logger->error(errno, "NetModSocket::NetModSocket signal"); + logger->error(errno, "NetModSocket::NetModSocket signal"); - gpool = new NetConnPool(); + // gpool = new NetConnPool(); - pthread_mutexattr_t mtxAttr; - s = pthread_mutexattr_init(&mtxAttr); - if (s != 0) - err_exit(s, "pthread_mutexattr_init"); - s = pthread_mutexattr_settype(&mtxAttr, PTHREAD_MUTEX_ERRORCHECK); - if (s != 0) - err_exit(s, "pthread_mutexattr_settype"); - s = pthread_mutex_init(&sendMutex, &mtxAttr); - if (s != 0) - err_exit(s, "pthread_mutex_init"); + // pthread_mutexattr_t mtxAttr; + // s = pthread_mutexattr_init(&mtxAttr); + // if (s != 0) + // err_exit(s, "pthread_mutexattr_init"); + // s = pthread_mutexattr_settype(&mtxAttr, PTHREAD_MUTEX_ERRORCHECK); + // if (s != 0) + // err_exit(s, "pthread_mutexattr_settype"); + // s = pthread_mutex_init(&sendMutex, &mtxAttr); + // if (s != 0) + // err_exit(s, "pthread_mutex_init"); - s = pthread_mutexattr_destroy(&mtxAttr); - if (s != 0) - err_exit(s, "pthread_mutexattr_destroy"); + // s = pthread_mutexattr_destroy(&mtxAttr); + // if (s != 0) + // err_exit(s, "pthread_mutexattr_destroy"); } NetModSocket::~NetModSocket() { int s; - delete gpool; - s = pthread_mutex_destroy(&sendMutex); + // delete gpool; + // s = pthread_mutex_destroy(&sendMutex); if(s != 0) { err_exit(s, "shm_close_socket"); } @@ -141,6 +141,8 @@ } + + int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int msec ) { @@ -165,11 +167,11 @@ // 鏈湴鍙戦�� if(msec == 0) { - ret = shmModSocket.sendandrecv_nowait(send_buf, send_size, node->key, &recv_buf, &recv_size); + ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size, NULL, BUS_NOWAIT_FLAG); } else if(msec > 0){ timeout.tv_sec = msec / 1000; timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; - ret = shmModSocket.sendandrecv_timeout(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout); + ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout, BUS_TIMEOUT_FLAG); } else { ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size); } @@ -334,17 +336,18 @@ // 鏈湴鍙戦�� if(node_arr == NULL || arrlen == 0) { if(msec == 0) { - ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY); + ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG); } else if(msec > 0) { timeout.tv_sec = msec / 1000; timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; - ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout); + ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG); } else { ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY); } if(ret == 0 ) { n_pub_suc++; } + return n_pub_suc; } for (i = 0; i < arrlen; i++) { @@ -353,11 +356,11 @@ if(node->host == NULL) { // 鏈湴鍙戦�� if(msec == 0) { - ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY); + ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG); } else if(msec > 0) { timeout.tv_sec = msec / 1000; timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; - ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout); + ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG); } else { ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY); } @@ -374,7 +377,7 @@ } request_head.mod = BUS; memcpy(request_head.host, node->host, sizeof(request_head.host)); - request_head.key = node->key; + request_head.key = SHM_BUS_KEY; request_head.content_length = content_size; request_head.topic_length = strlen(topic) + 1; request_head.timeout = msec; @@ -456,55 +459,20 @@ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ int NetModSocket::sendto(const void *buf, const int size, const int key){ - int rv = shmModSocket.sendto(buf, size, key); - if(rv == 0) { - logger->debug("NetModSocket::sendto: %d sendto %d success.\n", get_key(), key); - return 0; - } - - if(rv > EBUS_BASE) { - // bus_errno = EBUS_TIMEOUT; - logger->debug("NetModSocket::sendto: %d sendto %d failed %s", get_key(), key, bus_strerror(rv)); - } else { - logger->error(rv, "NetModSocket::sendto : %d sendto %d failed", get_key(), key); - } - return rv; + return shmModSocket.sendto(buf, size, key); } // 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇 int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec){ struct timespec timeout = {sec, nsec}; - int rv = shmModSocket.sendto_timeout(buf, size, key, &timeout); - if(rv == 0) { - logger->debug("NetModSocket::sendto_timeout: %d sendto %d success.\n", get_key(), key); - return 0; - } - - if(rv > EBUS_BASE) { - // bus_errno = EBUS_TIMEOUT; - logger->debug("NetModSocket::sendto_timeout : %d sendto %d failed %s", get_key(), key, bus_strerror(rv)); - } else { - logger->error(rv, "NetModSocket::sendto_timeout: %d sendto %d failed", get_key(), key); - } - return rv; + return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG); + } // 鍙戦�佷俊鎭珛鍒昏繑鍥炪�� int NetModSocket::sendto_nowait(const void *buf, const int size, const int key){ - int rv = shmModSocket.sendto_nowait(buf, size, key); - if(rv == 0) { - logger->debug("NetModSocket::sendto_nowait: %d sendto %d success.\n", get_key(), key); - return 0; - } - - if(rv > EBUS_BASE) { - // bus_errno = EBUS_TIMEOUT; - logger->debug("NetModSocket::sendto_nowait %d sendto %d failed %s", get_key(), key, bus_strerror(rv)); - - } else { - logger->error(rv, "NetModSocket::sendto_nowait %d sendto %d failed", get_key(), key); - } - return rv; + return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG); + } /** @@ -514,55 +482,28 @@ */ int NetModSocket::recvfrom(void **buf, int *size, int *key) { - logger->debug(" %d NetModSocket::recvfrom before", get_key()); - int rv = shmModSocket.recvfrom(buf, size, key); - - if(rv == 0) { - logger->debug("NetModSocket::recvfrom: <<<< %d recvfrom %d success.\n", get_key(), *key); - return 0; - } - - if(rv > EBUS_BASE) { - logger->debug("NetModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv)); - } else { - logger->error(rv, "NetModSocket::recvfrom: socket %d recvfrom failed", get_key()); - } - return rv; + return shmModSocket.recvfrom(buf, size, key); + } // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec){ struct timespec timeout = {sec, nsec}; - int rv = shmModSocket.recvfrom_timeout(buf, size, key, &timeout); - if(rv == 0) { - logger->debug("NetModSocket::recvfrom_timeout: %d recvfrom %d success.\n", get_key(), *key); - return 0; - } - - if(rv > EBUS_BASE) { - // bus_errno = EBUS_TIMEOUT; - logger->debug("NetModSocket::recvfrom_timeout: %d recvfrom failed %s", get_key(), bus_strerror(rv)); - } else { - logger->error(rv, "NetModSocket::recvfrom_timeout: %d recvfrom failed", get_key()); - } - return rv; + return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG); + } int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){ - int rv = shmModSocket.recvfrom_nowait(buf, size, key); - if(rv == 0) { - logger->debug("NetModSocket::recvfrom_nowait: %d recvfrom %d success.\n", get_key(), *key); - return 0; - } - - if(rv > EBUS_BASE) { - // bus_errno = EBUS_TIMEOUT; - logger->debug("NetModSocket::recvfrom_nowait: %d recvfrom failed %s", get_key(), bus_strerror(rv)); - } else { - logger->error(rv, "NetModSocket::recvfrom_nowait: %d recvfrom failed", get_key()); - } - return rv; + return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG); } + + +int NetModSocket::recvandsend(recvandsend_callback_fn callback, + const struct timespec *timeout , int flag, void * user_data ) { + + return shmModSocket.recvandsend(callback, timeout, flag, user_data); +} + /** * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟 @@ -575,10 +516,10 @@ // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 int NetModSocket::sendandrecv_timeout( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, int sec, int nsec){ struct timespec timeout = {sec, nsec}; - return shmModSocket.sendandrecv_timeout(send_buf, send_size, key, recv_buf, recv_size, &timeout); + return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size, &timeout, BUS_TIMEOUT_FLAG); } int NetModSocket::sendandrecv_nowait( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) { - return shmModSocket.sendandrecv_nowait(send_buf, send_size, key, recv_buf, recv_size); + return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size, NULL, BUS_NOWAIT_FLAG); } @@ -594,10 +535,10 @@ // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 int NetModSocket::sub_timeout( void *topic, int size, int key, int sec, int nsec){ struct timespec timeout = {sec, nsec}; - return shmModSocket.sub_timeout((char *)topic, size, key, &timeout); + return shmModSocket.sub((char *)topic, size, key, &timeout, BUS_TIMEOUT_FLAG); } int NetModSocket::sub_nowait( void *topic, int size, int key){ - return shmModSocket.sub_nowait((char *)topic, size, key); + return shmModSocket.sub((char *)topic, size, key, NULL, BUS_NOWAIT_FLAG); } @@ -614,10 +555,10 @@ // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 int NetModSocket::desub_timeout( void *topic, int size, int key, int sec, int nsec){ struct timespec timeout = {sec, nsec}; - return shmModSocket.desub_timeout((char *)topic, size, key, &timeout); + return shmModSocket.desub((char *)topic, size, key, &timeout, BUS_TIMEOUT_FLAG); } int NetModSocket::desub_nowait( void *topic, int size, int key){ - return shmModSocket.desub_nowait((char *)topic, size, key); + return shmModSocket.desub((char *)topic, size, key, NULL, BUS_NOWAIT_FLAG); } @@ -634,10 +575,10 @@ // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 int NetModSocket::pub_timeout( char *topic, int topic_size, void *content, int content_size, int key, int sec, int nsec){ struct timespec timeout = {sec, nsec}; - return shmModSocket.pub_timeout(topic, topic_size, content, content_size, key, &timeout); + return shmModSocket.pub(topic, topic_size, content, content_size, key, &timeout, BUS_TIMEOUT_FLAG); } int NetModSocket::pub_nowait( char *topic, int topic_size, void *content, int content_size, int key){ - return shmModSocket.pub_nowait(topic, topic_size, content, content_size, key); + return shmModSocket.pub(topic, topic_size, content, content_size, key, NULL, BUS_NOWAIT_FLAG); } -- Gitblit v1.8.0