From 97428da8d339d3d0c4d00aa328d76a32c7858412 Mon Sep 17 00:00:00 2001
From: fujuntang <fujuntang@smartai.com>
Date: 星期五, 17 九月 2021 19:44:24 +0800
Subject: [PATCH] Fix the memory leakage.

---
 src/socket/bus_server_socket.cpp |  262 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 261 insertions(+), 1 deletions(-)

diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index 80f7338..beb7148 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -6,6 +6,7 @@
 
 static Logger *logger = LoggerFactory::getLogger();
 
+list gLinkedList;
 void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb) {
 	SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
 	SHMKeySet *subscripter_set;
@@ -296,12 +297,176 @@
   return dataBuf;
 }
 
+void list::Insert(int aData, int bData) 
+{
+  LinkNode *pHead = NULL; 
+  LinkNode *pNew = NULL;
+  LinkNode *pCur = NULL;
+ 
+  pNew = new(LinkNode);
+  pNew->data = aData;
+  pNew->data_fix = bData;
+  pNew->count = 0;
+  
+  pHead = head;
+  pCur = pHead;
+  if(pHead == NULL) {
+    head = pNew;
+  
+    pNew->next = NULL;
+ 
+  } else {
+    while(pCur->next != NULL) {
+      pCur = pCur->next;
+    }
+    
+    pCur->next = pNew;
+    pNew->next = NULL;
+  }
+}
+
+void list::Delete(int data)  
+{
+  LinkNode *pHead;
+  LinkNode *pCur;
+  LinkNode *pNext;
+  
+  pHead = head;
+  pCur = pHead;
+  if(pHead == NULL)
+    return;
+  
+  while((pCur != NULL) && (pCur->data == data)) {
+    
+    head = pCur->next;
+    
+    delete(pCur);
+    
+    pCur = head;
+    
+  }
+
+  while((pCur != NULL) && (pCur->next != NULL)) {
+    pNext = pCur->next;
+
+    if(pNext->data == data) {
+      pCur->next = pNext->next;
+      pCur = pNext->next;
+
+      delete(pNext);
+    } else {
+    
+      pCur = pNext;
+      
+    }
+  }
+}
+
+void list::dataSet(int data, int val)  
+{
+  LinkNode *pCur;
+  
+  pCur = head;
+  if(pCur == NULL)
+    return;
+  
+  while(pCur != NULL) {
+    
+    if(pCur->data == data) {
+      pCur->count = val;
+    }
+    
+    pCur = pCur->next;
+  }
+}
+
+int list::dataGet(int data)  
+{
+  LinkNode *pCur;
+  
+  pCur = head;
+  if(pCur == NULL)
+    return 0;
+  
+  while(pCur != NULL) {
+    
+    if(pCur->data == data) {
+      return pCur->count;
+    }
+    
+    pCur = pCur->next;
+  }
+  
+  return 0;
+}
+
+int list::dataFixGet(int data)  
+{
+  LinkNode *pCur;
+  
+  pCur = head;
+  if(pCur == NULL)
+    return 0;
+  
+  while(pCur != NULL) {
+    
+    if(pCur->data == data) {
+      return pCur->data_fix;
+    }
+    
+    pCur = pCur->next;
+  }
+  
+  return 0;
+}
+
+int list::NodeNum(void)
+{
+  int count = 0;
+  LinkNode *pCur = head;
+  
+  if (pCur == NULL) {
+    return 0;
+  }
+  
+  while(pCur != NULL) {
+   
+    ++count;
+    pCur = pCur->next;
+  }
+  
+  return count;
+}
+
+int list::nodeGet(int index)
+{
+  int count = 0;
+  LinkNode *pCur = head;
+  
+  if (pCur == NULL) {
+    return 0;
+  }
+  
+  while((pCur != NULL) && (count <= index)) {
+   
+    if (count == index) {
+      return pCur->data;
+    }
+    
+    ++count;
+    pCur = pCur->next;
+  }
+  
+  return 0;
+}
+
 void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag)
 {
   char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
   int count = 0;
   int i = 0;
   int len = 0;
+  int data1, data2;
   char *data_ptr;
   ProcInfo Data_stru;
   ProcZone::iterator proc_iter;
@@ -333,6 +498,16 @@
       
       memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1);
       count += strlen(buf + count) + 1;
