From b861de29176891657cc96631ddbfb4ea9e114a42 Mon Sep 17 00:00:00 2001 From: Fu Juntang <StrongTiger_001@163.com> Date: 星期一, 30 八月 2021 17:52:23 +0800 Subject: [PATCH] re-structure the communication work flow. --- src/socket/bus_server_socket.cpp | 534 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 files changed, 506 insertions(+), 28 deletions(-) diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp index 7a45696..1646da5 100644 --- a/src/socket/bus_server_socket.cpp +++ b/src/socket/bus_server_socket.cpp @@ -1,6 +1,7 @@ #include "bus_server_socket.h" #include "shm_mod_socket.h" +#include "shm_socket.h" #include "bus_error.h" static Logger *logger = LoggerFactory::getLogger(); @@ -12,10 +13,10 @@ SHMTopicSubMap::iterator map_iter; if(topic_sub_map != NULL) { - for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { + for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { subscripter_set = map_iter->second; if(subscripter_set != NULL) { - for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { + for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) { cb(subscripter_set, *set_iter); } } @@ -35,7 +36,7 @@ SHMTopicSubMap::iterator map_iter; if(topic_sub_map != NULL) { - for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { + for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { subscripter_set = map_iter->second; if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) { subscripter_set->erase(set_iter); @@ -50,7 +51,6 @@ BusServerSocket::BusServerSocket() { - logger->debug("BusServerSocket Init"); shm_socket = shm_socket_open(SHM_SOCKET_DGRAM); topic_sub_map = NULL; @@ -80,10 +80,13 @@ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ int BusServerSocket::start(){ + int rv; + topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); - _run_proxy_(); - return 0; + rv = _run_proxy_(); + + return rv; } @@ -114,7 +117,7 @@ SHMKeySet *subscripter_set; SHMTopicSubMap::iterator map_iter; if(topic_sub_map != NULL) { - for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { + for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { subscripter_set = map_iter->second; if(subscripter_set != NULL) { subscripter_set->clear(); @@ -126,8 +129,8 @@ shm_mm_free_by_key(SHM_BUS_MAP_KEY); } shm_socket_close(shm_socket); - logger->debug("BusServerSocket destory 3"); - return 0; + + return 0; } /* @@ -135,10 +138,10 @@ */ void BusServerSocket::_proxy_sub( char *topic, int key) { SHMKeySet *subscripter_set; - + + struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0}; SHMTopicSubMap::iterator map_iter; SHMKeySet::iterator set_iter; -//printf("_proxy_sub topic = %s\n", topic); if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { subscripter_set = map_iter->second; } else { @@ -147,6 +150,7 @@ topic_sub_map->insert({topic, subscripter_set}); } subscripter_set->insert(key); + } /* @@ -173,7 +177,7 @@ SHMTopicSubMap::iterator map_iter; // SHMKeySet::iterator set_iter; - for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { + for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { subscripter_set = map_iter->second; subscripter_set->erase(key); } @@ -182,7 +186,7 @@ /* * 澶勭悊鍙戝竷锛屼唬鐞嗚浆鍙� */ -void BusServerSocket::_proxy_pub( char *topic, void *buf, size_t size, int key) { +void BusServerSocket::_proxy_pub( char *topic, char *buf, size_t size, int key) { SHMKeySet *subscripter_set; SHMTopicSubMap::iterator map_iter; @@ -198,7 +202,7 @@ if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { subscripter_set = map_iter->second; - for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { + for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) { send_key = *set_iter; rv = shm_sendto(shm_socket, buf, size, send_key, &timeout, BUS_TIMEOUT_FLAG); if(rv == 0) { @@ -209,7 +213,7 @@ } // 鍒犻櫎宸插叧闂殑绔� - for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) { + for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); ++vector_iter) { if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) { subscripter_set->erase(set_iter); logger->debug("remove closed subscripter %d \n", send_key); @@ -218,13 +222,458 @@ subscripter_to_del.clear(); } + } +ProcInfo_query *Qurey_object(const char *object, int *length) { + int flag = 0; + int val; + int len; + int total = 0; + ProcInfo *Proc_ptr = NULL; + ProcInfo Data_stru; + ProcInfo_query *dataBuf = NULL; + SvrProc *SvrSub_ele; + SvrTcs::iterator svr_tcs_iter; + SvrProc::iterator svr_proc_iter; + ProcZone::iterator proc_iter; + SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); + ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); + + if ((svr_tcs_iter = SvrData->find(object)) != SvrData->end()) { + SvrSub_ele = svr_tcs_iter->second; + for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { + val = *svr_proc_iter; + + if ((proc_iter = proc->find(val)) != proc->end()) { + + if (dataBuf == NULL) { + dataBuf = (ProcInfo_query *)malloc(sizeof(ProcInfo_query)); + if (dataBuf == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + total = sizeof(ProcInfo_query); + } + + if (flag == 0) { + memset(dataBuf, 0x00, sizeof(ProcInfo_query)); + + dataBuf->num = 1; + strncpy(dataBuf->name, object, sizeof(dataBuf->name) - 1); + + flag = 1; + + } else { + dataBuf->num++; + len = sizeof(ProcInfo_query) + sizeof(ProcInfo) * (dataBuf->num - 1); + dataBuf = (ProcInfo_query *)realloc(dataBuf, len); + if (dataBuf == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + total += sizeof(ProcInfo); + memset((char *)dataBuf + len - sizeof(ProcInfo), 0x00, sizeof(ProcInfo)); + } + + memset(&Data_stru, 0x00, sizeof(ProcInfo)); + Data_stru = proc_iter->second; + + Proc_ptr = &(dataBuf->procData) + dataBuf->num - 1; + strncpy(Proc_ptr->proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id) + 1); + strncpy(Proc_ptr->name, Data_stru.name, strlen(Data_stru.name) + 1); + strncpy(Proc_ptr->public_info, Data_stru.public_info, strlen(Data_stru.public_info) + 1); + strncpy(Proc_ptr->private_info, Data_stru.private_info, strlen(Data_stru.private_info) + 1); + + if (length != NULL) + *length = total; + } + } + } + + return dataBuf; +} + +void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag) +{ + char buf_temp[MAX_STR_LEN] = { 0x00 }; + int count = 0; + int i = 0; + int len = 0; + char *data_ptr; + ProcInfo Data_stru; + ProcZone::iterator proc_iter; + TcsZone *TcsSub_ele; + ProcDataZone::iterator proc_que_iter; + ProcTcsMap::iterator proc_tcs_iter; + SvrProc *SvrSub_ele; + SvrProc::iterator svr_proc_iter; + SvrTcs::iterator svr_tcs_iter; + TcsZone::iterator tcssub_iter; + ProcPartZone::iterator proc_part_iter; + + struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0}; + + if ((flag == PROC_REG) || (flag == PROC_UNREG)) { + + memset(&Data_stru, 0x00, sizeof(ProcInfo)); + + if (buf != NULL) { + + memcpy(Data_stru.proc_id, buf, strlen(buf) + 1); + count = strlen(buf) + 1; + + memcpy(Data_stru.name, buf + count, strlen(buf + count) + 1); + count += strlen(buf + count) + 1; + + memcpy(Data_stru.public_info, buf + count, strlen(buf + count) + 1); + count += strlen(buf + count) + 1; + + memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1); + count += strlen(buf + count) + 1; + } + + ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); + ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET); + ProcPartZone *procPart = shm_mm_attach<ProcPartZone>(SHM_BUS_PROC_PART_MAP_KEY); + if (flag == PROC_REG) { + if ((proc_iter = proc->find(key)) == proc->end()) { + proc->insert({key, Data_stru}); + } + + if ((proc_part_iter = procPart->find(key)) == procPart->end()) { + procPart->insert({key, Data_stru.proc_id}); + } + + if ((proc_que_iter = procQuePart->find(Data_stru.proc_id)) == procQuePart->end()) { + procQuePart->insert({Data_stru.proc_id, key}); + } + + } else { + SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); + + for (svr_tcs_iter = SvrData->begin(); svr_tcs_iter != SvrData->end(); ++svr_tcs_iter) { + SvrSub_ele = svr_tcs_iter->second; + + SvrSub_ele->erase(key); + } + + if ((proc_iter = proc->find(key)) != proc->end()) { + + len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1); + strncpy(buf_temp, (proc_iter->second).proc_id, len); + proc->erase(proc_iter); + + } + + if ((proc_part_iter = procPart->find(key)) != procPart->end()) { + + procPart->erase(key); + } + + if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) { + + procQuePart->erase(buf_temp); + } + + } + } else if (flag == PROC_REG_TCS) { + ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY); + SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); + + if ((proc_tcs_iter = proc->find(key)) != proc->end()) { + TcsSub_ele = proc_tcs_iter->second; + } else { + + void *ptr_set = mm_malloc(sizeof(TcsZone)); + TcsSub_ele = new(ptr_set) TcsZone; + proc->insert({key, TcsSub_ele}); + } + + strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); + data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC); + while(data_ptr) { + TcsSub_ele->insert(data_ptr); + if ((svr_tcs_iter = SvrData->find(data_ptr)) != SvrData->end()) { + SvrSub_ele = svr_tcs_iter->second; + } else { + + void *ptr_set = mm_malloc(sizeof(SvrProc)); + SvrSub_ele = new(ptr_set) SvrProc; + SvrData->insert({data_ptr, SvrSub_ele}); + } + SvrSub_ele->insert(key); + data_ptr = strtok(NULL, STR_MAGIC); + } + + } else if (flag == PROC_QUE_TCS) { + + struct _temp_store { + void *ptr; + int total; + } *temp_store = NULL; + + int num = 0; + int sum = 0; + + ProcInfo_query *ret = NULL; + ProcInfo_query *ret_store = NULL; + + strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); + data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC); + while(data_ptr) { + ret = Qurey_object(data_ptr, &len); + if (ret != NULL) { + + if (temp_store == NULL) { + temp_store = (_temp_store *)malloc(sizeof(_temp_store)); + if (temp_store == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + temp_store->ptr = ret; + temp_store->total = len; + num = 1; + + } else { + num++; + temp_store = (_temp_store *)realloc(temp_store, sizeof(_temp_store) * num); + if (temp_store == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + (temp_store + num - 1)->ptr = ret; + (temp_store + num - 1)->total = len; + } + + } + data_ptr = strtok(NULL, STR_MAGIC); + } + + if (num > 0) { + for (count = 0; count < num; count++) { + + if (ret_store == NULL) { + ret_store = (ProcInfo_query *)malloc((temp_store + count)->total); + if (ret_store == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + sum = (temp_store + count)->total; + memcpy(ret_store, (temp_store + count)->ptr, (temp_store +count)->total); + + } else { + + ret_store = (ProcInfo_query *)realloc(ret_store, sum + (temp_store + count)->total); + if (ret_store == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + memcpy((char *)ret_store + sum, (temp_store + count)->ptr, (temp_store + count)->total); + + sum += (temp_store + count)->total; + + } + + free((temp_store + count)->ptr); + + } + + free(temp_store); + } + + void *last_buf = malloc(sum + sizeof(int)); + if (last_buf == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + *(int *)last_buf = num; + if (num > 0) { + memcpy((char *)last_buf + sizeof(int), (char *)ret_store, sum); + free(ret_store); + } + + shm_sendto(shm_socket, last_buf, sum + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG); + + free(last_buf); + } else if (flag == PROC_QUE_STCS) { + SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); + + strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); + if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) { + SvrSub_ele = svr_tcs_iter->second; + + for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { + count = *svr_proc_iter; + + break; + } + } else { + count = 0; + } + + memset(buf_temp, 0x00, sizeof(buf_temp)); + sprintf(buf_temp, "%d", count); + shm_sendto(shm_socket, buf_temp, strlen(buf_temp), key, &timeout, BUS_TIMEOUT_FLAG); + + } else { + + int val; + int temp = 0; + int pos = 0; + int size = 0; + ProcInfo_sum *Data_sum = NULL; + SHMKeySet *subs_proc; + SHMKeySet::iterator subs_proc_iter; + SHMTopicSubMap::iterator subs_iter; + + ProcTcsMap *procData = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY); + ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); + + for (proc_iter = proc->begin(); proc_iter != proc->end(); ++proc_iter) { + + memset(&Data_stru, 0x00, sizeof(Data_stru)); + + if (count == 0) { + Data_sum = (ProcInfo_sum *)malloc(sizeof(ProcInfo_sum)); + if (Data_sum == NULL) { + + logger->error("in proxy_reg: Out of memory!\n"); + + exit(1); + } + + count++; + + memset(Data_sum, 0x00, sizeof(ProcInfo_sum)); + + } else { + + count++; + len = sizeof(ProcInfo_sum) * count; + Data_sum = (ProcInfo_sum *)realloc(Data_sum, len); + if (Data_sum == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + + exit(1); + } + + memset(Data_sum + count - 1, 0x00, sizeof(ProcInfo_sum)); + } + + Data_stru = proc_iter->second; + + memcpy((Data_sum + count - 1)->procData.proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id)); + memcpy((Data_sum + count - 1)->procData.name, Data_stru.name, strlen(Data_stru.name)); + memcpy((Data_sum + count - 1)->procData.public_info, Data_stru.public_info, strlen(Data_stru.public_info)); + memcpy((Data_sum + count - 1)->procData.private_info, Data_stru.private_info, strlen(Data_stru.private_info)); + + (Data_sum + count - 1)->stat = 1; + (Data_sum + count - 1)->list_num = 3; + + val = proc_iter->first; + if ((proc_tcs_iter = procData->find(val)) != procData->end()) { + TcsSub_ele = proc_tcs_iter->second; + + temp = 0; + pos = 0; + len = sizeof(Data_sum->reg_info) - 1; + for (tcssub_iter = TcsSub_ele->begin(); tcssub_iter != TcsSub_ele->end(); ++tcssub_iter) { + + if (temp == 0) { + strncpy((Data_sum + count - 1)->reg_info, (*tcssub_iter).c_str(), strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str())); + pos += strlen((Data_sum + count - 1)->reg_info); + len -= strlen((Data_sum + count - 1)->reg_info); + + temp++; + } else { + + if (len > 0) { + strcat((Data_sum + count - 1)->reg_info, ","); + + pos += 1; + len -= 1; + } + + if (len > 0) { + size = strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str()); + strncpy(&(Data_sum + count - 1)->reg_info[pos], (*tcssub_iter).c_str(), size); + + pos += size; + len -= size; + } + } + } + + pos = 0; + temp = 0; + len = sizeof(Data_sum->local_info) - 1; + for (subs_iter = topic_sub_map->begin(); subs_iter != topic_sub_map->end(); ++subs_iter) { + subs_proc = subs_iter->second; + + if ((subs_proc_iter = subs_proc->find(val)) != subs_proc->end()) { + + if ((temp == 0)) { + + strncpy((Data_sum + count - 1)->local_info, subs_iter->first.c_str(), strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str())); + pos += strlen((Data_sum + count - 1)->local_info); + len -= strlen((Data_sum + count - 1)->local_info); + + temp++; + } else { + + if (len > 0) { + strcat((Data_sum + count - 1)->local_info, ","); + + pos += 1; + len -= 1; + } + + if (len > 0) { + size = strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str()); + strncpy(&(Data_sum + count - 1)->local_info[pos], subs_iter->first.c_str(), size); + + pos += size; + len -= size; + } + } + + } + } + + } + } + + temp = count * sizeof(ProcInfo_sum); + void *last_buf = malloc(temp + sizeof(int)); + if (last_buf == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + *(int *)last_buf = count; + if (count > 0) { + memcpy((char *)last_buf + sizeof(int), (char *)Data_sum, temp); + free(Data_sum); + } + + shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG); + + } +} // 杩愯浠g悊 -void * BusServerSocket::_run_proxy_() { +int BusServerSocket::_run_proxy_() { int size; int key; + int flag; char * action, *topic, *topics, *buf, *content; size_t head_len; char resp_buf[128]; @@ -233,25 +682,21 @@ int rv; char send_buf[512] = { 0x00 }; - const char *topic_delim = ","; - - while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) { + const char *topic_delim = ","; + while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) { head = ShmModSocket::decode_bus_head(buf); topics = buf + BUS_HEAD_SIZE; action = head.action; - if(strcmp(action, "sub") == 0) { // 璁㈤槄鏀寔澶氫富棰樿闃� topic = strtok(topics, topic_delim); while(topic) { - _proxy_sub(trim(topic, 0), key); topic = strtok(NULL, topic_delim); } } else if(strcmp(action, "desub") == 0) { - if(strcmp(trim(topics, 0), "") == 0) { // 鍙栨秷鎵�鏈夎闃� _proxy_desub_all(key); @@ -259,7 +704,6 @@ topic = strtok(topics, topic_delim); while(topic) { - _proxy_desub(trim(topic, 0), key); topic = strtok(NULL, topic_delim); } @@ -267,10 +711,45 @@ } else if(strcmp(action, "pub") == 0) { - content = topics + head.topic_size; - _proxy_pub(topics, content, head.content_size, key); + topics[head.topic_size - 1] = '\0'; + content = topics + head.topic_size; - } + _proxy_pub(topics, topics, head.topic_size + head.content_size, key); + + } + else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \ + || (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \ + || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0)) { + content = topics + head.topic_size; + if (strcmp(action, "reg") == 0) { + + flag = PROC_REG; + + } else if (strcmp(action, "unreg") == 0) { + + flag = PROC_UNREG; + + } else if (strcmp(action, "tcsreg") == 0) { + + flag = PROC_REG_TCS; + + } else if (strcmp(action, "tcsque") == 0) { + + flag = PROC_QUE_TCS; + + } else if (strcmp(action, "stcsque") == 0) { + + flag = PROC_QUE_STCS; + + } else { + + flag = PROC_QUE_ATCS; + + } + + _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag); + + } else if (strncmp(buf, "request", strlen("request")) == 0) { sprintf(send_buf, "%4d", key); strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4)); @@ -281,7 +760,6 @@ } } else if(strcmp(action, "stop") == 0) { - logger->info( "Stopping Bus..."); free(buf); break; } else { @@ -291,5 +769,5 @@ } - return NULL; + return rv; } -- Gitblit v1.8.0