From 82b028cf63953d8080b63d85468eae488d212194 Mon Sep 17 00:00:00 2001
From: fujuntang <fujuntang@smartai.com>
Date: 星期四, 23 九月 2021 14:30:07 +0800
Subject: [PATCH] Fix the data parsing when in multiple threads.

---
 src/bus_proxy_start.cpp            |    3 -
 src/socket/shm_mod_socket.h        |    1 
 src/socket/bus_server_socket.cpp   |   61 ++++++++++--------------------
 src/net/net_mod_socket_wrapper.cpp |    5 --
 src/net/net_mod_socket.h           |    1 
 src/socket/shm_socket.h            |    4 -
 src/socket/bus_server_socket.h     |    2 
 src/bh_api.cpp                     |   16 +++++--
 src/socket/shm_mod_socket.cpp      |    4 --
 src/net/net_mod_socket.cpp         |    4 --
 src/shm/hashtable.cpp              |    1 
 src/socket/shm_socket.cpp          |   16 -------
 12 files changed, 34 insertions(+), 84 deletions(-)

diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index 312b203..32e53ef 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -530,9 +530,10 @@
     if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) {
       mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]);
     }
-    
+  
+    Proc_ptr = &(ptr->procData);
     for(int i = 0; i < mtr_list_num; i++) {
-      mtr_list[i].proc_id = ptr->procData.proc_id;
+      mtr_list[i].proc_id = (Proc_ptr + i)->proc_id;
       mtr_list[i].mq_id = ID_RSV;
       mtr_list[i].abs_addr = ABS_ID_RSV;
       mtr_list[i].ip = "127.0.0.1";
@@ -1162,6 +1163,7 @@
   int sec, nsec;
   std::string MsgID;
   int timeout_ms = 3000;
+  char data_buf[MAX_STR_LEN] = { 0x00 };
   char buf_temp[MAX_STR_LEN] = { 0x00 };
   char *topics_buf = NULL;
   
@@ -1225,7 +1227,9 @@
   rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS);
   if (rv == 0) {
 
-    val = atoi((char *)buf);
+    len = size > (sizeof(data_buf) - 1) ? (sizeof(data_buf) - 1) : size;
+    memcpy(data_buf, (char *)buf, len);
+    val = atoi((char *)data_buf);
 
     free(buf);
 
@@ -1316,6 +1320,7 @@
   net_node_t node;
   int node_size;  
   int recv_arr_size;
+  char data_buf[MAX_STR_LEN] = { 0x00 };
   net_mod_recv_msg_t *recv_arr;
   net_mod_err_t *errarr;
   int errarr_size = 0;
@@ -1389,7 +1394,9 @@
   rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS);
   if (rv == 0) {
     
-    val = atoi((char *)buf);
+    len = size > (sizeof(data_buf) - 1) ? (sizeof(data_buf) - 1) : size;
+    memcpy(data_buf, (char *)buf, len);
+    val = atoi((char *)data_buf);
 
     free(buf);
 
@@ -1401,7 +1408,6 @@
       len += strlen(_input1.data);
 #endif
 
-      data = net_mod_socket_svr_get(gNetmod_socket);
       topics_buf = (char *)malloc(len);
       if (topics_buf == NULL) {
         
diff --git a/src/bus_proxy_start.cpp b/src/bus_proxy_start.cpp
index c3104a9..e2e4955 100644
--- a/src/bus_proxy_start.cpp
+++ b/src/bus_proxy_start.cpp
@@ -45,9 +45,6 @@
   return NULL;
 }
 
-
-
-
 void *svr_start(void *skptr) {
   int port = *(int *)skptr;
 
diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp
index 2f5ce73..b3aa9b0 100644
--- a/src/net/net_mod_socket.cpp
+++ b/src/net/net_mod_socket.cpp
@@ -46,10 +46,6 @@
   return shmModSocket.force_bind(key);
 }
 
-int NetModSocket::bind_proc_id(char *buf, int len) {
-  return shmModSocket.bind_proc_id(buf, len);
-}
-
 int NetModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag) {
   
   return shmModSocket.reg(pData, len, buf, size, timeout_ms, flag);
diff --git a/src/net/net_mod_socket.h b/src/net/net_mod_socket.h
index 9d9af97..0cb2fd0 100644
--- a/src/net/net_mod_socket.h
+++ b/src/net/net_mod_socket.h
@@ -120,7 +120,6 @@
   */
   int force_bind( int key);
 
-  int bind_proc_id(char *buf, int len);
   int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag);
   
   /**
diff --git a/src/net/net_mod_socket_wrapper.cpp b/src/net/net_mod_socket_wrapper.cpp
index 5233635..479b0d4 100644
--- a/src/net/net_mod_socket_wrapper.cpp
+++ b/src/net/net_mod_socket_wrapper.cpp
@@ -103,11 +103,6 @@
 	return sockt->sendandrecv(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size, err_arr, err_arr_size, -1);
 }
 
-int net_mod_socket_bind_proc_id(void * _socket, char *proc_id, int len){
-  NetModSocket *sockt = (NetModSocket *)_socket;
-  return sockt->bind_proc_id(proc_id, len);
-}
-
 void net_mod_socket_int_set(void * _socket, int data) {
   NetModSocket *sockt = (NetModSocket *)_socket;
   sockt->int_set(data);
diff --git a/src/shm/hashtable.cpp b/src/shm/hashtable.cpp
index c4a81fc..8593cca 100755
--- a/src/shm/hashtable.cpp
+++ b/src/shm/hashtable.cpp
@@ -178,7 +178,6 @@
     goto suc;
   }
   val = _hashtable_get(hashtable, key);
-  // val = 1鏄痑llockey鐨勬儏鍐�
   if(val != NULL && val != (void *)1) 
     goto fail;
 
diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index beb7148..2d552da 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -539,7 +539,9 @@
 
         data1 = atoi((proc_iter->second).int_info);
         data2 = atoi((proc_iter->second).svr_info);
-        BusServerSocket::_data_remove(data1, data2);
+        BusServerSocket::_data_remove(data1);
+        BusServerSocket::_data_remove(data2);
+        BusServerSocket::_data_remove(key);
         len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
         strncpy(buf_temp, (proc_iter->second).proc_id, len);
         proc->erase(proc_iter);
@@ -892,10 +894,10 @@
     ProcDataZone::iterator proc_que_iter;
     ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
 
-  int rv;
-  char send_buf[512] = { 0x00 };
+    int rv;
+    char send_buf[512] = { 0x00 };
 
-  const char *topic_delim = ",";
+    const char *topic_delim = ",";
 	while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) {
 		head = ShmModSocket::decode_bus_head(buf);
 		topics = buf + BUS_HEAD_SIZE;
@@ -973,39 +975,29 @@
       _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
 
     }
-		else if (strncmp(buf, "request", strlen("request")) == 0) {
-      sprintf(send_buf, "%4d", key);
-      strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4));
-      
-      rv = shm_sendto(shm_socket, send_buf, strlen(send_buf) + 1, key);
-      if(rv != 0) {
-        logger->error( "BusServerSocket::_run_proxy_ : requst answer fail!\n");
-      }
-    }
     else if(strcmp(action, "stop") == 0) {
-			free(buf);
-			break;
-		} else {
-			logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action);
-		}
-		free(buf);
-	}
+      free(buf);
+      break;
+    } else {
+      logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action);
+    }
+    free(buf);
+  }
 
 
-	return rv;
+  return rv;
 }
 
-void BusServerSocket::_data_remove(int val1, int val2) {
+void BusServerSocket::_data_remove(int val) {
 
   int i;
   LockFreeQueue<shm_packet_t> *queue = NULL;
   hashtable_t *hashtable = mm_get_hashtable();
 
-  void *data_ptr1 = hashtable_get(hashtable, val1);
-  void *data_ptr2 = hashtable_get(hashtable, val2);
-  if (data_ptr1 != NULL) {
-    if (data_ptr1 != (void *)1) {
-      queue = (LockFreeQueue<shm_packet_t> *)data_ptr1;
+  void *data_ptr = hashtable_get(hashtable, val);
+  if (data_ptr != NULL) {
+    if (data_ptr != (void *)1) {
+      queue = (LockFreeQueue<shm_packet_t> *)data_ptr;
       queue->close();
       for (i = 0; i < queue->size(); i++) {
         mm_free((*queue)[i].buf);
@@ -1013,20 +1005,7 @@
       sleep(1);
     }
 
-    hashtable_remove(hashtable, val1);
-  }
-
-  if (data_ptr2 != NULL) {
-    if (data_ptr2 != (void *)1) {
-      queue = (LockFreeQueue<shm_packet_t> *)data_ptr2;
-      queue->close();
-      for (i = 0; i < queue->size(); i++) {
-        mm_free((*queue)[i].buf);
-      }
-      sleep(1);
-    }
-
-    hashtable_remove(hashtable, val2);
+    hashtable_remove(hashtable, val);
   }
 
 }
diff --git a/src/socket/bus_server_socket.h b/src/socket/bus_server_socket.h
index ba6ebe8..ec0e42f 100644
--- a/src/socket/bus_server_socket.h
+++ b/src/socket/bus_server_socket.h
@@ -121,7 +121,7 @@
 	 */
 	int get_key() ;
 
