From 82841486c36288d73e95f3316e91dd7a522d8602 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期一, 08 二月 2021 09:39:57 +0800 Subject: [PATCH] update --- src/socket/shm_socket.cpp | 123 ++++++++++++++++++++++++++++++++-------- 1 files changed, 97 insertions(+), 26 deletions(-) diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp index bd6473c..f55a111 100644 --- a/src/socket/shm_socket.cpp +++ b/src/socket/shm_socket.cpp @@ -429,12 +429,14 @@ // use thread local static int _shm_sendandrecv_thread_local(shm_socket_t *sockt, const void *send_buf, - const int send_size, const int send_key, void **recv_buf, + const int send_size, const int key, void **recv_buf, int *recv_size, const struct timespec *timeout, int flags) { - int recv_key; - int rv; - int tryn = 0; - + + + int rv, tryn = 3; + shm_packet_t sendpak; + shm_packet_t recvpak; + std::map<int, shm_packet_t>::iterator recvbufIter; // 鐢╰hread local 淇濊瘉姣忎釜绾跨▼鐢ㄤ竴涓嫭鍗犵殑socket鎺ュ彈瀵规柟杩斿洖鐨勪俊鎭� shm_socket_t *tmp_socket; @@ -459,33 +461,102 @@ exit(1); } } + // int rv; + // int tryn = 0; + // int recv_key; + // rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags); - if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) { + // if (() == 0) { - while(tryn < 3) { - tryn++; - rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); - if(rv != 0) { - logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv)); - return rv; - } + // while(tryn < 3) { + // tryn++; + // rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); + // if(rv != 0) { + // logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv)); + // return rv; + // } - // 瓒呮椂瀵艰嚧鎺ュ彂閫佸璞★紝涓庤繑鍥炲璞′笉瀵瑰簲鐨勬儏鍐� - if(send_key != recv_key) { - logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key); - // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); - // exit(1); - continue; - // return EBUS_RECVFROM_WRONG_END; + // // 瓒呮椂瀵艰嚧鎺ュ彂閫佸璞★紝涓庤繑鍥炲璞′笉瀵瑰簲鐨勬儏鍐� + // if(send_key != recv_key) { + // logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key); + // // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); + // // exit(1); + // continue; + // // return EBUS_RECVFROM_WRONG_END; + // } + + // return 0; + // } + + // return EBUS_RECVFROM_WRONG_END; + // } + + + + sendpak.key = tmp_socket->key; + sendpak.size = send_size; + if(send_buf != NULL) { + sendpak.buf = mm_malloc(send_size); + memcpy(sendpak.buf, send_buf, send_size); + } + rv = shm_sendpakto(tmp_socket, &sendpak, key, timeout, flags); + + if(rv != 0) { + return rv; + } + + if(rv != 0) { + return rv; + } + + while(tryn > 0) { + tryn--; + recvbufIter = tmp_socket->recvbuf2.find(key); + if(recvbufIter != tmp_socket->recvbuf2.end()) { + // 鍦ㄧ紦瀛橀噷鏌ュ埌浜哢UID鍖归厤鎴愬姛鐨� +// logger->debug("get from recvbuf: %s", uuid.c_str()); + recvpak = recvbufIter->second; + sockt->recvbuf2.erase(recvbufIter); + goto LABLE_SUC; + } + + rv = shm_recvpakfrom(tmp_socket, &recvpak, timeout, flags); + + if (rv != 0) { + + if(rv == ETIMEDOUT) { + return EBUS_TIMEOUT; } - return 0; - } - - return EBUS_RECVFROM_WRONG_END; - } + logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(tmp_socket), bus_strerror(rv)); + return rv; + } - return rv; + if (key == recvpak.key) { + // 鍙戦�佷笌鎺ュ彈鐨刄UID鍖归厤鎴愬姛 + goto LABLE_SUC; + } else { + // 绛旈潪鎵�闂紝鏀惧埌缂撳瓨閲� + tmp_socket->recvbuf2.insert({recvpak.key, recvpak}); + continue; + } + } + +LABLE_FAIL: + return EBUS_RECVFROM_WRONG_END; + +LABLE_SUC: + if(recv_buf != NULL) { + void *_buf = malloc(recvpak.size); + memcpy(_buf, recvpak.buf, recvpak.size); + *recv_buf = _buf; + } + + if(recv_size != NULL) + *recv_size = recvpak.size; + + mm_free(recvpak.buf); + return 0; } static int _shm_sendandrecv_alloc_new(shm_socket_t *sockt, const void *send_buf, -- Gitblit v1.8.0