From 31b6fc52f49449189e31e13626c392d4c576c833 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期五, 05 二月 2021 18:16:58 +0800 Subject: [PATCH] update --- src/socket/shm_socket.cpp | 176 +++++++++++++++++++++++++++++++--------------------------- 1 files changed, 95 insertions(+), 81 deletions(-) diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp index 14274ca..bb23298 100644 --- a/src/socket/shm_socket.cpp +++ b/src/socket/shm_socket.cpp @@ -26,6 +26,15 @@ static int shm_sendpakto(shm_socket_t *sockt, const shm_packet_t *sendpak, const int key, const struct timespec *timeout, const int flag); + +static int _shm_sendandrecv_use_uuid(shm_socket_t *sockt, const void *send_buf, + const int send_size, const int key, void **recv_buf, + int *recv_size, const struct timespec *timeout, int flags); + +static int _shm_sendandrecv_thread_local(shm_socket_t *sockt, const void *send_buf, + const int send_size, const int send_key, void **recv_buf, + int *recv_size, const struct timespec *timeout, int flags) ; + // 妫�鏌ey鏄惁宸茬粡琚娇鐢紝 鏈浣跨敤鍒欑粦瀹歬ey static LockFreeQueue<shm_packet_t> * shm_socket_bind_queue(int key, bool force) { hashtable_t *hashtable = mm_get_hashtable(); @@ -205,90 +214,12 @@ return rv; } - - int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, const struct timespec *timeout, int flags) { + return _shm_sendandrecv_thread_local(sockt, send_buf, send_size, key, recv_buf, recv_size, timeout, flags); +} - int rv, tryn = 3; - shm_packet_t sendpak; - shm_packet_t recvpak; - std::map<std::string, shm_packet_t>::iterator recvbufIter; - - std::string uuid = sole::uuid4().str(); - - sendpak.key = sockt->key; - sendpak.size = send_size; - if(send_buf != NULL) { - sendpak.buf = mm_malloc(send_size); - memcpy(sendpak.buf, send_buf, send_size); - } - memcpy(sendpak.uuid, uuid.c_str(), uuid.length() + 1); - // uuid.copy(sendpak.uuid, sizeof sendpak.uuid); - rv = shm_sendpakto(sockt, &sendpak, key, timeout, flags); - - if(rv != 0) { - return rv; - } - - while(tryn > 0) { - tryn--; - recvbufIter = sockt->recvbuf.find(uuid); - if(recvbufIter != sockt->recvbuf.end()) { - // 鍦ㄧ紦瀛橀噷鏌ュ埌浜哢UID鍖归厤鎴愬姛鐨� -logger->debug("get from recvbuf: %s", uuid.c_str()); - recvpak = recvbufIter->second; - sockt->recvbuf.erase(recvbufIter); - goto LABLE_SUC; - } - - rv = shm_recvpakfrom(sockt, &recvpak, timeout, flags); - - if (rv != 0) { - - if(rv == ETIMEDOUT) { - return EBUS_TIMEOUT; - } - - logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv)); - return rv; - } - -logger->debug("send uuid:%s, recv uuid: %s", uuid.c_str(), recvpak.uuid); - if(strlen(recvpak.uuid) == 0) { - continue; - } else if (strncmp(uuid.c_str(), recvpak.uuid, sizeof recvpak.uuid) == 0) { - // 鍙戦�佷笌鎺ュ彈鐨刄UID鍖归厤鎴愬姛 - goto LABLE_SUC; - } else { - // 绛旈潪鎵�闂紝鏀惧埌缂撳瓨閲� - sockt->recvbuf.insert({recvpak.uuid, recvpak}); - continue; - } - } - -LABLE_FAIL: - return EBUS_RECVFROM_WRONG_END; - // return rv; - -LABLE_SUC: - if(recv_buf != NULL) { - void *_buf = malloc(recvpak.size); - memcpy(_buf, recvpak.buf, recvpak.size); - *recv_buf = _buf; - } - - if(recv_size != NULL) - *recv_size = recvpak.size; - - mm_free(recvpak.buf); - - return 0; - - - -} /** * @callback void (*recvandsend_callback_fn)(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize) @@ -413,8 +344,91 @@ } +static int _shm_sendandrecv_use_uuid(shm_socket_t *sockt, const void *send_buf, + const int send_size, const int key, void **recv_buf, + int *recv_size, const struct timespec *timeout, int flags) { + + int rv, tryn = 3; + shm_packet_t sendpak; + shm_packet_t recvpak; + std::map<std::string, shm_packet_t>::iterator recvbufIter; + + std::string uuid = sole::uuid4().str(); + + sendpak.key = sockt->key; + sendpak.size = send_size; + if(send_buf != NULL) { + sendpak.buf = mm_malloc(send_size); + memcpy(sendpak.buf, send_buf, send_size); + } + memcpy(sendpak.uuid, uuid.c_str(), uuid.length() + 1); + // uuid.copy(sendpak.uuid, sizeof sendpak.uuid); + rv = shm_sendpakto(sockt, &sendpak, key, timeout, flags); + + if(rv != 0) { + return rv; + } + + while(tryn > 0) { + tryn--; + recvbufIter = sockt->recvbuf.find(uuid); + if(recvbufIter != sockt->recvbuf.end()) { + // 鍦ㄧ紦瀛橀噷鏌ュ埌浜哢UID鍖归厤鎴愬姛鐨� +logger->debug("get from recvbuf: %s", uuid.c_str()); + recvpak = recvbufIter->second; + sockt->recvbuf.erase(recvbufIter); + goto LABLE_SUC; + } + + rv = shm_recvpakfrom(sockt, &recvpak, timeout, flags); + + if (rv != 0) { + + if(rv == ETIMEDOUT) { + return EBUS_TIMEOUT; + } + + logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv)); + return rv; + } + +logger->debug("send uuid:%s, recv uuid: %s", uuid.c_str(), recvpak.uuid); + if(strlen(recvpak.uuid) == 0) { + continue; + } else if (strncmp(uuid.c_str(), recvpak.uuid, sizeof recvpak.uuid) == 0) { + // 鍙戦�佷笌鎺ュ彈鐨刄UID鍖归厤鎴愬姛 + goto LABLE_SUC; + } else { + // 绛旈潪鎵�闂紝鏀惧埌缂撳瓨閲� + sockt->recvbuf.insert({recvpak.uuid, recvpak}); + continue; + } + } + +LABLE_FAIL: + return EBUS_RECVFROM_WRONG_END; + // return rv; + +LABLE_SUC: + if(recv_buf != NULL) { + void *_buf = malloc(recvpak.size); + memcpy(_buf, recvpak.buf, recvpak.size); + *recv_buf = _buf; + } + + if(recv_size != NULL) + *recv_size = recvpak.size; + + mm_free(recvpak.buf); + + return 0; + + + +} + // use thread local -int _shm_sendandrecv_thread_local(shm_socket_t *sockt, const void *send_buf, +static int _shm_sendandrecv_thread_local(shm_socket_t *sockt, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec *timeout, int flags) { int recv_key; -- Gitblit v1.8.0