From dc01e4cbb01e96d19b470a366bbe648d426ed171 Mon Sep 17 00:00:00 2001 From: fujuntang <fujuntang@smartai.com> Date: 星期六, 11 九月 2021 10:06:15 +0800 Subject: [PATCH] Add topics sub and request support. --- src/socket/bus_server_socket.cpp | 53 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 53 insertions(+), 0 deletions(-) diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp index 8b022e1..d5e757d 100644 --- a/src/socket/bus_server_socket.cpp +++ b/src/socket/bus_server_socket.cpp @@ -302,6 +302,7 @@ int count = 0; int i = 0; int len = 0; + int data1, data2; char *data_ptr; ProcInfo Data_stru; ProcZone::iterator proc_iter; @@ -333,6 +334,13 @@ memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1); count += strlen(buf + count) + 1; + + memcpy(Data_stru.int_info, buf + count, strlen(buf + count) + 1); + count += strlen(buf + count) + 1; + + memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1); + count += strlen(buf + count) + 1; + } ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); @@ -362,6 +370,9 @@ if ((proc_iter = proc->find(key)) != proc->end()) { + data1 = atoi((proc_iter->second).int_info); + data2 = atoi((proc_iter->second).svr_info); + BusServerSocket::_data_remove(data1, data2); len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1); strncpy(buf_temp, (proc_iter->second).proc_id, len); proc->erase(proc_iter); @@ -504,7 +515,9 @@ free(last_buf); } else if (flag == PROC_QUE_STCS) { + SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); + ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) { @@ -512,6 +525,9 @@ for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { count = *svr_proc_iter; + if ((proc_iter = proc->find(count)) != proc->end()) { + count = atoi((proc_iter->second).svr_info); + } break; } @@ -770,3 +786,40 @@ return rv; } + +void BusServerSocket::_data_remove(int val1, int val2) { + + int i; + LockFreeQueue<shm_packet_t> *queue = NULL; + hashtable_t *hashtable = mm_get_hashtable(); + + void *data_ptr1 = hashtable_get(hashtable, val1); + void *data_ptr2 = hashtable_get(hashtable, val2); + if (data_ptr1 != NULL) { + if (data_ptr1 != (void *)1) { + queue = (LockFreeQueue<shm_packet_t> *)data_ptr1; + queue->close(); + for (i = 0; i < queue->size(); i++) { + mm_free((*queue)[i].buf); + } + sleep(1); + } + + hashtable_remove(hashtable, val1); + } + + if (data_ptr2 != NULL) { + if (data_ptr2 != (void *)1) { + queue = (LockFreeQueue<shm_packet_t> *)data_ptr2; + queue->close(); + for (i = 0; i < queue->size(); i++) { + mm_free((*queue)[i].buf); + } + sleep(1); + } + + hashtable_remove(hashtable, val2); + } + +} + -- Gitblit v1.8.0