From 73689afc09ce346f9eb00e02faf7f242e55dc7ee Mon Sep 17 00:00:00 2001
From: fujuntang <fujuntang@smartai.com>
Date: 星期四, 09 十二月 2021 19:33:00 +0800
Subject: [PATCH] Add the sync to fix the resource clear issue.

---
 src/socket/bus_server_socket.cpp |  374 +++++++++++++++++++++++++++++++++++++++++++++--------
 1 files changed, 318 insertions(+), 56 deletions(-)

diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index d5e757d..fc53b0b 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -2,10 +2,13 @@
 #include "bus_server_socket.h"
 #include "shm_mod_socket.h"
 #include "shm_socket.h"
+#include "msg_mgr.h"
 #include "bus_error.h"
 
 static Logger *logger = LoggerFactory::getLogger();
+static pthread_mutex_t gMutex;
 
+list gLinkedList;
 void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb) {
 	SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
 	SHMKeySet *subscripter_set;
@@ -82,11 +85,13 @@
 int  BusServerSocket::start(){
   int rv;
 
-	topic_sub_map =	shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
- 
-	rv = _run_proxy_();
+  topic_sub_map =	shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
 
-	return rv;
+  pthread_mutex_init(&gMutex, NULL);
+
+  rv = _run_proxy_();
+
+  return rv;
 }
 
 
@@ -199,7 +204,7 @@
 	int rv;
 	struct timespec timeout = {1,0};
 
-	if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
+    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
 
 		subscripter_set = map_iter->second;
 		for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) {
@@ -296,8 +301,172 @@
   return dataBuf;
 }
 
+void list::Insert(int aData, int bData) 
+{
+  LinkNode *pHead = NULL; 
+  LinkNode *pNew = NULL;
+  LinkNode *pCur = NULL;
+ 
+  pNew = (LinkNode *)malloc(sizeof(LinkNode));
+  pNew->data = aData;
+  pNew->data_fix = bData;
+  pNew->count = 0;
+  
+  pHead = head;
+  pCur = pHead;
+  if(pHead == NULL) {
+    head = pNew;
+  
+    pNew->next = NULL;
+ 
+  } else {
+    while(pCur->next != NULL) {
+      pCur = pCur->next;
+    }
+    
+    pCur->next = pNew;
+    pNew->next = NULL;
+  }
+}
+
+void list::Delete(int data)  
+{
+  LinkNode *pHead;
+  LinkNode *pCur;
+  LinkNode *pNext;
+  
+  pHead = head;
+  pCur = pHead;
+  if(pHead == NULL)
+    return;
+  
+  while((pCur != NULL) && (pCur->data == data)) {
+    
+    head = pCur->next;
+    
+    free(pCur);
+    
+    pCur = head;
+    
+  }
+
+  while((pCur != NULL) && (pCur->next != NULL)) {
+    pNext = pCur->next;
+
+    if(pNext->data == data) {
+      pCur->next = pNext->next;
+      pCur = pNext->next;
+
+      free(pNext);
+    } else {
+    
+      pCur = pNext;
+      
+    }
+  }
+}
+
+void list::dataSet(int data, int val)  
+{
+  LinkNode *pCur;
+  
+  pCur = head;
+  if(pCur == NULL)
+    return;
+  
+  while(pCur != NULL) {
+    
+    if(pCur->data == data) {
+      pCur->count = val;
+    }
+    
+    pCur = pCur->next;
+  }
+}
+
+int list::dataGet(int data)  
+{
+  LinkNode *pCur;
+  
+  pCur = head;
+  if(pCur == NULL)
+    return 0;
+  
+  while(pCur != NULL) {
+    
+    if(pCur->data == data) {
+      return pCur->count;
+    }
+    
+    pCur = pCur->next;
+  }
+  
+  return 0;
+}
+
+int list::dataFixGet(int data)  
+{
+  LinkNode *pCur;
+  
+  pCur = head;
+  if(pCur == NULL)
+    return 0;
+  
+  while(pCur != NULL) {
+    
+    if(pCur->data == data) {
+      return pCur->data_fix;
+    }
+    
+    pCur = pCur->next;
+  }
+  
+  return 0;
+}
+
+int list::NodeNum(void)
+{
+  int count = 0;
+  LinkNode *pCur = head;
+  
+  if (pCur == NULL) {
+    return 0;
+  }
+  
+  while(pCur != NULL) {
+   
+    ++count;
+    pCur = pCur->next;
+  }
+  
+  return count;
+}
+
+int list::nodeGet(int index)
+{
+  int count = 0;
+  LinkNode *pCur = head;
+  
+  if (pCur == NULL) {
+    return 0;
+  }
+  
+  while((pCur != NULL) && (count <= index)) {
+   
+    if (count == index) {
+      return pCur->data;
+    }
+    
+    ++count;
+    pCur = pCur->next;
+  }
+  
+  return 0;
+}
+
 void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag)
 {
+  char data_buf[MAX_STR_LEN] = { 0x00 };
   char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
   int count = 0;
   int i = 0;
@@ -340,16 +509,21 @@
     
       memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1); 
       count += strlen(buf + count) + 1;