+
+      memcpy(Data_stru.int_info, buf + count, strlen(buf + count) + 1); 
+      count += strlen(buf + count) + 1;
+    
+      memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1); 
+      count += strlen(buf + count) + 1;
+      
+      if (flag == PROC_REG) {
+        gLinkedList.Insert(key, atoi(Data_stru.int_info));
+      }
     }
 
     ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
@@ -362,6 +537,9 @@
 
       if ((proc_iter = proc->find(key)) != proc->end()) {
 
+        data1 = atoi((proc_iter->second).int_info);
+        data2 = atoi((proc_iter->second).svr_info);
+        BusServerSocket::_data_remove(data1, data2);
         len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
         strncpy(buf_temp, (proc_iter->second).proc_id, len);
         proc->erase(proc_iter);
@@ -504,7 +682,9 @@
 
     free(last_buf);
   } else if (flag == PROC_QUE_STCS) {
+
     SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
+    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
 
     strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
     if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) {
@@ -512,6 +692,9 @@
     
       for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { 
         count = *svr_proc_iter;
+        if ((proc_iter = proc->find(count)) != proc->end()) {
+          count = atoi((proc_iter->second).svr_info);
+        }
 
         break;
       }
@@ -666,7 +849,34 @@
 
     shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
 
+    free(last_buf);
   }
+}
+
+int BusServerSocket::get_data(int val) {
+  
+  ProcZone::iterator proc_iter;
+  ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+
+  if ((proc_iter = proc->find(val)) != proc->end()) {
+    return true;
+  }
+  
+  return false;
+  
+}
+
+int BusServerSocket::check_proc(const int val, const void *buf, int len, void **buf_ret, int *len_ret, \
+                          const struct timespec *timeout, const int flag) {
+  int ret;
+
+  ret = shm_sendandrecv(shm_socket, buf, len, val, buf_ret, len_ret, timeout, flag);
+
+  return ret;
+}
+
+void BusServerSocket::remove_proc(int val) {
+  BusServerSocket::_proxy_reg(NULL, 0, NULL, 0, val, PROC_UNREG);
 }
 
 // 杩愯浠g悊
@@ -674,10 +884,13 @@
 	int size;
 	int key;
   int flag;
+  char buf_temp[MAX_STR_LEN] = { 0x00 };
 	char * action, *topic, *topics, *buf, *content;
 	size_t head_len;
-	char resp_buf[128];
 	bus_head_t head;
+    int val;
+    ProcDataZone::iterator proc_que_iter;
+    ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
 
   int rv;
   char send_buf[512] = { 0x00 };
@@ -747,6 +960,16 @@
 
       }
         
+      if (flag == PROC_REG) {
+        memcpy(buf_temp, content, strlen(content) + 1);
+
+        if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
+
+          val = proc_que_iter->second;
+          _proxy_reg(topics, head.topic_size, content, head.content_size, val, PROC_UNREG);
+        }
+      }
+
       _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
 
     }
@@ -771,3 +994,40 @@
 
 	return rv;
 }
+
+void BusServerSocket::_data_remove(int val1, int val2) {
+
+  int i;
+  LockFreeQueue<shm_packet_t> *queue = NULL;
+  hashtable_t *hashtable = mm_get_hashtable();
+
+  void *data_ptr1 = hashtable_get(hashtable, val1);
+  void *data_ptr2 = hashtable_get(hashtable, val2);
+  if (data_ptr1 != NULL) {
+    if (data_ptr1 != (void *)1) {
+      queue = (LockFreeQueue<shm_packet_t> *)data_ptr1;
+      queue->close();
+      for (i = 0; i < queue->size(); i++) {
+        mm_free((*queue)[i].buf);
+      }
+      sleep(1);
+    }
+
+    hashtable_remove(hashtable, val1);
+  }
+
+  if (data_ptr2 != NULL) {
+    if (data_ptr2 != (void *)1) {
+      queue = (LockFreeQueue<shm_packet_t> *)data_ptr2;
+      queue->close();
+      for (i = 0; i < queue->size(); i++) {
+        mm_free((*queue)[i].buf);
+      }
+      sleep(1);
+    }
+
+    hashtable_remove(hashtable, val2);
+  }
+
+}
+

--
Gitblit v1.8.0