From 73689afc09ce346f9eb00e02faf7f242e55dc7ee Mon Sep 17 00:00:00 2001
From: fujuntang <fujuntang@smartai.com>
Date: 星期四, 09 十二月 2021 19:33:00 +0800
Subject: [PATCH] Add the sync to fix the resource clear issue.

---
 src/socket/bus_server_socket.cpp |   29 ++++++++++++++++++++++-------
 1 files changed, 22 insertions(+), 7 deletions(-)

diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index 0a44949..fc53b0b 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -6,6 +6,7 @@
 #include "bus_error.h"
 
 static Logger *logger = LoggerFactory::getLogger();
+static pthread_mutex_t gMutex;
 
 list gLinkedList;
 void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb) {
@@ -84,11 +85,13 @@
 int  BusServerSocket::start(){
   int rv;
 
-	topic_sub_map =	shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
- 
-	rv = _run_proxy_();
+  topic_sub_map =	shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
 
-	return rv;
+  pthread_mutex_init(&gMutex, NULL);
+
+  rv = _run_proxy_();
+
+  return rv;
 }
 
 
@@ -201,7 +204,7 @@
 	int rv;
 	struct timespec timeout = {1,0};
 
-	if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
+    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
 
 		subscripter_set = map_iter->second;
 		for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) {
@@ -516,9 +519,11 @@
     ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
     ProcPartZone *procPart = shm_mm_attach<ProcPartZone>(SHM_BUS_PROC_PART_MAP_KEY);
     if (flag == PROC_REG) {
+      pthread_mutex_lock(&gMutex);
       if ((proc_iter = proc->find(key)) == proc->end()) {
         proc->insert({key, Data_stru});
       }
+      pthread_mutex_unlock(&gMutex);
 
       if ((proc_part_iter = procPart->find(key)) == procPart->end()) {
         procPart->insert({key, Data_stru.proc_id});
@@ -537,6 +542,7 @@
         SvrSub_ele->erase(key);
       }
 
+      pthread_mutex_lock(&gMutex);
       if ((proc_iter = proc->find(key)) != proc->end()) {
 
         data1 = atoi((proc_iter->second).int_info);
@@ -546,9 +552,10 @@
         BusServerSocket::_data_remove(key);
         len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
         strncpy(buf_temp, (proc_iter->second).proc_id, len);
-        proc->erase(proc_iter);
+        proc->erase(key);
 
       }
+      pthread_mutex_unlock(&gMutex);
 
       if ((proc_part_iter = procPart->find(key)) != procPart->end()) {
 
@@ -560,7 +567,10 @@
         procQuePart->erase(buf_temp);
       }
 
+      pthread_mutex_lock(&gMutex);
       BusServerSocket::buf_data_remove(key);
+      pthread_mutex_unlock(&gMutex);
+
       find_mm_data(key);
     }
 
@@ -580,6 +590,7 @@
     strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); 
     data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
     while(data_ptr) {
+      data_ptr = trim(data_ptr, 0);
       TcsSub_ele->insert(data_ptr);
       if ((svr_tcs_iter = SvrData->find(data_ptr)) != SvrData->end()) {
         SvrSub_ele = svr_tcs_iter->second;
@@ -609,6 +620,7 @@
     strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
     data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
     while(data_ptr) {
+      data_ptr = trim(data_ptr, 0);
       ret = Qurey_object(data_ptr, &len);
       if (ret != NULL) {
     
@@ -694,7 +706,7 @@
     ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
 
     strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
-    if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) {
+    if ((svr_tcs_iter = SvrData->find(trim(buf_temp, 0))) != SvrData->end()) {
       SvrSub_ele = svr_tcs_iter->second;
     
       for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { 
@@ -876,9 +888,12 @@
   ProcZone::iterator proc_iter;
   ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
 
+  pthread_mutex_lock(&gMutex);
   if ((proc_iter = proc->find(val)) != proc->end()) {
+    pthread_mutex_unlock(&gMutex);
     return true;
   }
+  pthread_mutex_unlock(&gMutex);
   
   return false;
   

--
Gitblit v1.8.0