From 76bd3ff05443dc703c0fa1f8907301199b2c09c0 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期三, 13 一月 2021 13:01:21 +0800 Subject: [PATCH] Merge branch 'master' of https://github.com/wangzhengquan/shmqueue --- src/socket/shm_socket.cpp | 112 ++++++++++++++++++++++++++++++++++++++++++++----------- 1 files changed, 89 insertions(+), 23 deletions(-) diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp index ddc26fa..e2d3fab 100644 --- a/src/socket/shm_socket.cpp +++ b/src/socket/shm_socket.cpp @@ -30,7 +30,7 @@ static inline int _shm_socket_check_key(shm_socket_t *socket) { void *tmp_ptr = mm_get_by_key(socket->key); if (tmp_ptr!= NULL && tmp_ptr != (void *)1 && !socket->force_bind ) { - bus_errno = EBUS_KEY_INUSED; + bus_errno = ESHM_BUS_KEY_INUSED; logger->error("%s. key = %d ", bus_strerror(bus_errno), socket->key); return 0; } @@ -46,6 +46,8 @@ } shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) { + int s, type; + pthread_mutexattr_t mtxAttr; logger->debug("shm_open_socket\n"); shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); @@ -54,14 +56,27 @@ socket->force_bind = false; socket->dispatch_thread = 0; socket->status = SHM_CONN_CLOSED; - socket->mutex = SemUtil::get(IPC_PRIVATE, 1); - + + s = pthread_mutexattr_init(&mtxAttr); + if (s != 0) + err_exit(s, "pthread_mutexattr_init"); + s = pthread_mutexattr_settype(&mtxAttr, PTHREAD_MUTEX_ERRORCHECK); + if (s != 0) + err_exit(s, "pthread_mutexattr_settype"); + s = pthread_mutex_init(&(socket->mutex), &mtxAttr); + if (s != 0) + err_exit(s, "pthread_mutex_init"); + + s = pthread_mutexattr_destroy(&mtxAttr); + if (s != 0) + err_exit(s, "pthread_mutexattr_destroy"); + return socket; } int shm_close_socket(shm_socket_t *socket) { - int ret; + int ret, s; logger->debug("shm_close_socket\n"); switch (socket->socket_type) { @@ -74,8 +89,13 @@ default: break; } + + s = pthread_mutex_destroy(&(socket->mutex) ); + if(s != 0) { + err_exit(s, "shm_close_socket"); + } + free(socket); - SemUtil::remove(socket->mutex); return ret; } @@ -113,8 +133,8 @@ } else { if(!_shm_socket_check_key(socket)) { - bus_errno = EBUS_KEY_INUSED; - return EBUS_KEY_INUSED; + bus_errno = ESHM_BUS_KEY_INUSED; + return ESHM_BUS_KEY_INUSED; } } @@ -204,8 +224,8 @@ socket->key = hashtable_alloc_key(hashtable); } else { if(!_shm_socket_check_key(socket)) { - bus_errno = EBUS_KEY_INUSED; - return EBUS_KEY_INUSED; + bus_errno = ESHM_BUS_KEY_INUSED; + return ESHM_BUS_KEY_INUSED; } } @@ -296,6 +316,10 @@ // 鐭繛鎺ユ柟寮忓彂閫� int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec *timeout, const int flags) { + + int s; + bool rv; + if (socket->socket_type != SHM_SOCKET_DGRAM) { logger->error( "shm_socket.shm_sendto: Can't invoke shm_sendto method in a %d type socket which is " "not a SHM_SOCKET_DGRAM socket ", @@ -304,22 +328,27 @@ } hashtable_t *hashtable = mm_get_hashtable(); - SemUtil::dec(socket->mutex); + + if ((s = pthread_mutex_lock(&(socket->mutex))) != 0) + err_exit(s, "shm_sendto : pthread_mutex_lock"); + if (socket->queue == NULL) { if (socket->key == -1) { socket->key = hashtable_alloc_key(hashtable); } else { if(!_shm_socket_check_key(socket)) { - bus_errno = EBUS_KEY_INUSED; - return EBUS_KEY_INUSED; + bus_errno = ESHM_BUS_KEY_INUSED; + return ESHM_BUS_KEY_INUSED; } } socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16); } - SemUtil::inc(socket->mutex); + + if ((s = pthread_mutex_unlock(&(socket->mutex))) != 0) + err_exit(s, "shm_sendto : pthread_mutex_unlock"); // if (key == socket->key) { // logger->error( "can not send to your self!"); @@ -341,7 +370,7 @@ memcpy(dest.buf, buf, size); // printf("shm_sendto push before\n"); - bool rv; + if(flags & SHM_MSG_NOWAIT != 0) { rv = remoteQueue->push_nowait(dest); } else if(timeout != NULL) { @@ -357,9 +386,9 @@ } else { delete remoteQueue; mm_free(dest.buf); - if(errno == EAGAIN) { + if(errno == ETIMEDOUT) { bus_errno = EBUS_TIMEOUT; - logger->error("sendto key %d failed, %s", key, bus_strerror(bus_errno)); + logger->error(errno, "sendto key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT)); return EBUS_TIMEOUT; } else { logger->error(errno, "sendto key %d failed!", key); @@ -372,6 +401,9 @@ // 鐭繛鎺ユ柟寮忔帴鍙� int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key, struct timespec *timeout, int flags) { + int s; + bool rv; + if (socket->socket_type != SHM_SOCKET_DGRAM) { logger->error("shm_socket.shm_recvfrom: Can't invoke shm_recvfrom method in a %d type socket which " "is not a SHM_SOCKET_DGRAM socket ", @@ -379,25 +411,30 @@ exit(1); } hashtable_t *hashtable = mm_get_hashtable(); - SemUtil::dec(socket->mutex); + + if ((s = pthread_mutex_lock(&(socket->mutex))) != 0) + err_exit(s, "shm_recvfrom : pthread_mutex_lock"); + if (socket->queue == NULL) { if (socket->key == -1) { socket->key = hashtable_alloc_key(hashtable); } else { if(!_shm_socket_check_key(socket)) { - bus_errno = EBUS_KEY_INUSED; - return EBUS_KEY_INUSED; + bus_errno = ESHM_BUS_KEY_INUSED; + return ESHM_BUS_KEY_INUSED; } } socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16); } - SemUtil::inc(socket->mutex); + + if ((s = pthread_mutex_unlock(&(socket->mutex))) != 0) + err_exit(s, "shm_recvfrom : pthread_mutex_unlock"); shm_msg_t src; // printf("shm_recvfrom pop before\n"); - bool rv; + if(flags & SHM_MSG_NOWAIT != 0) { rv = socket->queue->pop_nowait(src); } else if(timeout != NULL) { @@ -461,8 +498,8 @@ } - -int shm_sendandrecv_safe(shm_socket_t *socket, const void *send_buf, +// use thread local +int _shm_sendandrecv_thread_local(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout, int flags) { int recv_key; @@ -510,6 +547,35 @@ return -1; } +int _shm_sendandrecv_alloc_new(shm_socket_t *socket, const void *send_buf, + const int send_size, const int send_key, void **recv_buf, + int *recv_size, struct timespec *timeout, int flags) { + int recv_key; + int rv; + + // 鐢╰hread local 淇濊瘉姣忎釜绾跨▼鐢ㄤ竴涓嫭鍗犵殑socket鎺ュ彈瀵规柟杩斿洖鐨勪俊鎭� + shm_socket_t *tmp_socket; + + if (socket->socket_type != SHM_SOCKET_DGRAM) { + logger->error( "shm_socket.shm_sendandrecv: Can't invoke shm_sendandrecv method in a %d type socket " + "which is not a SHM_SOCKET_DGRAM socket ", + socket->socket_type); + exit(1); + } + + /* If first call from this thread, allocate buffer for thread, and save its location */ + // logger->debug("%d create tmp socket\n", pthread_self() ); + tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM); + + if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) { + rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); + } + + shm_close_socket(tmp_socket); + return rv; + +} + int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout, int flags) { -- Gitblit v1.8.0