-  void _data_remove(int val1, int val2);
+  void _data_remove(int val);
 
 };
 
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index 7562d56..6139d34 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -38,10 +38,6 @@
 	return shm_socket_force_bind(shm_socket, key);
 }
 
-int ShmModSocket::bind_proc_id(char *buf, int len) {
-  return shm_socket_bind_proc_id(shm_socket, buf, len);
-}
-
 int ShmModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag)
 {
   int ret;
diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h
index 5e234bf..c361300 100644
--- a/src/socket/shm_mod_socket.h
+++ b/src/socket/shm_mod_socket.h
@@ -62,7 +62,6 @@
 	*/
 	int force_bind(int key);
 
-  int bind_proc_id(char *buf, int len);
   int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag);
 	
   int sendto(const void *buf, const int size, const int key, const struct timespec *timeout = NULL, int flag = 0, int reset = 0, int data_set = 0);
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 6705b96..1912772 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -166,20 +166,8 @@
   return 0;
 }
 
-int shm_socket_bind_proc_id(shm_socket_t *sockt, const char *buf, int len) {
-  strncpy(sockt->proc_id, buf, len > MAX_STR_LEN ? MAX_STR_LEN : len);
-
-  return 0;
-}
-
 int shm_socket_get_key(shm_socket_t *sockt){
   return sockt->key;
-}
-
-int shm_socket_get_procid(shm_socket_t *sockt, char *buf, int len) {
-  strncpy(buf, sockt->proc_id, len);
-
-  return 0;
 }
 
 // 鐭繛鎺ユ柟寮忓彂閫�
