From 82b028cf63953d8080b63d85468eae488d212194 Mon Sep 17 00:00:00 2001 From: fujuntang <fujuntang@smartai.com> Date: 星期四, 23 九月 2021 14:30:07 +0800 Subject: [PATCH] Fix the data parsing when in multiple threads. --- src/socket/bus_server_socket.cpp | 61 ++++++++++-------------------- 1 files changed, 20 insertions(+), 41 deletions(-) diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp index beb7148..2d552da 100644 --- a/src/socket/bus_server_socket.cpp +++ b/src/socket/bus_server_socket.cpp @@ -539,7 +539,9 @@ data1 = atoi((proc_iter->second).int_info); data2 = atoi((proc_iter->second).svr_info); - BusServerSocket::_data_remove(data1, data2); + BusServerSocket::_data_remove(data1); + BusServerSocket::_data_remove(data2); + BusServerSocket::_data_remove(key); len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1); strncpy(buf_temp, (proc_iter->second).proc_id, len); proc->erase(proc_iter); @@ -892,10 +894,10 @@ ProcDataZone::iterator proc_que_iter; ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET); - int rv; - char send_buf[512] = { 0x00 }; + int rv; + char send_buf[512] = { 0x00 }; - const char *topic_delim = ","; + const char *topic_delim = ","; while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) { head = ShmModSocket::decode_bus_head(buf); topics = buf + BUS_HEAD_SIZE; @@ -973,39 +975,29 @@ _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag); } - else if (strncmp(buf, "request", strlen("request")) == 0) { - sprintf(send_buf, "%4d", key); - strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4)); - - rv = shm_sendto(shm_socket, send_buf, strlen(send_buf) + 1, key); - if(rv != 0) { - logger->error( "BusServerSocket::_run_proxy_ : requst answer fail!\n"); - } - } else if(strcmp(action, "stop") == 0) { - free(buf); - break; - } else { - logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action); - } - free(buf); - } + free(buf); + break; + } else { + logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action); + } + free(buf); + } - return rv; + return rv; } -void BusServerSocket::_data_remove(int val1, int val2) { +void BusServerSocket::_data_remove(int val) { int i; LockFreeQueue<shm_packet_t> *queue = NULL; hashtable_t *hashtable = mm_get_hashtable(); - void *data_ptr1 = hashtable_get(hashtable, val1); - void *data_ptr2 = hashtable_get(hashtable, val2); - if (data_ptr1 != NULL) { - if (data_ptr1 != (void *)1) { - queue = (LockFreeQueue<shm_packet_t> *)data_ptr1; + void *data_ptr = hashtable_get(hashtable, val); + if (data_ptr != NULL) { + if (data_ptr != (void *)1) { + queue = (LockFreeQueue<shm_packet_t> *)data_ptr; queue->close(); for (i = 0; i < queue->size(); i++) { mm_free((*queue)[i].buf); @@ -1013,20 +1005,7 @@ sleep(1); } - hashtable_remove(hashtable, val1); - } - - if (data_ptr2 != NULL) { - if (data_ptr2 != (void *)1) { - queue = (LockFreeQueue<shm_packet_t> *)data_ptr2; - queue->close(); - for (i = 0; i < queue->size(); i++) { - mm_free((*queue)[i].buf); - } - sleep(1); - } - - hashtable_remove(hashtable, val2); + hashtable_remove(hashtable, val); } } -- Gitblit v1.8.0