From 97428da8d339d3d0c4d00aa328d76a32c7858412 Mon Sep 17 00:00:00 2001 From: fujuntang <fujuntang@smartai.com> Date: 星期五, 17 九月 2021 19:44:24 +0800 Subject: [PATCH] Fix the memory leakage. --- src/socket/bus_server_socket.cpp | 262 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 261 insertions(+), 1 deletions(-) diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp index 80f7338..beb7148 100644 --- a/src/socket/bus_server_socket.cpp +++ b/src/socket/bus_server_socket.cpp @@ -6,6 +6,7 @@ static Logger *logger = LoggerFactory::getLogger(); +list gLinkedList; void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) { SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); SHMKeySet *subscripter_set; @@ -296,12 +297,176 @@ return dataBuf; } +void list::Insert(int aData, int bData) +{ + LinkNode *pHead = NULL; + LinkNode *pNew = NULL; + LinkNode *pCur = NULL; + + pNew = new(LinkNode); + pNew->data = aData; + pNew->data_fix = bData; + pNew->count = 0; + + pHead = head; + pCur = pHead; + if(pHead == NULL) { + head = pNew; + + pNew->next = NULL; + + } else { + while(pCur->next != NULL) { + pCur = pCur->next; + } + + pCur->next = pNew; + pNew->next = NULL; + } +} + +void list::Delete(int data) +{ + LinkNode *pHead; + LinkNode *pCur; + LinkNode *pNext; + + pHead = head; + pCur = pHead; + if(pHead == NULL) + return; + + while((pCur != NULL) && (pCur->data == data)) { + + head = pCur->next; + + delete(pCur); + + pCur = head; + + } + + while((pCur != NULL) && (pCur->next != NULL)) { + pNext = pCur->next; + + if(pNext->data == data) { + pCur->next = pNext->next; + pCur = pNext->next; + + delete(pNext); + } else { + + pCur = pNext; + + } + } +} + +void list::dataSet(int data, int val) +{ + LinkNode *pCur; + + pCur = head; + if(pCur == NULL) + return; + + while(pCur != NULL) { + + if(pCur->data == data) { + pCur->count = val; + } + + pCur = pCur->next; + } +} + +int list::dataGet(int data) +{ + LinkNode *pCur; + + pCur = head; + if(pCur == NULL) + return 0; + + while(pCur != NULL) { + + if(pCur->data == data) { + return pCur->count; + } + + pCur = pCur->next; + } + + return 0; +} + +int list::dataFixGet(int data) +{ + LinkNode *pCur; + + pCur = head; + if(pCur == NULL) + return 0; + + while(pCur != NULL) { + + if(pCur->data == data) { + return pCur->data_fix; + } + + pCur = pCur->next; + } + + return 0; +} + +int list::NodeNum(void) +{ + int count = 0; + LinkNode *pCur = head; + + if (pCur == NULL) { + return 0; + } + + while(pCur != NULL) { + + ++count; + pCur = pCur->next; + } + + return count; +} + +int list::nodeGet(int index) +{ + int count = 0; + LinkNode *pCur = head; + + if (pCur == NULL) { + return 0; + } + + while((pCur != NULL) && (count <= index)) { + + if (count == index) { + return pCur->data; + } + + ++count; + pCur = pCur->next; + } + + return 0; +} + void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag) { char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 }; int count = 0; int i = 0; int len = 0; + int data1, data2; char *data_ptr; ProcInfo Data_stru; ProcZone::iterator proc_iter; @@ -333,6 +498,16 @@ memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1); count += strlen(buf + count) + 1; + + memcpy(Data_stru.int_info, buf + count, strlen(buf + count) + 1); + count += strlen(buf + count) + 1; + + memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1); + count += strlen(buf + count) + 1; + + if (flag == PROC_REG) { + gLinkedList.Insert(key, atoi(Data_stru.int_info)); + } } ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); @@ -362,6 +537,9 @@ if ((proc_iter = proc->find(key)) != proc->end()) { + data1 = atoi((proc_iter->second).int_info); + data2 = atoi((proc_iter->second).svr_info); + BusServerSocket::_data_remove(data1, data2); len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1); strncpy(buf_temp, (proc_iter->second).proc_id, len); proc->erase(proc_iter); @@ -504,7 +682,9 @@ free(last_buf); } else if (flag == PROC_QUE_STCS) { + SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); + ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) { @@ -512,6 +692,9 @@ for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { count = *svr_proc_iter; + if ((proc_iter = proc->find(count)) != proc->end()) { + count = atoi((proc_iter->second).svr_info); + } break; } @@ -666,7 +849,34 @@ shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG); + free(last_buf); } +} + +int BusServerSocket::get_data(int val) { + + ProcZone::iterator proc_iter; + ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); + + if ((proc_iter = proc->find(val)) != proc->end()) { + return true; + } + + return false; + +} + +int BusServerSocket::check_proc(const int val, const void *buf, int len, void **buf_ret, int *len_ret, \ + const struct timespec *timeout, const int flag) { + int ret; + + ret = shm_sendandrecv(shm_socket, buf, len, val, buf_ret, len_ret, timeout, flag); + + return ret; +} + +void BusServerSocket::remove_proc(int val) { + BusServerSocket::_proxy_reg(NULL, 0, NULL, 0, val, PROC_UNREG); } // 杩愯浠g悊 @@ -674,10 +884,13 @@ int size; int key; int flag; + char buf_temp[MAX_STR_LEN] = { 0x00 }; char * action, *topic, *topics, *buf, *content; size_t head_len; - char resp_buf[128]; bus_head_t head; + int val; + ProcDataZone::iterator proc_que_iter; + ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET); int rv; char send_buf[512] = { 0x00 }; @@ -747,6 +960,16 @@ } + if (flag == PROC_REG) { + memcpy(buf_temp, content, strlen(content) + 1); + + if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) { + + val = proc_que_iter->second; + _proxy_reg(topics, head.topic_size, content, head.content_size, val, PROC_UNREG); + } + } + _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag); } @@ -771,3 +994,40 @@ return rv; } + +void BusServerSocket::_data_remove(int val1, int val2) { + + int i; + LockFreeQueue<shm_packet_t> *queue = NULL; + hashtable_t *hashtable = mm_get_hashtable(); + + void *data_ptr1 = hashtable_get(hashtable, val1); + void *data_ptr2 = hashtable_get(hashtable, val2); + if (data_ptr1 != NULL) { + if (data_ptr1 != (void *)1) { + queue = (LockFreeQueue<shm_packet_t> *)data_ptr1; + queue->close(); + for (i = 0; i < queue->size(); i++) { + mm_free((*queue)[i].buf); + } + sleep(1); + } + + hashtable_remove(hashtable, val1); + } + + if (data_ptr2 != NULL) { + if (data_ptr2 != (void *)1) { + queue = (LockFreeQueue<shm_packet_t> *)data_ptr2; + queue->close(); + for (i = 0; i < queue->size(); i++) { + mm_free((*queue)[i].buf); + } + sleep(1); + } + + hashtable_remove(hashtable, val2); + } + +} + -- Gitblit v1.8.0