From 044e10574fa4e007be408d991861d34ecf22622a Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期一, 22 二月 2021 19:17:37 +0800 Subject: [PATCH] update --- src/socket/shm_socket.cpp | 46 +++++++++++++++++++++++----------------------- 1 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp index 7e33cc6..d8304fd 100644 --- a/src/socket/shm_socket.cpp +++ b/src/socket/shm_socket.cpp @@ -25,7 +25,7 @@ static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak , const struct timespec *timeout, int flag); -static int shm_sendpakto(shm_socket_t *sockt, const shm_packet_t *sendpak, +static int shm_sendpakto(shm_socket_t *sockt, shm_packet_t *sendpak, const int key, const struct timespec *timeout, const int flag); @@ -114,14 +114,11 @@ int rv; logger->debug("shm_socket_close\n"); + // hashtable_remove(hashtable, mkey); // if(sockt->queue != NULL) { - // delete sockt->queue; // sockt->queue = NULL; // } - // hashtable_remove(hashtable, mkey); - - if(sockt->key != 0) { auto it = shmQueueStMap->find(sockt->key); if(it != shmQueueStMap->end()) { @@ -257,12 +254,8 @@ if (rv != 0) { - if(rv == ETIMEDOUT) - return EBUS_TIMEOUT; - else { - logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv)); - return rv; - } + logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv)); + return rv; } @@ -279,6 +272,9 @@ if(key != NULL) *key = recvpak.key; + if(recvpak.key == 0) { + err_exit(0, "key = %d, pid= %d, recvpak.key == 0", shm_socket_get_key(sockt), getpid()); + } mm_free(recvpak.buf); return 0; } @@ -402,15 +398,13 @@ int *recv_size, const struct timespec *timeout, int flags) { - int rv, tryn = 6; + int rv = 0, tryn = 16; shm_packet_t sendpak; shm_packet_t recvpak; std::map<int, shm_packet_t>::iterator recvbufIter; // 鐢╰hread local 淇濊瘉姣忎釜绾跨▼鐢ㄤ竴涓嫭鍗犵殑socket鎺ュ彈瀵规柟杩斿洖鐨勪俊鎭� shm_socket_t *tmp_socket; - /* If first call from this thread, allocate buffer for thread, and save its location */ - // logger->debug("%d create tmp socket\n", pthread_self() ); rv = pthread_once(&_once_, _create_socket_key_perthread); if (rv != 0) { logger->error(rv, "shm_sendandrecv pthread_once"); @@ -431,7 +425,6 @@ } } - sendpak.key = tmp_socket->key; sendpak.size = send_size; if(send_buf != NULL) { @@ -458,11 +451,6 @@ rv = shm_recvpakfrom(tmp_socket, &recvpak, timeout, flags); if (rv != 0) { - - if(rv == ETIMEDOUT) { - return EBUS_TIMEOUT; - } - logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(tmp_socket), bus_strerror(rv)); return rv; } @@ -538,7 +526,7 @@ -static int shm_sendpakto(shm_socket_t *sockt, const shm_packet_t *sendpak, +static int shm_sendpakto(shm_socket_t *sockt, shm_packet_t *sendpak, const int key, const struct timespec *timeout, const int flag) { int rv; @@ -564,6 +552,8 @@ sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind); if(sockt->queue == NULL ) { logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); + if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) + err_exit(rv, "shm_sendto : pthread_mutex_unlock"); return EBUS_KEY_INUSED; } @@ -601,7 +591,11 @@ goto ERR_CLOSED; } + sendpak->key = sockt->key; rv = remoteQueue->push(*sendpak, timeout, flag); + if(rv == ETIMEDOUT) { + return EBUS_TIMEOUT; + } return rv; ERR_CLOSED: @@ -635,6 +629,8 @@ sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind); if(sockt->queue == NULL ) { logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); + if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) + err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); return EBUS_KEY_INUSED; } @@ -661,14 +657,18 @@ // } rv = sockt->queue->pop(recvpak, timeout, flag); - if(rv != 0) + if(rv != 0) { + if(rv == ETIMEDOUT) { + return EBUS_TIMEOUT; + } return rv; + } if(recvpak.action == BUS_ACTION_STOP) { return EBUS_STOPED; } *_recvpak = recvpak; - return rv; + return 0; } -- Gitblit v1.8.0