From 73689afc09ce346f9eb00e02faf7f242e55dc7ee Mon Sep 17 00:00:00 2001 From: fujuntang <fujuntang@smartai.com> Date: 星期四, 09 十二月 2021 19:33:00 +0800 Subject: [PATCH] Add the sync to fix the resource clear issue. --- src/socket/bus_server_socket.cpp | 384 +++++++++++++++++++++++++++++++++++++++++++++++++----- 1 files changed, 349 insertions(+), 35 deletions(-) diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp index 80f7338..fc53b0b 100644 --- a/src/socket/bus_server_socket.cpp +++ b/src/socket/bus_server_socket.cpp @@ -2,10 +2,13 @@ #include "bus_server_socket.h" #include "shm_mod_socket.h" #include "shm_socket.h" +#include "msg_mgr.h" #include "bus_error.h" static Logger *logger = LoggerFactory::getLogger(); +static pthread_mutex_t gMutex; +list gLinkedList; void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) { SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); SHMKeySet *subscripter_set; @@ -82,11 +85,13 @@ int BusServerSocket::start(){ int rv; - topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); - - rv = _run_proxy_(); + topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); - return rv; + pthread_mutex_init(&gMutex, NULL); + + rv = _run_proxy_(); + + return rv; } @@ -199,7 +204,7 @@ int rv; struct timespec timeout = {1,0}; - if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { + if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { subscripter_set = map_iter->second; for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) { @@ -296,12 +301,177 @@ return dataBuf; } +void list::Insert(int aData, int bData) +{ + LinkNode *pHead = NULL; + LinkNode *pNew = NULL; + LinkNode *pCur = NULL; + + pNew = (LinkNode *)malloc(sizeof(LinkNode)); + pNew->data = aData; + pNew->data_fix = bData; + pNew->count = 0; + + pHead = head; + pCur = pHead; + if(pHead == NULL) { + head = pNew; + + pNew->next = NULL; + + } else { + while(pCur->next != NULL) { + pCur = pCur->next; + } + + pCur->next = pNew; + pNew->next = NULL; + } +} + +void list::Delete(int data) +{ + LinkNode *pHead; + LinkNode *pCur; + LinkNode *pNext; + + pHead = head; + pCur = pHead; + if(pHead == NULL) + return; + + while((pCur != NULL) && (pCur->data == data)) { + + head = pCur->next; + + free(pCur); + + pCur = head; + + } + + while((pCur != NULL) && (pCur->next != NULL)) { + pNext = pCur->next; + + if(pNext->data == data) { + pCur->next = pNext->next; + pCur = pNext->next; + + free(pNext); + } else { + + pCur = pNext; + + } + } +} + +void list::dataSet(int data, int val) +{ + LinkNode *pCur; + + pCur = head; + if(pCur == NULL) + return; + + while(pCur != NULL) { + + if(pCur->data == data) { + pCur->count = val; + } + + pCur = pCur->next; + } +} + +int list::dataGet(int data) +{ + LinkNode *pCur; + + pCur = head; + if(pCur == NULL) + return 0; + + while(pCur != NULL) { + + if(pCur->data == data) { + return pCur->count; + } + + pCur = pCur->next; + } + + return 0; +} + +int list::dataFixGet(int data) +{ + LinkNode *pCur; + + pCur = head; + if(pCur == NULL) + return 0; + + while(pCur != NULL) { + + if(pCur->data == data) { + return pCur->data_fix; + } + + pCur = pCur->next; + } + + return 0; +} + +int list::NodeNum(void) +{ + int count = 0; + LinkNode *pCur = head; + + if (pCur == NULL) { + return 0; + } + + while(pCur != NULL) { + + ++count; + pCur = pCur->next; + } + + return count; +} + +int list::nodeGet(int index) +{ + int count = 0; + LinkNode *pCur = head; + + if (pCur == NULL) { + return 0; + } + + while((pCur != NULL) && (count <= index)) { + + if (count == index) { + return pCur->data; + } + + ++count; + pCur = pCur->next; + } + + return 0; +} + void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag) { + char data_buf[MAX_STR_LEN] = { 0x00 }; char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 }; int count = 0; int i = 0; int len = 0; + int data1, data2; char *data_ptr; ProcInfo Data_stru; ProcZone::iterator proc_iter; @@ -333,15 +503,27 @@ memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1); count += strlen(buf + count) + 1; + + memcpy(Data_stru.int_info, buf + count, strlen(buf + count) + 1); + count += strlen(buf + count) + 1; + + memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1); + count += strlen(buf + count) + 1; + + if (flag == PROC_REG) { + gLinkedList.Insert(key, atoi(Data_stru.int_info)); + } } ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET); ProcPartZone *procPart = shm_mm_attach<ProcPartZone>(SHM_BUS_PROC_PART_MAP_KEY); if (flag == PROC_REG) { + pthread_mutex_lock(&gMutex); if ((proc_iter = proc->find(key)) == proc->end()) { proc->insert({key, Data_stru}); } + pthread_mutex_unlock(&gMutex); if ((proc_part_iter = procPart->find(key)) == procPart->end()) { procPart->insert({key, Data_stru.proc_id}); @@ -360,13 +542,20 @@ SvrSub_ele->erase(key); } + pthread_mutex_lock(&gMutex); if ((proc_iter = proc->find(key)) != proc->end()) { + data1 = atoi((proc_iter->second).int_info); + data2 = atoi((proc_iter->second).svr_info); + BusServerSocket::_data_remove(data1); + BusServerSocket::_data_remove(data2); + BusServerSocket::_data_remove(key); len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1); strncpy(buf_temp, (proc_iter->second).proc_id, len); - proc->erase(proc_iter); + proc->erase(key); } + pthread_mutex_unlock(&gMutex); if ((proc_part_iter = procPart->find(key)) != procPart->end()) { @@ -378,7 +567,13 @@ procQuePart->erase(buf_temp); } + pthread_mutex_lock(&gMutex); + BusServerSocket::buf_data_remove(key); + pthread_mutex_unlock(&gMutex); + + find_mm_data(key); } + } else if (flag == PROC_REG_TCS) { ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY); SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); @@ -395,6 +590,7 @@ strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC); while(data_ptr) { + data_ptr = trim(data_ptr, 0); TcsSub_ele->insert(data_ptr); if ((svr_tcs_iter = SvrData->find(data_ptr)) != SvrData->end()) { SvrSub_ele = svr_tcs_iter->second; @@ -424,6 +620,7 @@ strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC); while(data_ptr) { + data_ptr = trim(data_ptr, 0); ret = Qurey_object(data_ptr, &len); if (ret != NULL) { @@ -504,14 +701,19 @@ free(last_buf); } else if (flag == PROC_QUE_STCS) { + SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); + ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); - if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) { + if ((svr_tcs_iter = SvrData->find(trim(buf_temp, 0))) != SvrData->end()) { SvrSub_ele = svr_tcs_iter->second; for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { count = *svr_proc_iter; + if ((proc_iter = proc->find(count)) != proc->end()) { + count = atoi((proc_iter->second).svr_info); + } break; } @@ -519,11 +721,11 @@ count = 0; } - memset(buf_temp, 0x00, sizeof(buf_temp)); - sprintf(buf_temp, "%d", count); - shm_sendto(shm_socket, buf_temp, strlen(buf_temp), key, &timeout, BUS_TIMEOUT_FLAG); + memset(data_buf, 0x00, sizeof(data_buf)); + sprintf(data_buf, "%d", count); + shm_sendto(shm_socket, data_buf, strlen(data_buf), key, &timeout, BUS_TIMEOUT_FLAG); - } else { + } else if (flag == PROC_QUE_ATCS) { int val; int temp = 0; @@ -666,7 +868,48 @@ shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG); + free(last_buf); + } else { + + char *ptr = NULL; + strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); + + data1 = atoi(buf_temp); + ptr = strstr(buf_temp, STR_MAGIC); + if (ptr != NULL) { + data2 = atoi(ptr + 1); + } + BusServerSocket::buf_data_set(data1, data2); } +} + +int BusServerSocket::get_data(int val) { + + ProcZone::iterator proc_iter; + ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); + + pthread_mutex_lock(&gMutex); + if ((proc_iter = proc->find(val)) != proc->end()) { + pthread_mutex_unlock(&gMutex); + return true; + } + pthread_mutex_unlock(&gMutex); + + return false; + +} + +int BusServerSocket::check_proc(const int val, const void *buf, int len, void **buf_ret, int *len_ret, \ + const struct timespec *timeout, const int flag) { + int ret; + + ret = shm_sendandrecv(shm_socket, buf, len, val, buf_ret, len_ret, timeout, flag); + + return ret; +} + +void BusServerSocket::remove_proc(int val) { + BusServerSocket::_proxy_reg(NULL, 0, NULL, 0, val, PROC_UNREG); } // 杩愯浠g悊 @@ -674,15 +917,18 @@ int size; int key; int flag; - char * action, *topic, *topics, *buf, *content; + char buf_temp[MAX_STR_LEN] = { 0x00 }; + char *action, *topic, *topics, *buf, *content; size_t head_len; - char resp_buf[128]; bus_head_t head; + int val; + ProcDataZone::iterator proc_que_iter; + ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET); - int rv; - char send_buf[512] = { 0x00 }; + int rv; + char send_buf[512] = { 0x00 }; - const char *topic_delim = ","; + const char *topic_delim = ","; while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) { head = ShmModSocket::decode_bus_head(buf); topics = buf + BUS_HEAD_SIZE; @@ -719,7 +965,8 @@ } else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \ || (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \ - || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0)) { + || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0) \ + || (strcmp(action, "bufreg") == 0)) { content = topics + head.topic_size; if (strcmp(action, "reg") == 0) { @@ -741,33 +988,100 @@ flag = PROC_QUE_STCS; - } else { + } else if (strcmp(action, "atcsque") == 0) { flag = PROC_QUE_ATCS; + } else { + + flag = PROC_REG_BUF; + } + if (flag == PROC_REG) { + memcpy(buf_temp, content, strlen(content) + 1); + + if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) { + + val = proc_que_iter->second; + _proxy_reg(topics, head.topic_size, content, head.content_size, val, PROC_UNREG); + } + } + _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag); } - else if (strncmp(buf, "request", strlen("request")) == 0) { - sprintf(send_buf, "%4d", key); - strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4)); - - rv = shm_sendto(shm_socket, send_buf, strlen(send_buf) + 1, key); - if(rv != 0) { - logger->error( "BusServerSocket::_run_proxy_ : requst answer fail!\n"); - } - } else if(strcmp(action, "stop") == 0) { - free(buf); - break; - } else { - logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action); - } - free(buf); - } + free(buf); + break; + } else { + logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action); + } + free(buf); + } - return rv; + return rv; } + +void BusServerSocket::_data_remove(int val) { + + int i; + LockFreeQueue<shm_packet_t> *queue = NULL; + hashtable_t *hashtable = mm_get_hashtable(); + + void *data_ptr = hashtable_get(hashtable, val); + + if (data_ptr != NULL) { + if (data_ptr != (void *)1) { + queue = (LockFreeQueue<shm_packet_t> *)data_ptr; + queue->close(); + for (i = 0; i < queue->size(); i++) { + mm_free((*queue)[i].buf); + } + sleep(1); + } + + hashtable_remove(hashtable, val); + } + +} + +void BusServerSocket::buf_data_set(int data, int val) { + recvbuf_val *val_buf; + recvbuf_data::iterator data_iter; + recvbuf_val::iterator val_iter; + + if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) { + val_buf = data_iter->second; + } else { + void *set_ptr = mm_malloc(sizeof(recvbuf_val)); + + val_buf = new(set_ptr) recvbuf_val; + recvBuf_data.insert({data, val_buf}); + } + + val_buf->insert(val); +} + +void BusServerSocket::buf_data_remove(int data) { + + int val; + recvbuf_val *val_buf; + recvbuf_data::iterator data_iter; + recvbuf_val::iterator val_iter; + + if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) { + + val_buf = data_iter->second; + for(val_iter = val_buf->begin(); val_iter != val_buf->end(); ++val_iter) { + val = *val_iter; + + BusServerSocket::_data_remove(val); + } + + recvBuf_data.erase(data); + } +} + + -- Gitblit v1.8.0