From 82b028cf63953d8080b63d85468eae488d212194 Mon Sep 17 00:00:00 2001
From: fujuntang <fujuntang@smartai.com>
Date: 星期四, 23 九月 2021 14:30:07 +0800
Subject: [PATCH] Fix the data parsing when in multiple threads.
---
src/bus_proxy_start.cpp | 3 -
src/socket/shm_mod_socket.h | 1
src/socket/bus_server_socket.cpp | 61 ++++++++++--------------------
src/net/net_mod_socket_wrapper.cpp | 5 --
src/net/net_mod_socket.h | 1
src/socket/shm_socket.h | 4 -
src/socket/bus_server_socket.h | 2
src/bh_api.cpp | 16 +++++--
src/socket/shm_mod_socket.cpp | 4 --
src/net/net_mod_socket.cpp | 4 --
src/shm/hashtable.cpp | 1
src/socket/shm_socket.cpp | 16 -------
12 files changed, 34 insertions(+), 84 deletions(-)
diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index 312b203..32e53ef 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -530,9 +530,10 @@
if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) {
mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]);
}
-
+
+ Proc_ptr = &(ptr->procData);
for(int i = 0; i < mtr_list_num; i++) {
- mtr_list[i].proc_id = ptr->procData.proc_id;
+ mtr_list[i].proc_id = (Proc_ptr + i)->proc_id;
mtr_list[i].mq_id = ID_RSV;
mtr_list[i].abs_addr = ABS_ID_RSV;
mtr_list[i].ip = "127.0.0.1";
@@ -1162,6 +1163,7 @@
int sec, nsec;
std::string MsgID;
int timeout_ms = 3000;
+ char data_buf[MAX_STR_LEN] = { 0x00 };
char buf_temp[MAX_STR_LEN] = { 0x00 };
char *topics_buf = NULL;
@@ -1225,7 +1227,9 @@
rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS);
if (rv == 0) {
- val = atoi((char *)buf);
+ len = size > (sizeof(data_buf) - 1) ? (sizeof(data_buf) - 1) : size;
+ memcpy(data_buf, (char *)buf, len);
+ val = atoi((char *)data_buf);
free(buf);
@@ -1316,6 +1320,7 @@
net_node_t node;
int node_size;
int recv_arr_size;
+ char data_buf[MAX_STR_LEN] = { 0x00 };
net_mod_recv_msg_t *recv_arr;
net_mod_err_t *errarr;
int errarr_size = 0;
@@ -1389,7 +1394,9 @@
rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS);
if (rv == 0) {
- val = atoi((char *)buf);
+ len = size > (sizeof(data_buf) - 1) ? (sizeof(data_buf) - 1) : size;
+ memcpy(data_buf, (char *)buf, len);
+ val = atoi((char *)data_buf);
free(buf);
@@ -1401,7 +1408,6 @@
len += strlen(_input1.data);
#endif
- data = net_mod_socket_svr_get(gNetmod_socket);
topics_buf = (char *)malloc(len);
if (topics_buf == NULL) {
diff --git a/src/bus_proxy_start.cpp b/src/bus_proxy_start.cpp
index c3104a9..e2e4955 100644
--- a/src/bus_proxy_start.cpp
+++ b/src/bus_proxy_start.cpp
@@ -45,9 +45,6 @@
return NULL;
}
-
-
-
void *svr_start(void *skptr) {
int port = *(int *)skptr;
diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp
index 2f5ce73..b3aa9b0 100644
--- a/src/net/net_mod_socket.cpp
+++ b/src/net/net_mod_socket.cpp
@@ -46,10 +46,6 @@
return shmModSocket.force_bind(key);
}
-int NetModSocket::bind_proc_id(char *buf, int len) {
- return shmModSocket.bind_proc_id(buf, len);
-}
-
int NetModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag) {
return shmModSocket.reg(pData, len, buf, size, timeout_ms, flag);
diff --git a/src/net/net_mod_socket.h b/src/net/net_mod_socket.h
index 9d9af97..0cb2fd0 100644
--- a/src/net/net_mod_socket.h
+++ b/src/net/net_mod_socket.h
@@ -120,7 +120,6 @@
*/
int force_bind( int key);
- int bind_proc_id(char *buf, int len);
int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag);
/**
diff --git a/src/net/net_mod_socket_wrapper.cpp b/src/net/net_mod_socket_wrapper.cpp
index 5233635..479b0d4 100644
--- a/src/net/net_mod_socket_wrapper.cpp
+++ b/src/net/net_mod_socket_wrapper.cpp
@@ -103,11 +103,6 @@
return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size, err_arr, err_arr_size, -1);
}
-int net_mod_socket_bind_proc_id(void * _socket, char *proc_id, int len){
- NetModSocket *sockt = (NetModSocket *)_socket;
- return sockt->bind_proc_id(proc_id, len);
-}
-
void net_mod_socket_int_set(void * _socket, int data) {
NetModSocket *sockt = (NetModSocket *)_socket;
sockt->int_set(data);
diff --git a/src/shm/hashtable.cpp b/src/shm/hashtable.cpp
index c4a81fc..8593cca 100755
--- a/src/shm/hashtable.cpp
+++ b/src/shm/hashtable.cpp
@@ -178,7 +178,6 @@
goto suc;
}
val = _hashtable_get(hashtable, key);
- // val = 1鏄痑llockey鐨勬儏鍐�
if(val != NULL && val != (void *)1)
goto fail;
diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index beb7148..2d552da 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -539,7 +539,9 @@
data1 = atoi((proc_iter->second).int_info);
data2 = atoi((proc_iter->second).svr_info);
- BusServerSocket::_data_remove(data1, data2);
+ BusServerSocket::_data_remove(data1);
+ BusServerSocket::_data_remove(data2);
+ BusServerSocket::_data_remove(key);
len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
strncpy(buf_temp, (proc_iter->second).proc_id, len);
proc->erase(proc_iter);
@@ -892,10 +894,10 @@
ProcDataZone::iterator proc_que_iter;
ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
- int rv;
- char send_buf[512] = { 0x00 };
+ int rv;
+ char send_buf[512] = { 0x00 };
- const char *topic_delim = ",";
+ const char *topic_delim = ",";
while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) {
head = ShmModSocket::decode_bus_head(buf);
topics = buf + BUS_HEAD_SIZE;
@@ -973,39 +975,29 @@
_proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
}
- else if (strncmp(buf, "request", strlen("request")) == 0) {
- sprintf(send_buf, "%4d", key);
- strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4));
-
- rv = shm_sendto(shm_socket, send_buf, strlen(send_buf) + 1, key);
- if(rv != 0) {
- logger->error( "BusServerSocket::_run_proxy_ : requst answer fail!\n");
- }
- }
else if(strcmp(action, "stop") == 0) {
- free(buf);
- break;
- } else {
- logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action);
- }
- free(buf);
- }
+ free(buf);
+ break;
+ } else {
+ logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action);
+ }
+ free(buf);
+ }
- return rv;
+ return rv;
}
-void BusServerSocket::_data_remove(int val1, int val2) {
+void BusServerSocket::_data_remove(int val) {
int i;
LockFreeQueue<shm_packet_t> *queue = NULL;
hashtable_t *hashtable = mm_get_hashtable();
- void *data_ptr1 = hashtable_get(hashtable, val1);
- void *data_ptr2 = hashtable_get(hashtable, val2);
- if (data_ptr1 != NULL) {
- if (data_ptr1 != (void *)1) {
- queue = (LockFreeQueue<shm_packet_t> *)data_ptr1;
+ void *data_ptr = hashtable_get(hashtable, val);
+ if (data_ptr != NULL) {
+ if (data_ptr != (void *)1) {
+ queue = (LockFreeQueue<shm_packet_t> *)data_ptr;
queue->close();
for (i = 0; i < queue->size(); i++) {
mm_free((*queue)[i].buf);
@@ -1013,20 +1005,7 @@
sleep(1);
}
- hashtable_remove(hashtable, val1);
- }
-
- if (data_ptr2 != NULL) {
- if (data_ptr2 != (void *)1) {
- queue = (LockFreeQueue<shm_packet_t> *)data_ptr2;
- queue->close();
- for (i = 0; i < queue->size(); i++) {
- mm_free((*queue)[i].buf);
- }
- sleep(1);
- }
-
- hashtable_remove(hashtable, val2);
+ hashtable_remove(hashtable, val);
}
}
diff --git a/src/socket/bus_server_socket.h b/src/socket/bus_server_socket.h
index ba6ebe8..ec0e42f 100644
--- a/src/socket/bus_server_socket.h
+++ b/src/socket/bus_server_socket.h
@@ -121,7 +121,7 @@
*/
int get_key() ;
- void _data_remove(int val1, int val2);
+ void _data_remove(int val);
};
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index 7562d56..6139d34 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -38,10 +38,6 @@
return shm_socket_force_bind(shm_socket, key);
}
-int ShmModSocket::bind_proc_id(char *buf, int len) {
- return shm_socket_bind_proc_id(shm_socket, buf, len);
-}
-
int ShmModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag)
{
int ret;
diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h
index 5e234bf..c361300 100644
--- a/src/socket/shm_mod_socket.h
+++ b/src/socket/shm_mod_socket.h
@@ -62,7 +62,6 @@
*/
int force_bind(int key);
- int bind_proc_id(char *buf, int len);
int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag);
int sendto(const void *buf, const int size, const int key, const struct timespec *timeout = NULL, int flag = 0, int reset = 0, int data_set = 0);
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 6705b96..1912772 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -166,20 +166,8 @@
return 0;
}
-int shm_socket_bind_proc_id(shm_socket_t *sockt, const char *buf, int len) {
- strncpy(sockt->proc_id, buf, len > MAX_STR_LEN ? MAX_STR_LEN : len);
-
- return 0;
-}
-
int shm_socket_get_key(shm_socket_t *sockt){
return sockt->key;
-}
-
-int shm_socket_get_procid(shm_socket_t *sockt, char *buf, int len) {
- strncpy(buf, sockt->proc_id, len);
-
- return 0;
}
// 鐭繛鎺ユ柟寮忓彂閫�
@@ -462,9 +450,8 @@
tryn--;
recvbufIter = tmp_socket->recvbuf2.find(key);
if(recvbufIter != tmp_socket->recvbuf2.end()) {
- // 鍦ㄧ紦瀛橀噷鏌ュ埌浜唊ey鍖归厤鎴愬姛鐨�
recvpak = recvbufIter->second;
- tmp_socket->recvbuf2.erase(recvbufIter);
+ tmp_socket->recvbuf2.erase(key);
goto LABLE_SUC;
}
@@ -481,7 +468,6 @@
} else {
// 绛旈潪鎵�闂紝鏀惧埌缂撳瓨閲�
tmp_socket->recvbuf2.insert({recvpak.key, recvpak});
- exit(0);
continue;
}
}
diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h
index 2b50a11..a900e4e 100644
--- a/src/socket/shm_socket.h
+++ b/src/socket/shm_socket.h
@@ -23,7 +23,7 @@
size_t size;
void * buf;
- char uuid[64];
+ char uuid[1];
int action;
} shm_packet_t;
@@ -34,7 +34,6 @@
typedef struct shm_socket_t {
shm_socket_type_t socket_type;
int key;
- char proc_id[MAX_STR_LEN];
bool force_bind;
pthread_mutex_t mutex;
@@ -62,7 +61,6 @@
int shm_socket_force_bind(shm_socket_t * socket, int key) ;
-int shm_socket_bind_proc_id(shm_socket_t *sockt, const char *buf, int len);
/**
* @flags : BUS_NOWAIT_FLAG
*/
--
Gitblit v1.8.0