From de80ed87b8339f23624642786698057a62bdf779 Mon Sep 17 00:00:00 2001
From: fujuntang <fujuntang@smartai.com>
Date: 星期二, 23 十一月 2021 11:25:48 +0800
Subject: [PATCH] Fix the communication failure issue when the registered applications exceeds the fixed amount.

---
 src/socket/bus_server_socket.cpp |   95 +++++++++++++++++++++++++++++++++++++++++------
 1 files changed, 82 insertions(+), 13 deletions(-)

diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index 315c356..ef1321e 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -2,9 +2,11 @@
 #include "bus_server_socket.h"
 #include "shm_mod_socket.h"
 #include "shm_socket.h"
+#include "msg_mgr.h"
 #include "bus_error.h"
 
 static Logger *logger = LoggerFactory::getLogger();
+static pthread_mutex_t gMutex;
 
 list gLinkedList;
 void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb) {
@@ -83,11 +85,13 @@
 int  BusServerSocket::start(){
   int rv;
 
-	topic_sub_map =	shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
- 
-	rv = _run_proxy_();
+  topic_sub_map =	shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
 
-	return rv;
+  pthread_mutex_init(&gMutex, NULL);
+
+  rv = _run_proxy_();
+
+  return rv;
 }
 
 
@@ -303,7 +307,7 @@
   LinkNode *pNew = NULL;
   LinkNode *pCur = NULL;
  
-  pNew = new(LinkNode);
+  pNew = (LinkNode *)malloc(sizeof(LinkNode));
   pNew->data = aData;
   pNew->data_fix = bData;
   pNew->count = 0;
@@ -340,7 +344,7 @@
     
     head = pCur->next;
     
-    delete(pCur);
+    free(pCur);
     
     pCur = head;
     
@@ -353,7 +357,7 @@
       pCur->next = pNext->next;
       pCur = pNext->next;
 
-      delete(pNext);
+      free(pNext);
     } else {
     
       pCur = pNext;
@@ -515,9 +519,11 @@
     ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
     ProcPartZone *procPart = shm_mm_attach<ProcPartZone>(SHM_BUS_PROC_PART_MAP_KEY);
     if (flag == PROC_REG) {
+      pthread_mutex_lock(&gMutex);
       if ((proc_iter = proc->find(key)) == proc->end()) {
         proc->insert({key, Data_stru});
       }
+      pthread_mutex_unlock(&gMutex);
 
       if ((proc_part_iter = procPart->find(key)) == procPart->end()) {
         procPart->insert({key, Data_stru.proc_id});
@@ -536,6 +542,7 @@
         SvrSub_ele->erase(key);
       }
 
+      pthread_mutex_lock(&gMutex);
       if ((proc_iter = proc->find(key)) != proc->end()) {
 
         data1 = atoi((proc_iter->second).int_info);
@@ -545,9 +552,10 @@
         BusServerSocket::_data_remove(key);
         len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
         strncpy(buf_temp, (proc_iter->second).proc_id, len);
-        proc->erase(proc_iter);
+        proc->erase(key);
 
       }
+      pthread_mutex_unlock(&gMutex);
 
       if ((proc_part_iter = procPart->find(key)) != procPart->end()) {
 
@@ -559,7 +567,10 @@
         procQuePart->erase(buf_temp);
       }
 
+      BusServerSocket::buf_data_remove(key);
+      find_mm_data(key);
     }
+
   } else if (flag == PROC_REG_TCS) {
     ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY);
     SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
@@ -709,7 +720,7 @@
     sprintf(data_buf, "%d", count);
     shm_sendto(shm_socket, data_buf, strlen(data_buf), key, &timeout, BUS_TIMEOUT_FLAG);
 
-  } else {
+  } else if (flag == PROC_QUE_ATCS) {
 
     int val;
     int temp = 0;
@@ -853,6 +864,17 @@
     shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
 
     free(last_buf);
+  } else {
+
+    char *ptr = NULL;
+    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
+
+    data1 = atoi(buf_temp);
+    ptr = strstr(buf_temp, STR_MAGIC);
+    if (ptr != NULL) {
+      data2 = atoi(ptr + 1);
+    }
+    BusServerSocket::buf_data_set(data1, data2); 
   }
 }
 
@@ -861,9 +883,12 @@
   ProcZone::iterator proc_iter;
   ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
 
+  pthread_mutex_lock(&gMutex);
   if ((proc_iter = proc->find(val)) != proc->end()) {
+    pthread_mutex_unlock(&gMutex);
     return true;
   }
+  pthread_mutex_unlock(&gMutex);
   
   return false;
   
@@ -888,7 +913,7 @@
 	int key;
   int flag;
   char buf_temp[MAX_STR_LEN] = { 0x00 };
-	char * action, *topic, *topics, *buf, *content;
+	char *action, *topic, *topics, *buf, *content;
 	size_t head_len;
 	bus_head_t head;
     int val;
@@ -935,7 +960,8 @@
 		} 
     else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \
             || (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \
-            || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0)) {
+            || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0) \
+            || (strcmp(action, "bufreg") == 0)) {
       content = topics + head.topic_size;
       if (strcmp(action, "reg") == 0) {
         
@@ -957,15 +983,19 @@
         
         flag = PROC_QUE_STCS; 
 
-      } else {
+      } else if (strcmp(action, "atcsque") == 0) {
         
         flag = PROC_QUE_ATCS;
+
+      } else {
+        
+        flag = PROC_REG_BUF;
 
       }
         
       if (flag == PROC_REG) {
         memcpy(buf_temp, content, strlen(content) + 1);
-
+        
         if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
 
           val = proc_que_iter->second;
@@ -996,6 +1026,7 @@
   hashtable_t *hashtable = mm_get_hashtable();
 
   void *data_ptr = hashtable_get(hashtable, val);
+
   if (data_ptr != NULL) {
     if (data_ptr != (void *)1) {
       queue = (LockFreeQueue<shm_packet_t> *)data_ptr;
@@ -1011,3 +1042,41 @@
 
 }
 
+void BusServerSocket::buf_data_set(int data, int val) {
+  recvbuf_val *val_buf;
+  recvbuf_data::iterator data_iter;
+  recvbuf_val::iterator val_iter;
+
+  if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) {
+    val_buf = data_iter->second;
+  } else {
+    void *set_ptr = mm_malloc(sizeof(recvbuf_val));
+
+    val_buf = new(set_ptr) recvbuf_val;
+    recvBuf_data.insert({data, val_buf}); 
+  }
+
+  val_buf->insert(val);
+}
+
+void BusServerSocket::buf_data_remove(int data) {
+
+  int val;
+  recvbuf_val *val_buf;
+  recvbuf_data::iterator data_iter;
+  recvbuf_val::iterator val_iter;
+
+  if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) {
+    
+    val_buf = data_iter->second;
+    for(val_iter = val_buf->begin(); val_iter != val_buf->end(); ++val_iter) {
+      val = *val_iter;
+
+      BusServerSocket::_data_remove(val);
+    }
+
+    recvBuf_data.erase(data);
+  }
+}
+
+

--
Gitblit v1.8.0