From 5c912c70e9333298ff48f7ea15424f72ca977b99 Mon Sep 17 00:00:00 2001 From: Fu Juntang <StrongTiger_001@163.com> Date: 星期五, 17 九月 2021 09:43:55 +0800 Subject: [PATCH] Add the heartbeat logic feature. --- src/socket/bus_server_socket.cpp | 209 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 208 insertions(+), 1 deletions(-) diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp index d5e757d..cfb7419 100644 --- a/src/socket/bus_server_socket.cpp +++ b/src/socket/bus_server_socket.cpp @@ -6,6 +6,7 @@ static Logger *logger = LoggerFactory::getLogger(); +list gLinkedList; void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) { SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); SHMKeySet *subscripter_set; @@ -296,6 +297,169 @@ return dataBuf; } +void list::Insert(int aData, int bData) +{ + LinkNode *pHead = NULL; + LinkNode *pNew = NULL; + LinkNode *pCur = NULL; + + pNew = new(LinkNode); + pNew->data = aData; + pNew->data_fix = bData; + pNew->count = 0; + + pHead = head; + pCur = pHead; + if(pHead == NULL) { + head = pNew; + + pNew->next = NULL; + + } else { + while(pCur->next != NULL) { + pCur = pCur->next; + } + + pCur->next = pNew; + pNew->next = NULL; + } +} + +void list::Delete(int data) +{ + LinkNode *pHead; + LinkNode *pCur; + LinkNode *pNext; + + pHead = head; + pCur = pHead; + if(pHead == NULL) + return; + + while((pCur != NULL) && (pCur->data == data)) { + + head = pCur->next; + + delete(pCur); + + pCur = head; + + } + + while((pCur != NULL) && (pCur->next != NULL)) { + pNext = pCur->next; + + if(pNext->data == data) { + pCur->next = pNext->next; + pCur = pNext->next; + + delete(pNext); + } else { + + pCur = pNext; + + } + } +} + +void list::dataSet(int data, int val) +{ + LinkNode *pCur; + + pCur = head; + if(pCur == NULL) + return; + + while(pCur != NULL) { + + if(pCur->data == data) { + pCur->count = val; + } + + pCur = pCur->next; + } +} + +int list::dataGet(int data) +{ + LinkNode *pCur; + + pCur = head; + if(pCur == NULL) + return 0; + + while(pCur != NULL) { + + if(pCur->data == data) { + return pCur->count; + } + + pCur = pCur->next; + } + + return 0; +} + +int list::dataFixGet(int data) +{ + LinkNode *pCur; + + pCur = head; + if(pCur == NULL) + return 0; + + while(pCur != NULL) { + + if(pCur->data == data) { + return pCur->data_fix; + } + + pCur = pCur->next; + } + + return 0; +} + +int list::NodeNum(void) +{ + int count = 0; + LinkNode *pCur = head; + + if (pCur == NULL) { + return 0; + } + + while(pCur != NULL) { + + ++count; + pCur = pCur->next; + } + + return count; +} + +int list::nodeGet(int index) +{ + int count = 0; + LinkNode *pCur = head; + + if (pCur == NULL) { + return 0; + } + + while((pCur != NULL) && (count <= index)) { + + if (count == index) { + return pCur->data; + } + + ++count; + pCur = pCur->next; + } + + return 0; +} + void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag) { char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 }; @@ -340,7 +504,10 @@ memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1); count += strlen(buf + count) + 1; - + + if (flag == PROC_REG) { + gLinkedList.Insert(key, atoi(Data_stru.int_info)); + } } ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); @@ -685,14 +852,44 @@ } } +int BusServerSocket::get_data(int val) { + + ProcZone::iterator proc_iter; + ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); + + if ((proc_iter = proc->find(val)) != proc->end()) { + return true; + } + + return false; + +} + +int BusServerSocket::check_proc(const int val, const void *buf, int len, void **buf_ret, int *len_ret, \ + const struct timespec *timeout, const int flag) { + int ret; + + ret = shm_sendandrecv(shm_socket, buf, len, val, buf_ret, len_ret, timeout, flag); + + return ret; +} + +void BusServerSocket::remove_proc(int val) { + BusServerSocket::_proxy_reg(NULL, 0, NULL, 0, val, PROC_UNREG); +} + // 杩愯浠g悊 int BusServerSocket::_run_proxy_() { int size; int key; int flag; + char buf_temp[MAX_STR_LEN] = { 0x00 }; char * action, *topic, *topics, *buf, *content; size_t head_len; bus_head_t head; + int val; + ProcDataZone::iterator proc_que_iter; + ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET); int rv; char send_buf[512] = { 0x00 }; @@ -762,6 +959,16 @@ } + if (flag == PROC_REG) { + memcpy(buf_temp, content, strlen(content) + 1); + + if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) { + + val = proc_que_iter->second; + _proxy_reg(topics, head.topic_size, content, head.content_size, val, PROC_UNREG); + } + } + _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag); } -- Gitblit v1.8.0