-
+      
+      if (flag == PROC_REG) {
+        gLinkedList.Insert(key, atoi(Data_stru.int_info));
+      }
     }
 
     ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
     ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
     ProcPartZone *procPart = shm_mm_attach<ProcPartZone>(SHM_BUS_PROC_PART_MAP_KEY);
     if (flag == PROC_REG) {
+      pthread_mutex_lock(&gMutex);
       if ((proc_iter = proc->find(key)) == proc->end()) {
         proc->insert({key, Data_stru});
       }
+      pthread_mutex_unlock(&gMutex);
 
       if ((proc_part_iter = procPart->find(key)) == procPart->end()) {
         procPart->insert({key, Data_stru.proc_id});
@@ -368,16 +542,20 @@
         SvrSub_ele->erase(key);
       }
 
+      pthread_mutex_lock(&gMutex);
       if ((proc_iter = proc->find(key)) != proc->end()) {
 
         data1 = atoi((proc_iter->second).int_info);
         data2 = atoi((proc_iter->second).svr_info);
-        BusServerSocket::_data_remove(data1, data2);
+        BusServerSocket::_data_remove(data1);
+        BusServerSocket::_data_remove(data2);
+        BusServerSocket::_data_remove(key);
         len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
         strncpy(buf_temp, (proc_iter->second).proc_id, len);
-        proc->erase(proc_iter);
+        proc->erase(key);
 
       }
+      pthread_mutex_unlock(&gMutex);
 
       if ((proc_part_iter = procPart->find(key)) != procPart->end()) {
 
@@ -389,7 +567,13 @@
         procQuePart->erase(buf_temp);
       }
 
+      pthread_mutex_lock(&gMutex);
+      BusServerSocket::buf_data_remove(key);
+      pthread_mutex_unlock(&gMutex);
+
+      find_mm_data(key);
     }
+
   } else if (flag == PROC_REG_TCS) {
     ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY);
     SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
@@ -406,6 +590,7 @@
     strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); 
     data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
     while(data_ptr) {
+      data_ptr = trim(data_ptr, 0);
       TcsSub_ele->insert(data_ptr);
       if ((svr_tcs_iter = SvrData->find(data_ptr)) != SvrData->end()) {
         SvrSub_ele = svr_tcs_iter->second;
@@ -435,6 +620,7 @@
     strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
     data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
     while(data_ptr) {
+      data_ptr = trim(data_ptr, 0);
       ret = Qurey_object(data_ptr, &len);
       if (ret != NULL) {
     
@@ -520,7 +706,7 @@
     ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
 
     strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
-    if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) {
+    if ((svr_tcs_iter = SvrData->find(trim(buf_temp, 0))) != SvrData->end()) {
       SvrSub_ele = svr_tcs_iter->second;
     
       for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { 
@@ -535,11 +721,11 @@
       count = 0;
     }
 
-    memset(buf_temp, 0x00, sizeof(buf_temp));
-    sprintf(buf_temp, "%d", count);
-    shm_sendto(shm_socket, buf_temp, strlen(buf_temp), key, &timeout, BUS_TIMEOUT_FLAG);
+    memset(data_buf, 0x00, sizeof(data_buf));
+    sprintf(data_buf, "%d", count);
+    shm_sendto(shm_socket, data_buf, strlen(data_buf), key, &timeout, BUS_TIMEOUT_FLAG);
 
-  } else {
+  } else if (flag == PROC_QUE_ATCS) {
 
     int val;
     int temp = 0;
@@ -682,7 +868,48 @@
 
     shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
 
+    free(last_buf);
+  } else {
+
+    char *ptr = NULL;
+    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
+
+    data1 = atoi(buf_temp);
+    ptr = strstr(buf_temp, STR_MAGIC);
+    if (ptr != NULL) {
+      data2 = atoi(ptr + 1);
+    }
+    BusServerSocket::buf_data_set(data1, data2); 
   }
+}
+
+int BusServerSocket::get_data(int val) {
+  
+  ProcZone::iterator proc_iter;
+  ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+
+  pthread_mutex_lock(&gMutex);
+  if ((proc_iter = proc->find(val)) != proc->end()) {
+    pthread_mutex_unlock(&gMutex);
+    return true;
+  }
+  pthread_mutex_unlock(&gMutex);
+  
+  return false;
+  
+}
+
+int BusServerSocket::check_proc(const int val, const void *buf, int len, void **buf_ret, int *len_ret, \
+                          const struct timespec *timeout, const int flag) {
+  int ret;
+
+  ret = shm_sendandrecv(shm_socket, buf, len, val, buf_ret, len_ret, timeout, flag);
+
+  return ret;
+}
+
+void BusServerSocket::remove_proc(int val) {
+  BusServerSocket::_proxy_reg(NULL, 0, NULL, 0, val, PROC_UNREG);
 }
 
 // 杩愯浠g悊
@@ -690,14 +917,18 @@
 	int size;
 	int key;
   int flag;
-	char * action, *topic, *topics, *buf, *content;
+  char buf_temp[MAX_STR_LEN] = { 0x00 };
+	char *action, *topic, *topics, *buf, *content;
 	size_t head_len;
 	bus_head_t head;
+    int val;
+    ProcDataZone::iterator proc_que_iter;
+    ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
 
-  int rv;
-  char send_buf[512] = { 0x00 };
+    int rv;
+    char send_buf[512] = { 0x00 };
 
-  const char *topic_delim = ",";
+    const char *topic_delim = ",";
 	while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) {
 		head = ShmModSocket::decode_bus_head(buf);
 		topics = buf + BUS_HEAD_SIZE;
@@ -734,7 +965,8 @@
 		} 
     else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \
             || (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \
-            || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0)) {
+            || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0) \
+            || (strcmp(action, "bufreg") == 0)) {
       content = topics + head.topic_size;
       if (strcmp(action, "reg") == 0) {
         
@@ -756,48 +988,53 @@
         
         flag = PROC_QUE_STCS; 
 
-      } else {
+      } else if (strcmp(action, "atcsque") == 0) {
         
         flag = PROC_QUE_ATCS;
 
+      } else {
+        
+        flag = PROC_REG_BUF;
+
       }
         
+      if (flag == PROC_REG) {
+        memcpy(buf_temp, content, strlen(content) + 1);
+        
+        if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
+
+          val = proc_que_iter->second;
+          _proxy_reg(topics, head.topic_size, content, head.content_size, val, PROC_UNREG);
+        }
+      }
+
       _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
 
     }
