From 82b028cf63953d8080b63d85468eae488d212194 Mon Sep 17 00:00:00 2001
From: fujuntang <fujuntang@smartai.com>
Date: 星期四, 23 九月 2021 14:30:07 +0800
Subject: [PATCH] Fix the data parsing when in multiple threads.

---
 src/socket/bus_server_socket.cpp |   61 ++++++++++--------------------
 1 files changed, 20 insertions(+), 41 deletions(-)

diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index beb7148..2d552da 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -539,7 +539,9 @@
 
         data1 = atoi((proc_iter->second).int_info);
         data2 = atoi((proc_iter->second).svr_info);
-        BusServerSocket::_data_remove(data1, data2);
+        BusServerSocket::_data_remove(data1);
+        BusServerSocket::_data_remove(data2);
+        BusServerSocket::_data_remove(key);
         len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
         strncpy(buf_temp, (proc_iter->second).proc_id, len);
         proc->erase(proc_iter);
@@ -892,10 +894,10 @@
     ProcDataZone::iterator proc_que_iter;
     ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
 
-  int rv;
-  char send_buf[512] = { 0x00 };
+    int rv;
+    char send_buf[512] = { 0x00 };
 
-  const char *topic_delim = ",";
+    const char *topic_delim = ",";
 	while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) {
 		head = ShmModSocket::decode_bus_head(buf);
 		topics = buf + BUS_HEAD_SIZE;
@@ -973,39 +975,29 @@
       _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
 
     }
-		else if (strncmp(buf, "request", strlen("request")) == 0) {
-      sprintf(send_buf, "%4d", key);
-      strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4));
-      
-      rv = shm_sendto(shm_socket, send_buf, strlen(send_buf) + 1, key);
-      if(rv != 0) {
-        logger->error( "BusServerSocket::_run_proxy_ : requst answer fail!\n");
-      }
-    }
     else if(strcmp(action, "stop") == 0) {
-			free(buf);
-			break;
-		} else {
-			logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action);
-		}
-		free(buf);
-	}
+      free(buf);
+      break;
+    } else {
+      logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action);
+    }
+    free(buf);
+  }
 
 
-	return rv;
+  return rv;
 }
 
-void BusServerSocket::_data_remove(int val1, int val2) {
+void BusServerSocket::_data_remove(int val) {
 
   int i;
   LockFreeQueue<shm_packet_t> *queue = NULL;
   hashtable_t *hashtable = mm_get_hashtable();
 
-  void *data_ptr1 = hashtable_get(hashtable, val1);
-  void *data_ptr2 = hashtable_get(hashtable, val2);
-  if (data_ptr1 != NULL) {
-    if (data_ptr1 != (void *)1) {
-      queue = (LockFreeQueue<shm_packet_t> *)data_ptr1;
+  void *data_ptr = hashtable_get(hashtable, val);
+  if (data_ptr != NULL) {
+    if (data_ptr != (void *)1) {
+      queue = (LockFreeQueue<shm_packet_t> *)data_ptr;
       queue->close();
       for (i = 0; i < queue->size(); i++) {
         mm_free((*queue)[i].buf);
@@ -1013,20 +1005,7 @@
       sleep(1);
     }
 
-    hashtable_remove(hashtable, val1);
-  }
-
-  if (data_ptr2 != NULL) {
-    if (data_ptr2 != (void *)1) {
-      queue = (LockFreeQueue<shm_packet_t> *)data_ptr2;
-      queue->close();
-      for (i = 0; i < queue->size(); i++) {
-        mm_free((*queue)[i].buf);
-      }
-      sleep(1);
-    }
-
-    hashtable_remove(hashtable, val2);
+    hashtable_remove(hashtable, val);
   }
 
 }

--
Gitblit v1.8.0