From 6c32e1a482c14412108675ec78f49ebe4f94a374 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期四, 07 一月 2021 12:04:03 +0800 Subject: [PATCH] update --- src/socket/shm_socket.cpp | 75 ++++++++++++++++++++++++++++--------- 1 files changed, 56 insertions(+), 19 deletions(-) diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp index ddc26fa..857e7e6 100644 --- a/src/socket/shm_socket.cpp +++ b/src/socket/shm_socket.cpp @@ -30,7 +30,7 @@ static inline int _shm_socket_check_key(shm_socket_t *socket) { void *tmp_ptr = mm_get_by_key(socket->key); if (tmp_ptr!= NULL && tmp_ptr != (void *)1 && !socket->force_bind ) { - bus_errno = EBUS_KEY_INUSED; + bus_errno = ESHM_BUS_KEY_INUSED; logger->error("%s. key = %d ", bus_strerror(bus_errno), socket->key); return 0; } @@ -46,6 +46,8 @@ } shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) { + int s, type; + pthread_mutexattr_t mtxAttr; logger->debug("shm_open_socket\n"); shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); @@ -54,14 +56,27 @@ socket->force_bind = false; socket->dispatch_thread = 0; socket->status = SHM_CONN_CLOSED; - socket->mutex = SemUtil::get(IPC_PRIVATE, 1); - + + s = pthread_mutexattr_init(&mtxAttr); + if (s != 0) + err_exit(s, "pthread_mutexattr_init"); + s = pthread_mutexattr_settype(&mtxAttr, PTHREAD_MUTEX_ERRORCHECK); + if (s != 0) + err_exit(s, "pthread_mutexattr_settype"); + s = pthread_mutex_init(&(socket->mutex), &mtxAttr); + if (s != 0) + err_exit(s, "pthread_mutex_init"); + + s = pthread_mutexattr_destroy(&mtxAttr); + if (s != 0) + err_exit(s, "pthread_mutexattr_destroy"); + return socket; } int shm_close_socket(shm_socket_t *socket) { - int ret; + int ret, s; logger->debug("shm_close_socket\n"); switch (socket->socket_type) { @@ -74,8 +89,13 @@ default: break; } + + s = pthread_mutex_destroy(&(socket->mutex) ); + if(s != 0) { + err_exit(s, "shm_close_socket"); + } + free(socket); - SemUtil::remove(socket->mutex); return ret; } @@ -113,8 +133,8 @@ } else { if(!_shm_socket_check_key(socket)) { - bus_errno = EBUS_KEY_INUSED; - return EBUS_KEY_INUSED; + bus_errno = ESHM_BUS_KEY_INUSED; + return ESHM_BUS_KEY_INUSED; } } @@ -204,8 +224,8 @@ socket->key = hashtable_alloc_key(hashtable); } else { if(!_shm_socket_check_key(socket)) { - bus_errno = EBUS_KEY_INUSED; - return EBUS_KEY_INUSED; + bus_errno = ESHM_BUS_KEY_INUSED; + return ESHM_BUS_KEY_INUSED; } } @@ -296,6 +316,10 @@ // 鐭繛鎺ユ柟寮忓彂閫� int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec *timeout, const int flags) { + + int s; + bool rv; + if (socket->socket_type != SHM_SOCKET_DGRAM) { logger->error( "shm_socket.shm_sendto: Can't invoke shm_sendto method in a %d type socket which is " "not a SHM_SOCKET_DGRAM socket ", @@ -304,22 +328,27 @@ } hashtable_t *hashtable = mm_get_hashtable(); - SemUtil::dec(socket->mutex); + + if ((s = pthread_mutex_lock(&(socket->mutex))) != 0) + err_exit(s, "shm_sendto : pthread_mutex_lock"); + if (socket->queue == NULL) { if (socket->key == -1) { socket->key = hashtable_alloc_key(hashtable); } else { if(!_shm_socket_check_key(socket)) { - bus_errno = EBUS_KEY_INUSED; - return EBUS_KEY_INUSED; + bus_errno = ESHM_BUS_KEY_INUSED; + return ESHM_BUS_KEY_INUSED; } } socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16); } - SemUtil::inc(socket->mutex); + + if ((s = pthread_mutex_unlock(&(socket->mutex))) != 0) + err_exit(s, "shm_sendto : pthread_mutex_unlock"); // if (key == socket->key) { // logger->error( "can not send to your self!"); @@ -341,7 +370,7 @@ memcpy(dest.buf, buf, size); // printf("shm_sendto push before\n"); - bool rv; + if(flags & SHM_MSG_NOWAIT != 0) { rv = remoteQueue->push_nowait(dest); } else if(timeout != NULL) { @@ -372,6 +401,9 @@ // 鐭繛鎺ユ柟寮忔帴鍙� int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key, struct timespec *timeout, int flags) { + int s; + bool rv; + if (socket->socket_type != SHM_SOCKET_DGRAM) { logger->error("shm_socket.shm_recvfrom: Can't invoke shm_recvfrom method in a %d type socket which " "is not a SHM_SOCKET_DGRAM socket ", @@ -379,25 +411,30 @@ exit(1); } hashtable_t *hashtable = mm_get_hashtable(); - SemUtil::dec(socket->mutex); + + if ((s = pthread_mutex_lock(&(socket->mutex))) != 0) + err_exit(s, "shm_recvfrom : pthread_mutex_lock"); + if (socket->queue == NULL) { if (socket->key == -1) { socket->key = hashtable_alloc_key(hashtable); } else { if(!_shm_socket_check_key(socket)) { - bus_errno = EBUS_KEY_INUSED; - return EBUS_KEY_INUSED; + bus_errno = ESHM_BUS_KEY_INUSED; + return ESHM_BUS_KEY_INUSED; } } socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16); } - SemUtil::inc(socket->mutex); + + if ((s = pthread_mutex_unlock(&(socket->mutex))) != 0) + err_exit(s, "shm_recvfrom : pthread_mutex_unlock"); shm_msg_t src; // printf("shm_recvfrom pop before\n"); - bool rv; + if(flags & SHM_MSG_NOWAIT != 0) { rv = socket->queue->pop_nowait(src); } else if(timeout != NULL) { -- Gitblit v1.8.0