From 5c912c70e9333298ff48f7ea15424f72ca977b99 Mon Sep 17 00:00:00 2001
From: Fu Juntang <StrongTiger_001@163.com>
Date: 星期五, 17 九月 2021 09:43:55 +0800
Subject: [PATCH] Add the heartbeat logic feature.

---
 src/socket/bus_server_socket.cpp |  209 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 208 insertions(+), 1 deletions(-)

diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index d5e757d..cfb7419 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -6,6 +6,7 @@
 
 static Logger *logger = LoggerFactory::getLogger();
 
+list gLinkedList;
 void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb) {
 	SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
 	SHMKeySet *subscripter_set;
@@ -296,6 +297,169 @@
   return dataBuf;
 }
 
+void list::Insert(int aData, int bData) 
+{
+  LinkNode *pHead = NULL; 
+  LinkNode *pNew = NULL;
+  LinkNode *pCur = NULL;
+ 
+  pNew = new(LinkNode);
+  pNew->data = aData;
+  pNew->data_fix = bData;
+  pNew->count = 0;
+  
+  pHead = head;
+  pCur = pHead;
+  if(pHead == NULL) {
+    head = pNew;
+  
+    pNew->next = NULL;
+ 
+  } else {
+    while(pCur->next != NULL) {
+      pCur = pCur->next;
+    }
+    
+    pCur->next = pNew;
+    pNew->next = NULL;
+  }
+}
+
+void list::Delete(int data)  
+{
+  LinkNode *pHead;
+  LinkNode *pCur;
+  LinkNode *pNext;
+  
+  pHead = head;
+  pCur = pHead;
+  if(pHead == NULL)
+    return;
+  
+  while((pCur != NULL) && (pCur->data == data)) {
+    
+    head = pCur->next;
+    
+    delete(pCur);
+    
+    pCur = head;
+    
+  }
+
+  while((pCur != NULL) && (pCur->next != NULL)) {
+    pNext = pCur->next;
+
+    if(pNext->data == data) {
+      pCur->next = pNext->next;
+      pCur = pNext->next;
+
+      delete(pNext);
+    } else {
+    
+      pCur = pNext;
+      
+    }
+  }
+}
+
+void list::dataSet(int data, int val)  
+{
+  LinkNode *pCur;
+  
+  pCur = head;
+  if(pCur == NULL)
+    return;
+  
+  while(pCur != NULL) {
+    
+    if(pCur->data == data) {
+      pCur->count = val;
+    }
+    
+    pCur = pCur->next;
+  }
+}
+
+int list::dataGet(int data)  
+{
+  LinkNode *pCur;
+  
+  pCur = head;
+  if(pCur == NULL)
+    return 0;
+  
+  while(pCur != NULL) {
+    
+    if(pCur->data == data) {
+      return pCur->count;
+    }
+    
+    pCur = pCur->next;
+  }
+  
+  return 0;
+}
+
+int list::dataFixGet(int data)  
+{
+  LinkNode *pCur;
+  
+  pCur = head;
+  if(pCur == NULL)
+    return 0;
+  
+  while(pCur != NULL) {
+    
+    if(pCur->data == data) {
+      return pCur->data_fix;
+    }
+    
+    pCur = pCur->next;
+  }
+  
+  return 0;
+}
+
+int list::NodeNum(void)
+{
+  int count = 0;
+  LinkNode *pCur = head;
+  
+  if (pCur == NULL) {
+    return 0;
+  }
+  
+  while(pCur != NULL) {
+   
+    ++count;
+    pCur = pCur->next;
+  }
+  
+  return count;
+}
+
+int list::nodeGet(int index)
+{
+  int count = 0;
+  LinkNode *pCur = head;
+  
+  if (pCur == NULL) {
+    return 0;
+  }
+  
+  while((pCur != NULL) && (count <= index)) {
+   
+    if (count == index) {
+      return pCur->data;
+    }
+    
+    ++count;
+    pCur = pCur->next;
+  }
+  
+  return 0;
+}
+
 void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag)
 {
   char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
@@ -340,7 +504,10 @@
     
       memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1); 
       count += strlen(buf + count) + 1;
-
+      
+      if (flag == PROC_REG) {
+        gLinkedList.Insert(key, atoi(Data_stru.int_info));
+      }
     }
 
     ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
@@ -685,14 +852,44 @@
   }
 }
 
+int BusServerSocket::get_data(int val) {
+  
+  ProcZone::iterator proc_iter;
+  ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+
+  if ((proc_iter = proc->find(val)) != proc->end()) {
+    return true;
+  }
+  
+  return false;
+  
+}
+
+int BusServerSocket::check_proc(const int val, const void *buf, int len, void **buf_ret, int *len_ret, \
+                          const struct timespec *timeout, const int flag) {
+  int ret;
+
+  ret = shm_sendandrecv(shm_socket, buf, len, val, buf_ret, len_ret, timeout, flag);
+
+  return ret;
+}
+
+void BusServerSocket::remove_proc(int val) {
+  BusServerSocket::_proxy_reg(NULL, 0, NULL, 0, val, PROC_UNREG);
+}
+
 // 杩愯浠g悊
 int BusServerSocket::_run_proxy_() {
 	int size;
 	int key;
   int flag;
+  char buf_temp[MAX_STR_LEN] = { 0x00 };
 	char * action, *topic, *topics, *buf, *content;
 	size_t head_len;
 	bus_head_t head;
+    int val;
+    ProcDataZone::iterator proc_que_iter;
+    ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
 
   int rv;
   char send_buf[512] = { 0x00 };
@@ -762,6 +959,16 @@
 
       }
         
+      if (flag == PROC_REG) {
+        memcpy(buf_temp, content, strlen(content) + 1);
+
+        if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
+
+          val = proc_que_iter->second;
+          _proxy_reg(topics, head.topic_size, content, head.content_size, val, PROC_UNREG);
+        }
+      }
+
       _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
 
     }

--
Gitblit v1.8.0