From d1f7194a61f349bbd62ab1956001b1a905815ecb Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 23 一月 2024 16:23:48 +0800
Subject: [PATCH] bug fixed
---
src/socket/shm_socket.cpp | 99 ++++++++++++++++++-------------------------------
1 files changed, 37 insertions(+), 62 deletions(-)
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index dc6d752..a6b6f93 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -2,10 +2,12 @@
#include "socket_def.h"
#include "hashtable.h"
#include "logger_factory.h"
+#include "net_mod_socket_wrapper.h"
#include <map>
#include <cassert>
#include "bus_error.h"
#include "sole.h"
+#include "bh_api.h"
#include "shm_mm.h"
#include "key_def.h"
@@ -46,7 +48,7 @@
void *tmp_ptr = hashtable_get(hashtable, key);
if (tmp_ptr == NULL || tmp_ptr == (void *)1 ) {
- queue = new LockFreeQueue<shm_packet_t>(32);
+ queue = new LockFreeQueue<shm_packet_t>(LOCK_FREE_Q_DEFAULT_SIZE);
hashtable_put(hashtable, key, (void *)queue);
return queue;
} else if(force) {
@@ -68,7 +70,6 @@
}
queue = ( LockFreeQueue<shm_packet_t> *)tmp_ptr;
- // hashtable_unlock(hashtable);
return queue;
}
@@ -76,7 +77,6 @@
int s, type;
pthread_mutexattr_t mtxAttr;
- logger->debug("shm_socket_open\n");
// shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
shm_socket_t *sockt = new shm_socket_t;
sockt->socket_type = socket_type;
@@ -107,9 +107,6 @@
static int _shm_socket_close_(shm_socket_t *sockt) {
-
- int rv, i;
- hashtable_t *hashtable = mm_get_hashtable();
// if(sockt->key != 0) {
// auto it = shmQueueStMap->find(sockt->key);
@@ -119,18 +116,6 @@
// }
// }
- if(sockt->queue != NULL) {
- sockt->queue->close();
- for( i = 0; i < sockt->queue->size(); i++) {
- mm_free((*(sockt->queue))[i].buf);
- logger->info("======= %d free queue element buf\n", sockt->key);
- }
- sleep(1);
-
- hashtable_remove(hashtable, sockt->key);
- // sockt->queue = NULL;
- }
-
pthread_mutex_destroy(&(sockt->mutex) );
free(sockt);
return 0;
@@ -167,20 +152,8 @@
return 0;
}
-int shm_socket_bind_proc_id(shm_socket_t *sockt, const char *buf, int len) {
- strncpy(sockt->proc_id, buf, len > MAX_STR_LEN ? MAX_STR_LEN : len);
-
- return 0;
-}
-
int shm_socket_get_key(shm_socket_t *sockt){
return sockt->key;
-}
-
-int shm_socket_get_procid(shm_socket_t *sockt, char *buf, int len) {
- strncpy(buf, sockt->proc_id, len);
-
- return 0;
}
// 鐭繛鎺ユ柟寮忓彂閫�
@@ -208,7 +181,6 @@
int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf,
const int send_size, const int key, void **recv_buf,
int *recv_size, const struct timespec *timeout, int flags) {
- // return _shm_sendandrecv_uuid(sockt, send_buf, send_size, key, recv_buf, recv_size, timeout, flags);
return _shm_sendandrecv_thread_local(sockt, send_buf, send_size, key, recv_buf, recv_size, timeout, flags);
}
@@ -231,7 +203,7 @@
if (rv != 0) {
if(rv == ETIMEDOUT){
- logger->debug("%d shm_recvandsend failed %s", shm_socket_get_key(sockt), bus_strerror(EBUS_TIMEOUT));
+ logger->error("%d shm_recvandsend failed %s", shm_socket_get_key(sockt), bus_strerror(EBUS_TIMEOUT));
return EBUS_TIMEOUT;
}
@@ -275,7 +247,7 @@
if (rv != 0) {
- logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
+ logger->error("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
return rv;
}
@@ -368,7 +340,6 @@
recvbufIter = sockt->recvbuf.find(uuid);
if(recvbufIter != sockt->recvbuf.end()) {
// 鍦ㄧ紦瀛橀噷鏌ュ埌浜哢UID鍖归厤鎴愬姛鐨�
-logger->debug("get from recvbuf: %s", uuid.c_str());
recvpak = recvbufIter->second;
sockt->recvbuf.erase(recvbufIter);
goto LABLE_SUC;
@@ -382,11 +353,10 @@
return EBUS_TIMEOUT;
}
- logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
+ logger->error("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
return rv;
}
-logger->debug("send uuid:%s, recv uuid: %s", uuid.c_str(), recvpak.uuid);
if(strlen(recvpak.uuid) == 0) {
continue;
} else if (strncmp(uuid.c_str(), recvpak.uuid, sizeof(recvpak.uuid)) == 0) {
@@ -421,14 +391,16 @@
const int send_size, const int key, void **recv_buf,
int *recv_size, const struct timespec *timeout, int flags) {
-
+ int data;
+ int timeout_ms;
+ char data_buf[MAX_STR_LEN] = { 0x00 };
int rv = 0, tryn = 16;
shm_packet_t sendpak;
shm_packet_t recvpak;
std::map<int, shm_packet_t>::iterator recvbufIter;
- // 鐢╰hread local 淇濊瘉姣忎釜绾跨▼鐢ㄤ竴涓嫭鍗犵殑socket鎺ュ彈瀵规柟杩斿洖鐨勪俊鎭�
shm_socket_t *tmp_socket = NULL;
-
+ hashtable_t *hashtable = mm_get_hashtable();
+
rv = pthread_once(&_once_, _create_threadlocal_socket_key_);
if (rv != 0) {
logger->error(rv, "shm_sendandrecv pthread_once");
@@ -438,17 +410,23 @@
tmp_socket = (shm_socket_t *)pthread_getspecific(_localthread_socket_key_);
if (tmp_socket == NULL)
{
- /* If first call from this thread, allocate buffer for thread, and save its location */
tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM);
-
+
+ tmp_socket->key = hashtable_alloc_key(hashtable);
+ data = inter_key_get();
+ timeout_ms = timeout->tv_sec * 1000 + 3000;
+ sprintf(data_buf, "%d, %d", data, tmp_socket->key);
+ if (socket_data_get() != NULL) {
+ net_mod_socket_reg(socket_data_get(), data_buf, strlen(data_buf), NULL, 0, timeout_ms, PROC_REG_BUF);
+ }
+
+ rv = pthread_setspecific(_localthread_socket_key_, tmp_socket);
+ if ( rv != 0) {
+ logger->error(rv, "shm_sendandrecv : pthread_setspecific");
+ exit(1);
+ }
}
-
- rv = pthread_setspecific(_localthread_socket_key_, tmp_socket);
- if ( rv != 0) {
- logger->error(rv, "shm_sendandrecv : pthread_setspecific");
- exit(1);
- }
-
+
sendpak.key = tmp_socket->key;
sendpak.size = send_size;
if(send_buf != NULL) {
@@ -458,6 +436,10 @@
rv = shm_sendpakto(tmp_socket, &sendpak, key, timeout, flags);
if(rv != 0) {
+ if(send_buf != NULL) {
+ mm_free(sendpak.buf);
+ }
+
return rv;
}
@@ -465,26 +447,25 @@
tryn--;
recvbufIter = tmp_socket->recvbuf2.find(key);
if(recvbufIter != tmp_socket->recvbuf2.end()) {
- // 鍦ㄧ紦瀛橀噷鏌ュ埌浜唊ey鍖归厤鎴愬姛鐨�
recvpak = recvbufIter->second;
- tmp_socket->recvbuf2.erase(recvbufIter);
+ tmp_socket->recvbuf2.erase(key);
goto LABLE_SUC;
}
rv = shm_recvpakfrom(tmp_socket, &recvpak, timeout, flags);
if (rv != 0) {
- logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(tmp_socket), bus_strerror(rv));
+ logger->error("%d shm_recvfrom failed %s", shm_socket_get_key(tmp_socket), bus_strerror(rv));
return rv;
}
if (key == recvpak.key) {
- // 鍙戦�佷笌鎺ュ彈鐨刄UID鍖归厤鎴愬姛
+
goto LABLE_SUC;
+
} else {
- // 绛旈潪鎵�闂紝鏀惧埌缂撳瓨閲�
+
tmp_socket->recvbuf2.insert({recvpak.key, recvpak});
- exit(0);
continue;
}
}
@@ -581,6 +562,7 @@
if (sockt->key == 0) {
sockt->key = hashtable_alloc_key(hashtable);
}
+
sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind);
if(sockt->queue == NULL ) {
logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
@@ -639,13 +621,6 @@
sendpak->key = sockt->key;
}
rv = remoteQueue->push(*sendpak, timeout, flag);
-
- if(rv != 0) {
- mm_free(sendpak->buf);
- }
- if(rv == ETIMEDOUT) {
- return EBUS_TIMEOUT;
- }
return rv;
ERR_CLOSED:
@@ -753,7 +728,7 @@
count += strlen(ptr->int_info) + 1;
memcpy(dst + count, ptr->svr_info, strlen(ptr->svr_info) + 1);
count += strlen(ptr->svr_info) + 1;
-
+
*counter = count;
}
--
Gitblit v1.8.0