From 73689afc09ce346f9eb00e02faf7f242e55dc7ee Mon Sep 17 00:00:00 2001 From: fujuntang <fujuntang@smartai.com> Date: 星期四, 09 十二月 2021 19:33:00 +0800 Subject: [PATCH] Add the sync to fix the resource clear issue. --- src/socket/bus_server_socket.cpp | 29 ++++++++++++++++++++++------- 1 files changed, 22 insertions(+), 7 deletions(-) diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp index 0a44949..fc53b0b 100644 --- a/src/socket/bus_server_socket.cpp +++ b/src/socket/bus_server_socket.cpp @@ -6,6 +6,7 @@ #include "bus_error.h" static Logger *logger = LoggerFactory::getLogger(); +static pthread_mutex_t gMutex; list gLinkedList; void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) { @@ -84,11 +85,13 @@ int BusServerSocket::start(){ int rv; - topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); - - rv = _run_proxy_(); + topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); - return rv; + pthread_mutex_init(&gMutex, NULL); + + rv = _run_proxy_(); + + return rv; } @@ -201,7 +204,7 @@ int rv; struct timespec timeout = {1,0}; - if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { + if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { subscripter_set = map_iter->second; for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) { @@ -516,9 +519,11 @@ ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET); ProcPartZone *procPart = shm_mm_attach<ProcPartZone>(SHM_BUS_PROC_PART_MAP_KEY); if (flag == PROC_REG) { + pthread_mutex_lock(&gMutex); if ((proc_iter = proc->find(key)) == proc->end()) { proc->insert({key, Data_stru}); } + pthread_mutex_unlock(&gMutex); if ((proc_part_iter = procPart->find(key)) == procPart->end()) { procPart->insert({key, Data_stru.proc_id}); @@ -537,6 +542,7 @@ SvrSub_ele->erase(key); } + pthread_mutex_lock(&gMutex); if ((proc_iter = proc->find(key)) != proc->end()) { data1 = atoi((proc_iter->second).int_info); @@ -546,9 +552,10 @@ BusServerSocket::_data_remove(key); len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1); strncpy(buf_temp, (proc_iter->second).proc_id, len); - proc->erase(proc_iter); + proc->erase(key); } + pthread_mutex_unlock(&gMutex); if ((proc_part_iter = procPart->find(key)) != procPart->end()) { @@ -560,7 +567,10 @@ procQuePart->erase(buf_temp); } + pthread_mutex_lock(&gMutex); BusServerSocket::buf_data_remove(key); + pthread_mutex_unlock(&gMutex); + find_mm_data(key); } @@ -580,6 +590,7 @@ strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC); while(data_ptr) { + data_ptr = trim(data_ptr, 0); TcsSub_ele->insert(data_ptr); if ((svr_tcs_iter = SvrData->find(data_ptr)) != SvrData->end()) { SvrSub_ele = svr_tcs_iter->second; @@ -609,6 +620,7 @@ strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC); while(data_ptr) { + data_ptr = trim(data_ptr, 0); ret = Qurey_object(data_ptr, &len); if (ret != NULL) { @@ -694,7 +706,7 @@ ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); - if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) { + if ((svr_tcs_iter = SvrData->find(trim(buf_temp, 0))) != SvrData->end()) { SvrSub_ele = svr_tcs_iter->second; for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { @@ -876,9 +888,12 @@ ProcZone::iterator proc_iter; ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); + pthread_mutex_lock(&gMutex); if ((proc_iter = proc->find(val)) != proc->end()) { + pthread_mutex_unlock(&gMutex); return true; } + pthread_mutex_unlock(&gMutex); return false; -- Gitblit v1.8.0