@@ -462,9 +450,8 @@
     tryn--;
     recvbufIter = tmp_socket->recvbuf2.find(key);
     if(recvbufIter != tmp_socket->recvbuf2.end()) {
-      // 鍦ㄧ紦瀛橀噷鏌ュ埌浜唊ey鍖归厤鎴愬姛鐨�
       recvpak = recvbufIter->second;
-      tmp_socket->recvbuf2.erase(recvbufIter);
+      tmp_socket->recvbuf2.erase(key);
       goto LABLE_SUC;
     }
 
@@ -481,7 +468,6 @@
     } else {
       // 绛旈潪鎵�闂紝鏀惧埌缂撳瓨閲�
       tmp_socket->recvbuf2.insert({recvpak.key, recvpak});
-      exit(0);
       continue;
     }
   }
diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h
index 2b50a11..a900e4e 100644
--- a/src/socket/shm_socket.h
+++ b/src/socket/shm_socket.h
@@ -23,7 +23,7 @@
 
 	size_t size;
 	void * buf;
-	char uuid[64];
+	char uuid[1];
 	int action;
 
 } shm_packet_t;
@@ -34,7 +34,6 @@
 typedef struct shm_socket_t {
 	shm_socket_type_t socket_type;
 	int key;
-  char proc_id[MAX_STR_LEN];
 	bool force_bind;
 	pthread_mutex_t mutex;
 
@@ -62,7 +61,6 @@
 
 int shm_socket_force_bind(shm_socket_t * socket, int key) ;
 
-int shm_socket_bind_proc_id(shm_socket_t *sockt, const char *buf, int len);
 /**
  * @flags : BUS_NOWAIT_FLAG
  */

--
Gitblit v1.8.0