From de80ed87b8339f23624642786698057a62bdf779 Mon Sep 17 00:00:00 2001 From: fujuntang <fujuntang@smartai.com> Date: 星期二, 23 十一月 2021 11:25:48 +0800 Subject: [PATCH] Fix the communication failure issue when the registered applications exceeds the fixed amount. --- src/socket/bus_server_socket.cpp | 95 +++++++++++++++++++++++++++++++++++++++++------ 1 files changed, 82 insertions(+), 13 deletions(-) diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp index 315c356..ef1321e 100644 --- a/src/socket/bus_server_socket.cpp +++ b/src/socket/bus_server_socket.cpp @@ -2,9 +2,11 @@ #include "bus_server_socket.h" #include "shm_mod_socket.h" #include "shm_socket.h" +#include "msg_mgr.h" #include "bus_error.h" static Logger *logger = LoggerFactory::getLogger(); +static pthread_mutex_t gMutex; list gLinkedList; void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) { @@ -83,11 +85,13 @@ int BusServerSocket::start(){ int rv; - topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); - - rv = _run_proxy_(); + topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); - return rv; + pthread_mutex_init(&gMutex, NULL); + + rv = _run_proxy_(); + + return rv; } @@ -303,7 +307,7 @@ LinkNode *pNew = NULL; LinkNode *pCur = NULL; - pNew = new(LinkNode); + pNew = (LinkNode *)malloc(sizeof(LinkNode)); pNew->data = aData; pNew->data_fix = bData; pNew->count = 0; @@ -340,7 +344,7 @@ head = pCur->next; - delete(pCur); + free(pCur); pCur = head; @@ -353,7 +357,7 @@ pCur->next = pNext->next; pCur = pNext->next; - delete(pNext); + free(pNext); } else { pCur = pNext; @@ -515,9 +519,11 @@ ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET); ProcPartZone *procPart = shm_mm_attach<ProcPartZone>(SHM_BUS_PROC_PART_MAP_KEY); if (flag == PROC_REG) { + pthread_mutex_lock(&gMutex); if ((proc_iter = proc->find(key)) == proc->end()) { proc->insert({key, Data_stru}); } + pthread_mutex_unlock(&gMutex); if ((proc_part_iter = procPart->find(key)) == procPart->end()) { procPart->insert({key, Data_stru.proc_id}); @@ -536,6 +542,7 @@ SvrSub_ele->erase(key); } + pthread_mutex_lock(&gMutex); if ((proc_iter = proc->find(key)) != proc->end()) { data1 = atoi((proc_iter->second).int_info); @@ -545,9 +552,10 @@ BusServerSocket::_data_remove(key); len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1); strncpy(buf_temp, (proc_iter->second).proc_id, len); - proc->erase(proc_iter); + proc->erase(key); } + pthread_mutex_unlock(&gMutex); if ((proc_part_iter = procPart->find(key)) != procPart->end()) { @@ -559,7 +567,10 @@ procQuePart->erase(buf_temp); } + BusServerSocket::buf_data_remove(key); + find_mm_data(key); } + } else if (flag == PROC_REG_TCS) { ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY); SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); @@ -709,7 +720,7 @@ sprintf(data_buf, "%d", count); shm_sendto(shm_socket, data_buf, strlen(data_buf), key, &timeout, BUS_TIMEOUT_FLAG); - } else { + } else if (flag == PROC_QUE_ATCS) { int val; int temp = 0; @@ -853,6 +864,17 @@ shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG); free(last_buf); + } else { + + char *ptr = NULL; + strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); + + data1 = atoi(buf_temp); + ptr = strstr(buf_temp, STR_MAGIC); + if (ptr != NULL) { + data2 = atoi(ptr + 1); + } + BusServerSocket::buf_data_set(data1, data2); } } @@ -861,9 +883,12 @@ ProcZone::iterator proc_iter; ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); + pthread_mutex_lock(&gMutex); if ((proc_iter = proc->find(val)) != proc->end()) { + pthread_mutex_unlock(&gMutex); return true; } + pthread_mutex_unlock(&gMutex); return false; @@ -888,7 +913,7 @@ int key; int flag; char buf_temp[MAX_STR_LEN] = { 0x00 }; - char * action, *topic, *topics, *buf, *content; + char *action, *topic, *topics, *buf, *content; size_t head_len; bus_head_t head; int val; @@ -935,7 +960,8 @@ } else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \ || (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \ - || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0)) { + || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0) \ + || (strcmp(action, "bufreg") == 0)) { content = topics + head.topic_size; if (strcmp(action, "reg") == 0) { @@ -957,15 +983,19 @@ flag = PROC_QUE_STCS; - } else { + } else if (strcmp(action, "atcsque") == 0) { flag = PROC_QUE_ATCS; + + } else { + + flag = PROC_REG_BUF; } if (flag == PROC_REG) { memcpy(buf_temp, content, strlen(content) + 1); - + if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) { val = proc_que_iter->second; @@ -996,6 +1026,7 @@ hashtable_t *hashtable = mm_get_hashtable(); void *data_ptr = hashtable_get(hashtable, val); + if (data_ptr != NULL) { if (data_ptr != (void *)1) { queue = (LockFreeQueue<shm_packet_t> *)data_ptr; @@ -1011,3 +1042,41 @@ } +void BusServerSocket::buf_data_set(int data, int val) { + recvbuf_val *val_buf; + recvbuf_data::iterator data_iter; + recvbuf_val::iterator val_iter; + + if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) { + val_buf = data_iter->second; + } else { + void *set_ptr = mm_malloc(sizeof(recvbuf_val)); + + val_buf = new(set_ptr) recvbuf_val; + recvBuf_data.insert({data, val_buf}); + } + + val_buf->insert(val); +} + +void BusServerSocket::buf_data_remove(int data) { + + int val; + recvbuf_val *val_buf; + recvbuf_data::iterator data_iter; + recvbuf_val::iterator val_iter; + + if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) { + + val_buf = data_iter->second; + for(val_iter = val_buf->begin(); val_iter != val_buf->end(); ++val_iter) { + val = *val_iter; + + BusServerSocket::_data_remove(val); + } + + recvBuf_data.erase(data); + } +} + + -- Gitblit v1.8.0