-		else if (strncmp(buf, "request", strlen("request")) == 0) {
-      sprintf(send_buf, "%4d", key);
-      strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4));
-      
-      rv = shm_sendto(shm_socket, send_buf, strlen(send_buf) + 1, key);
-      if(rv != 0) {
-        logger->error( "BusServerSocket::_run_proxy_ : requst answer fail!\n");
-      }
-    }
     else if(strcmp(action, "stop") == 0) {
-			free(buf);
-			break;
-		} else {
-			logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action);
-		}
-		free(buf);
-	}
+      free(buf);
+      break;
+    } else {
+      logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action);
+    }
+    free(buf);
+  }
 
 
-	return rv;
+  return rv;
 }
 
-void BusServerSocket::_data_remove(int val1, int val2) {
+void BusServerSocket::_data_remove(int val) {
 
   int i;
   LockFreeQueue<shm_packet_t> *queue = NULL;
   hashtable_t *hashtable = mm_get_hashtable();
 
-  void *data_ptr1 = hashtable_get(hashtable, val1);
-  void *data_ptr2 = hashtable_get(hashtable, val2);
-  if (data_ptr1 != NULL) {
-    if (data_ptr1 != (void *)1) {
-      queue = (LockFreeQueue<shm_packet_t> *)data_ptr1;
+  void *data_ptr = hashtable_get(hashtable, val);
+
+  if (data_ptr != NULL) {
+    if (data_ptr != (void *)1) {
+      queue = (LockFreeQueue<shm_packet_t> *)data_ptr;
       queue->close();
       for (i = 0; i < queue->size(); i++) {
         mm_free((*queue)[i].buf);
@@ -805,21 +1042,46 @@
       sleep(1);
     }
 
-    hashtable_remove(hashtable, val1);
-  }
-
-  if (data_ptr2 != NULL) {
-    if (data_ptr2 != (void *)1) {
-      queue = (LockFreeQueue<shm_packet_t> *)data_ptr2;
-      queue->close();
-      for (i = 0; i < queue->size(); i++) {
-        mm_free((*queue)[i].buf);
-      }
-      sleep(1);
-    }
-
-    hashtable_remove(hashtable, val2);
+    hashtable_remove(hashtable, val);
   }
 
 }
 
+void BusServerSocket::buf_data_set(int data, int val) {
+  recvbuf_val *val_buf;
+  recvbuf_data::iterator data_iter;
+  recvbuf_val::iterator val_iter;
+
+  if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) {
+    val_buf = data_iter->second;
+  } else {
+    void *set_ptr = mm_malloc(sizeof(recvbuf_val));
+
+    val_buf = new(set_ptr) recvbuf_val;
+    recvBuf_data.insert({data, val_buf}); 
+  }
+
+  val_buf->insert(val);
+}
+
+void BusServerSocket::buf_data_remove(int data) {
+
+  int val;
+  recvbuf_val *val_buf;
+  recvbuf_data::iterator data_iter;
+  recvbuf_val::iterator val_iter;
+
+  if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) {
+    
+    val_buf = data_iter->second;
+    for(val_iter = val_buf->begin(); val_iter != val_buf->end(); ++val_iter) {
+      val = *val_iter;
+
+      BusServerSocket::_data_remove(val);
+    }
+
+    recvBuf_data.erase(data);
+  }
+}
+
+

--
Gitblit v1.8.0