From dc01e4cbb01e96d19b470a366bbe648d426ed171 Mon Sep 17 00:00:00 2001
From: fujuntang <fujuntang@smartai.com>
Date: 星期六, 11 九月 2021 10:06:15 +0800
Subject: [PATCH] Add topics sub and request support.

---
 src/socket/bus_server_socket.cpp |   53 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 53 insertions(+), 0 deletions(-)

diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index 8b022e1..d5e757d 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -302,6 +302,7 @@
   int count = 0;
   int i = 0;
   int len = 0;
+  int data1, data2;
   char *data_ptr;
   ProcInfo Data_stru;
   ProcZone::iterator proc_iter;
@@ -333,6 +334,13 @@
       
       memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1);
       count += strlen(buf + count) + 1;
+
+      memcpy(Data_stru.int_info, buf + count, strlen(buf + count) + 1); 
+      count += strlen(buf + count) + 1;
+    
+      memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1); 
+      count += strlen(buf + count) + 1;
+
     }
 
     ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
@@ -362,6 +370,9 @@
 
       if ((proc_iter = proc->find(key)) != proc->end()) {
 
+        data1 = atoi((proc_iter->second).int_info);
+        data2 = atoi((proc_iter->second).svr_info);
+        BusServerSocket::_data_remove(data1, data2);
         len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
         strncpy(buf_temp, (proc_iter->second).proc_id, len);
         proc->erase(proc_iter);
@@ -504,7 +515,9 @@
 
     free(last_buf);
   } else if (flag == PROC_QUE_STCS) {
+
     SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
+    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
 
     strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
     if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) {
@@ -512,6 +525,9 @@
     
       for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { 
         count = *svr_proc_iter;
+        if ((proc_iter = proc->find(count)) != proc->end()) {
+          count = atoi((proc_iter->second).svr_info);
+        }
 
         break;
       }
@@ -770,3 +786,40 @@
 
 	return rv;
 }
+
+void BusServerSocket::_data_remove(int val1, int val2) {
+
+  int i;
+  LockFreeQueue<shm_packet_t> *queue = NULL;
+  hashtable_t *hashtable = mm_get_hashtable();
+
+  void *data_ptr1 = hashtable_get(hashtable, val1);
+  void *data_ptr2 = hashtable_get(hashtable, val2);
+  if (data_ptr1 != NULL) {
+    if (data_ptr1 != (void *)1) {
+      queue = (LockFreeQueue<shm_packet_t> *)data_ptr1;
+      queue->close();
+      for (i = 0; i < queue->size(); i++) {
+        mm_free((*queue)[i].buf);
+      }
+      sleep(1);
+    }
+
+    hashtable_remove(hashtable, val1);
+  }
+
+  if (data_ptr2 != NULL) {
+    if (data_ptr2 != (void *)1) {
+      queue = (LockFreeQueue<shm_packet_t> *)data_ptr2;
+      queue->close();
+      for (i = 0; i < queue->size(); i++) {
+        mm_free((*queue)[i].buf);
+      }
+      sleep(1);
+    }
+
+    hashtable_remove(hashtable, val2);
+  }
+
+}
+

--
Gitblit v1.8.0