From 45e00aca28504b27f3ad6b4abf364c3d57f34510 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期一, 22 二月 2021 14:05:28 +0800 Subject: [PATCH] lock free queue --- src/net/net_mod_socket.cpp | 64 +++++++++++++++++++------------ 1 files changed, 39 insertions(+), 25 deletions(-) diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp index 3668b14..e689cea 100644 --- a/src/net/net_mod_socket.cpp +++ b/src/net/net_mod_socket.cpp @@ -19,35 +19,39 @@ if (Signal(SIGPIPE, SIG_IGN) == SIG_ERR) logger->error(errno, "NetModSocket::NetModSocket signal"); - gpool = new NetConnPool(); + // gpool = new NetConnPool(); - pthread_mutexattr_t mtxAttr; - s = pthread_mutexattr_init(&mtxAttr); - if (s != 0) - err_exit(s, "pthread_mutexattr_init"); - s = pthread_mutexattr_settype(&mtxAttr, PTHREAD_MUTEX_ERRORCHECK); - if (s != 0) - err_exit(s, "pthread_mutexattr_settype"); - s = pthread_mutex_init(&sendMutex, &mtxAttr); - if (s != 0) - err_exit(s, "pthread_mutex_init"); + // pthread_mutexattr_t mtxAttr; + // s = pthread_mutexattr_init(&mtxAttr); + // if (s != 0) + // err_exit(s, "pthread_mutexattr_init"); + // s = pthread_mutexattr_settype(&mtxAttr, PTHREAD_MUTEX_ERRORCHECK); + // if (s != 0) + // err_exit(s, "pthread_mutexattr_settype"); + // s = pthread_mutex_init(&sendMutex, &mtxAttr); + // if (s != 0) + // err_exit(s, "pthread_mutex_init"); - s = pthread_mutexattr_destroy(&mtxAttr); - if (s != 0) - err_exit(s, "pthread_mutexattr_destroy"); + // s = pthread_mutexattr_destroy(&mtxAttr); + // if (s != 0) + // err_exit(s, "pthread_mutexattr_destroy"); } NetModSocket::~NetModSocket() { - int s; - delete gpool; - s = pthread_mutex_destroy(&sendMutex); - if(s != 0) { - err_exit(s, "shm_close_socket"); - } + // int s; + // delete gpool; + // s = pthread_mutex_destroy(&sendMutex); + // if(s != 0) { + // err_exit(s, "shm_socket_close"); + // } } + +int NetModSocket::stop() { + return shmModSocket.stop(); +} /** * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓� @@ -139,6 +143,8 @@ NetConnPool* NetModSocket::_get_pool() { return _get_threadlocal_pool(); } + + int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, @@ -302,22 +308,22 @@ } -int NetModSocket::pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) { +int NetModSocket::pub(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size) { return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, -1); } -int NetModSocket::pub_nowait(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) { +int NetModSocket::pub_nowait(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size) { return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, 0); } -int NetModSocket::pub_timeout(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size, int msec ) { +int NetModSocket::pub_timeout(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size, int msec ) { return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, msec); } // int pub(char *topic, int topic_size, void *content, int content_size, int port); -int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, +int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size, int msec) { int i, connfd; net_node_t *node; @@ -495,6 +501,14 @@ return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG); } + +int NetModSocket::recvandsend(recvandsend_callback_fn callback, + const struct timespec *timeout , int flag, void * user_data ) { + + return shmModSocket.recvandsend(callback, timeout, flag, user_data); +} + + /** * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟 * @key 鍙戦�佺粰璋� @@ -586,7 +600,7 @@ //====================================================================================== int NetModSocket::write_request(int clientfd, net_mod_request_head_t &request_head, - void *content_buf, int content_size, void *topic_buf, int topic_size) { + const void *content_buf, int content_size, const void *topic_buf, int topic_size) { int buf_size; char *buf; -- Gitblit v1.8.0