From 73689afc09ce346f9eb00e02faf7f242e55dc7ee Mon Sep 17 00:00:00 2001 From: fujuntang <fujuntang@smartai.com> Date: 星期四, 09 十二月 2021 19:33:00 +0800 Subject: [PATCH] Add the sync to fix the resource clear issue. --- src/socket/bus_server_socket.cpp | 1042 ++++++++++++++++++++++++++++++++++++++++++++++++--------- 1 files changed, 879 insertions(+), 163 deletions(-) diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp index 9bd61b0..fc53b0b 100644 --- a/src/socket/bus_server_socket.cpp +++ b/src/socket/bus_server_socket.cpp @@ -1,55 +1,48 @@ #include "bus_server_socket.h" #include "shm_mod_socket.h" +#include "shm_socket.h" +#include "msg_mgr.h" #include "bus_error.h" static Logger *logger = LoggerFactory::getLogger(); +static pthread_mutex_t gMutex; +list gLinkedList; void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) { - SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY); + SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); SHMKeySet *subscripter_set; SHMKeySet::iterator set_iter; SHMTopicSubMap::iterator map_iter; if(topic_sub_map != NULL) { - for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { + for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { subscripter_set = map_iter->second; if(subscripter_set != NULL) { - for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { + for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) { cb(subscripter_set, *set_iter); } } } } } - -// bool BusServerSocket::include_in_keys(int key, int keys[], size_t length) { -// if(length == 0) { -// return false; -// } -// for(int i = 0; i < length; i++) { -// if(keys[i] == key) -// return true; -// } -// return false; -// } + size_t BusServerSocket::remove_subscripters(int keys[], size_t length) { size_t count = 0; int key; for(int i = 0; i < length; i++) { key = keys[i]; - SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY); + SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); SHMKeySet *subscripter_set; SHMKeySet::iterator set_iter; SHMTopicSubMap::iterator map_iter; if(topic_sub_map != NULL) { - for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { + for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { subscripter_set = map_iter->second; if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) { subscripter_set->erase(set_iter); -// printf("remove_subscripter %s, %d\n", map_iter->first, key); count++; } } @@ -61,34 +54,13 @@ BusServerSocket::BusServerSocket() { - logger->debug("BusServerSocket Init"); - shm_socket = shm_open_socket(SHM_SOCKET_DGRAM); + shm_socket = shm_socket_open(SHM_SOCKET_DGRAM); topic_sub_map = NULL; } BusServerSocket::~BusServerSocket() { - SHMKeySet *subscripter_set; - SHMTopicSubMap::iterator map_iter; - - logger->debug("BusServerSocket destory 1"); - stop(); - logger->debug("BusServerSocket destory 2"); - - if(topic_sub_map != NULL) { - for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { - subscripter_set = map_iter->second; - if(subscripter_set != NULL) { - subscripter_set->clear(); - mm_free((void *)subscripter_set); - } - - } - topic_sub_map->clear(); - mem_pool_free_by_key(BUS_MAP_KEY); - } - shm_close_socket(shm_socket); - logger->debug("BusServerSocket destory 3"); + destroy(); } @@ -109,33 +81,35 @@ * 鍚姩bus * * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ + */ int BusServerSocket::start(){ - topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY); - - run_pubsub_proxy(); - // 杩涚▼鍋滄鐨勬椂鍊欙紝棰勭暀3绉掕祫婧愬洖鏀剁殑鏃堕棿銆傚惁鍒欙紝浼氬彂鐢熻皟鐢╟lose鐨勬椂鍊欙紝鍏变韩鍐呭瓨鐨勮祫婧愯繕娌℃潵寰楀強鍥炴敹杩涚▼灏遍��鍑轰簡 - return 0; + int rv; + + topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); + + pthread_mutex_init(&gMutex, NULL); + + rv = _run_proxy_(); + + return rv; } int BusServerSocket::stop(){ int ret; - logger->debug("====>stopping"); if( shm_socket->key <= 0) { return -1; } - // snprintf(buf, 128, "%sstop%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER); - // return shm_sendto(shm_socket, buf, strlen(buf), shm_socket->key, NULL, 0); bus_head_t head = {}; memcpy(head.action, "stop", sizeof(head.action)); head.topic_size = 0; head.content_size = 0; + ShmModSocket client; void *buf; int size = ShmModSocket::get_bus_sendbuf(head, NULL, 0, NULL, 0, &buf); if(size > 0) { - ret = shm_sendandrecv_unsafe(shm_socket, buf, size, shm_socket->key, NULL, NULL); + ret = client.sendto( buf, size, shm_socket->key); free(buf); return ret; } else { @@ -144,15 +118,35 @@ } +int BusServerSocket::destroy() { + SHMKeySet *subscripter_set; + SHMTopicSubMap::iterator map_iter; + if(topic_sub_map != NULL) { + for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { + subscripter_set = map_iter->second; + if(subscripter_set != NULL) { + subscripter_set->clear(); + mm_free((void *)subscripter_set); + } + + } + topic_sub_map->clear(); + shm_mm_free_by_key(SHM_BUS_MAP_KEY); + } + shm_socket_close(shm_socket); + + return 0; +} + /* * 澶勭悊璁㈤槄 */ void BusServerSocket::_proxy_sub( char *topic, int key) { SHMKeySet *subscripter_set; - + + struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0}; SHMTopicSubMap::iterator map_iter; SHMKeySet::iterator set_iter; -//printf("_proxy_sub topic = %s\n", topic); if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { subscripter_set = map_iter->second; } else { @@ -161,6 +155,7 @@ topic_sub_map->insert({topic, subscripter_set}); } subscripter_set->insert(key); + } /* @@ -187,7 +182,7 @@ SHMTopicSubMap::iterator map_iter; // SHMKeySet::iterator set_iter; - for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { + for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { subscripter_set = map_iter->second; subscripter_set->erase(key); } @@ -196,7 +191,7 @@ /* * 澶勭悊鍙戝竷锛屼唬鐞嗚浆鍙� */ -void BusServerSocket::_proxy_pub( char *topic, void *buf, size_t size, int key) { +void BusServerSocket::_proxy_pub( char *topic, char *buf, size_t size, int key) { SHMKeySet *subscripter_set; SHMTopicSubMap::iterator map_iter; @@ -206,25 +201,24 @@ std::vector<int>::iterator vector_iter; int send_key; + int rv; struct timespec timeout = {1,0}; - if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { - subscripter_set = map_iter->second; - for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { - send_key = *set_iter; - // printf("_proxy_pub send before %d \n", send_key); - if (shm_sendto(shm_socket, buf, size, send_key, &timeout) == EBUS_CLOSED ) { - //瀵规柟宸插叧闂殑杩炴帴鏀惧埌寰呭垹闄ら槦鍒楅噷銆傚鏋滅洿鎺ュ垹闄や細璁﹊ter鎸囬拡鍑虹幇閿欎贡 - subscripter_to_del.push_back(send_key); - } else { -// printf("_proxy_pub send after: %d \n", send_key); - } + if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { - + subscripter_set = map_iter->second; + for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) { + send_key = *set_iter; + rv = shm_sendto(shm_socket, buf, size, send_key, &timeout, BUS_TIMEOUT_FLAG); + if(rv == 0) { + continue; + } + //瀵规柟宸插叧闂殑鎴栬�呭搴旂殑杩涚▼琚玨ill鎺夌殑杩炴帴鏀惧埌寰呭垹闄ら槦鍒楅噷銆傚鏋滅洿鎺ュ垹闄や細璁﹊ter鎸囬拡鍑虹幇閿欎贡 + subscripter_to_del.push_back(send_key); } // 鍒犻櫎宸插叧闂殑绔� - for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) { + for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); ++vector_iter) { if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) { subscripter_set->erase(set_iter); logger->debug("remove closed subscripter %d \n", send_key); @@ -233,29 +227,715 @@ subscripter_to_del.clear(); } + } -void * BusServerSocket::run_pubsub_proxy() { - // pthread_detach(pthread_self()); +ProcInfo_query *Qurey_object(const char *object, int *length) { + int flag = 0; + int val; + int len; + int total = 0; + ProcInfo *Proc_ptr = NULL; + ProcInfo Data_stru; + ProcInfo_query *dataBuf = NULL; + SvrProc *SvrSub_ele; + SvrTcs::iterator svr_tcs_iter; + SvrProc::iterator svr_proc_iter; + ProcZone::iterator proc_iter; + SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); + ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); + + if ((svr_tcs_iter = SvrData->find(object)) != SvrData->end()) { + SvrSub_ele = svr_tcs_iter->second; + for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { + val = *svr_proc_iter; + + if ((proc_iter = proc->find(val)) != proc->end()) { + + if (dataBuf == NULL) { + dataBuf = (ProcInfo_query *)malloc(sizeof(ProcInfo_query)); + if (dataBuf == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + total = sizeof(ProcInfo_query); + } + + if (flag == 0) { + memset(dataBuf, 0x00, sizeof(ProcInfo_query)); + + dataBuf->num = 1; + strncpy(dataBuf->name, object, sizeof(dataBuf->name) - 1); + + flag = 1; + + } else { + dataBuf->num++; + len = sizeof(ProcInfo_query) + sizeof(ProcInfo) * (dataBuf->num - 1); + dataBuf = (ProcInfo_query *)realloc(dataBuf, len); + if (dataBuf == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + total += sizeof(ProcInfo); + memset((char *)dataBuf + len - sizeof(ProcInfo), 0x00, sizeof(ProcInfo)); + } + + memset(&Data_stru, 0x00, sizeof(ProcInfo)); + Data_stru = proc_iter->second; + + Proc_ptr = &(dataBuf->procData) + dataBuf->num - 1; + strncpy(Proc_ptr->proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id) + 1); + strncpy(Proc_ptr->name, Data_stru.name, strlen(Data_stru.name) + 1); + strncpy(Proc_ptr->public_info, Data_stru.public_info, strlen(Data_stru.public_info) + 1); + strncpy(Proc_ptr->private_info, Data_stru.private_info, strlen(Data_stru.private_info) + 1); + + if (length != NULL) + *length = total; + } + } + } + + return dataBuf; +} + +void list::Insert(int aData, int bData) +{ + LinkNode *pHead = NULL; + LinkNode *pNew = NULL; + LinkNode *pCur = NULL; + + pNew = (LinkNode *)malloc(sizeof(LinkNode)); + pNew->data = aData; + pNew->data_fix = bData; + pNew->count = 0; + + pHead = head; + pCur = pHead; + if(pHead == NULL) { + head = pNew; + + pNew->next = NULL; + + } else { + while(pCur->next != NULL) { + pCur = pCur->next; + } + + pCur->next = pNew; + pNew->next = NULL; + } +} + +void list::Delete(int data) +{ + LinkNode *pHead; + LinkNode *pCur; + LinkNode *pNext; + + pHead = head; + pCur = pHead; + if(pHead == NULL) + return; + + while((pCur != NULL) && (pCur->data == data)) { + + head = pCur->next; + + free(pCur); + + pCur = head; + + } + + while((pCur != NULL) && (pCur->next != NULL)) { + pNext = pCur->next; + + if(pNext->data == data) { + pCur->next = pNext->next; + pCur = pNext->next; + + free(pNext); + } else { + + pCur = pNext; + + } + } +} + +void list::dataSet(int data, int val) +{ + LinkNode *pCur; + + pCur = head; + if(pCur == NULL) + return; + + while(pCur != NULL) { + + if(pCur->data == data) { + pCur->count = val; + } + + pCur = pCur->next; + } +} + +int list::dataGet(int data) +{ + LinkNode *pCur; + + pCur = head; + if(pCur == NULL) + return 0; + + while(pCur != NULL) { + + if(pCur->data == data) { + return pCur->count; + } + + pCur = pCur->next; + } + + return 0; +} + +int list::dataFixGet(int data) +{ + LinkNode *pCur; + + pCur = head; + if(pCur == NULL) + return 0; + + while(pCur != NULL) { + + if(pCur->data == data) { + return pCur->data_fix; + } + + pCur = pCur->next; + } + + return 0; +} + +int list::NodeNum(void) +{ + int count = 0; + LinkNode *pCur = head; + + if (pCur == NULL) { + return 0; + } + + while(pCur != NULL) { + + ++count; + pCur = pCur->next; + } + + return count; +} + +int list::nodeGet(int index) +{ + int count = 0; + LinkNode *pCur = head; + + if (pCur == NULL) { + return 0; + } + + while((pCur != NULL) && (count <= index)) { + + if (count == index) { + return pCur->data; + } + + ++count; + pCur = pCur->next; + } + + return 0; +} + +void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag) +{ + char data_buf[MAX_STR_LEN] = { 0x00 }; + char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 }; + int count = 0; + int i = 0; + int len = 0; + int data1, data2; + char *data_ptr; + ProcInfo Data_stru; + ProcZone::iterator proc_iter; + TcsZone *TcsSub_ele; + ProcDataZone::iterator proc_que_iter; + ProcTcsMap::iterator proc_tcs_iter; + SvrProc *SvrSub_ele; + SvrProc::iterator svr_proc_iter; + SvrTcs::iterator svr_tcs_iter; + TcsZone::iterator tcssub_iter; + ProcPartZone::iterator proc_part_iter; + + struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0}; + + if ((flag == PROC_REG) || (flag == PROC_UNREG)) { + + memset(&Data_stru, 0x00, sizeof(ProcInfo)); + + if (buf != NULL) { + + memcpy(Data_stru.proc_id, buf, strlen(buf) + 1); + count = strlen(buf) + 1; + + memcpy(Data_stru.name, buf + count, strlen(buf + count) + 1); + count += strlen(buf + count) + 1; + + memcpy(Data_stru.public_info, buf + count, strlen(buf + count) + 1); + count += strlen(buf + count) + 1; + + memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1); + count += strlen(buf + count) + 1; + + memcpy(Data_stru.int_info, buf + count, strlen(buf + count) + 1); + count += strlen(buf + count) + 1; + + memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1); + count += strlen(buf + count) + 1; + + if (flag == PROC_REG) { + gLinkedList.Insert(key, atoi(Data_stru.int_info)); + } + } + + ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); + ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET); + ProcPartZone *procPart = shm_mm_attach<ProcPartZone>(SHM_BUS_PROC_PART_MAP_KEY); + if (flag == PROC_REG) { + pthread_mutex_lock(&gMutex); + if ((proc_iter = proc->find(key)) == proc->end()) { + proc->insert({key, Data_stru}); + } + pthread_mutex_unlock(&gMutex); + + if ((proc_part_iter = procPart->find(key)) == procPart->end()) { + procPart->insert({key, Data_stru.proc_id}); + } + + if ((proc_que_iter = procQuePart->find(Data_stru.proc_id)) == procQuePart->end()) { + procQuePart->insert({Data_stru.proc_id, key}); + } + + } else { + SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); + + for (svr_tcs_iter = SvrData->begin(); svr_tcs_iter != SvrData->end(); ++svr_tcs_iter) { + SvrSub_ele = svr_tcs_iter->second; + + SvrSub_ele->erase(key); + } + + pthread_mutex_lock(&gMutex); + if ((proc_iter = proc->find(key)) != proc->end()) { + + data1 = atoi((proc_iter->second).int_info); + data2 = atoi((proc_iter->second).svr_info); + BusServerSocket::_data_remove(data1); + BusServerSocket::_data_remove(data2); + BusServerSocket::_data_remove(key); + len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1); + strncpy(buf_temp, (proc_iter->second).proc_id, len); + proc->erase(key); + + } + pthread_mutex_unlock(&gMutex); + + if ((proc_part_iter = procPart->find(key)) != procPart->end()) { + + procPart->erase(key); + } + + if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) { + + procQuePart->erase(buf_temp); + } + + pthread_mutex_lock(&gMutex); + BusServerSocket::buf_data_remove(key); + pthread_mutex_unlock(&gMutex); + + find_mm_data(key); + } + + } else if (flag == PROC_REG_TCS) { + ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY); + SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); + + if ((proc_tcs_iter = proc->find(key)) != proc->end()) { + TcsSub_ele = proc_tcs_iter->second; + } else { + + void *ptr_set = mm_malloc(sizeof(TcsZone)); + TcsSub_ele = new(ptr_set) TcsZone; + proc->insert({key, TcsSub_ele}); + } + + strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); + data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC); + while(data_ptr) { + data_ptr = trim(data_ptr, 0); + TcsSub_ele->insert(data_ptr); + if ((svr_tcs_iter = SvrData->find(data_ptr)) != SvrData->end()) { + SvrSub_ele = svr_tcs_iter->second; + } else { + + void *ptr_set = mm_malloc(sizeof(SvrProc)); + SvrSub_ele = new(ptr_set) SvrProc; + SvrData->insert({data_ptr, SvrSub_ele}); + } + SvrSub_ele->insert(key); + data_ptr = strtok(NULL, STR_MAGIC); + } + + } else if (flag == PROC_QUE_TCS) { + + struct _temp_store { + void *ptr; + int total; + } *temp_store = NULL; + + int num = 0; + int sum = 0; + + ProcInfo_query *ret = NULL; + ProcInfo_query *ret_store = NULL; + + strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); + data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC); + while(data_ptr) { + data_ptr = trim(data_ptr, 0); + ret = Qurey_object(data_ptr, &len); + if (ret != NULL) { + + if (temp_store == NULL) { + temp_store = (_temp_store *)malloc(sizeof(_temp_store)); + if (temp_store == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + temp_store->ptr = ret; + temp_store->total = len; + num = 1; + + } else { + num++; + temp_store = (_temp_store *)realloc(temp_store, sizeof(_temp_store) * num); + if (temp_store == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + (temp_store + num - 1)->ptr = ret; + (temp_store + num - 1)->total = len; + } + + } + data_ptr = strtok(NULL, STR_MAGIC); + } + + if (num > 0) { + for (count = 0; count < num; count++) { + + if (ret_store == NULL) { + ret_store = (ProcInfo_query *)malloc((temp_store + count)->total); + if (ret_store == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + sum = (temp_store + count)->total; + memcpy(ret_store, (temp_store + count)->ptr, (temp_store +count)->total); + + } else { + + ret_store = (ProcInfo_query *)realloc(ret_store, sum + (temp_store + count)->total); + if (ret_store == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + memcpy((char *)ret_store + sum, (temp_store + count)->ptr, (temp_store + count)->total); + + sum += (temp_store + count)->total; + + } + + free((temp_store + count)->ptr); + + } + + free(temp_store); + } + + void *last_buf = malloc(sum + sizeof(int)); + if (last_buf == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + *(int *)last_buf = num; + if (num > 0) { + memcpy((char *)last_buf + sizeof(int), (char *)ret_store, sum); + free(ret_store); + } + + shm_sendto(shm_socket, last_buf, sum + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG); + + free(last_buf); + } else if (flag == PROC_QUE_STCS) { + + SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); + ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); + + strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); + if ((svr_tcs_iter = SvrData->find(trim(buf_temp, 0))) != SvrData->end()) { + SvrSub_ele = svr_tcs_iter->second; + + for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { + count = *svr_proc_iter; + if ((proc_iter = proc->find(count)) != proc->end()) { + count = atoi((proc_iter->second).svr_info); + } + + break; + } + } else { + count = 0; + } + + memset(data_buf, 0x00, sizeof(data_buf)); + sprintf(data_buf, "%d", count); + shm_sendto(shm_socket, data_buf, strlen(data_buf), key, &timeout, BUS_TIMEOUT_FLAG); + + } else if (flag == PROC_QUE_ATCS) { + + int val; + int temp = 0; + int pos = 0; + int size = 0; + ProcInfo_sum *Data_sum = NULL; + SHMKeySet *subs_proc; + SHMKeySet::iterator subs_proc_iter; + SHMTopicSubMap::iterator subs_iter; + + ProcTcsMap *procData = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY); + ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); + + for (proc_iter = proc->begin(); proc_iter != proc->end(); ++proc_iter) { + + memset(&Data_stru, 0x00, sizeof(Data_stru)); + + if (count == 0) { + Data_sum = (ProcInfo_sum *)malloc(sizeof(ProcInfo_sum)); + if (Data_sum == NULL) { + + logger->error("in proxy_reg: Out of memory!\n"); + + exit(1); + } + + count++; + + memset(Data_sum, 0x00, sizeof(ProcInfo_sum)); + + } else { + + count++; + len = sizeof(ProcInfo_sum) * count; + Data_sum = (ProcInfo_sum *)realloc(Data_sum, len); + if (Data_sum == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + + exit(1); + } + + memset(Data_sum + count - 1, 0x00, sizeof(ProcInfo_sum)); + } + + Data_stru = proc_iter->second; + + memcpy((Data_sum + count - 1)->procData.proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id)); + memcpy((Data_sum + count - 1)->procData.name, Data_stru.name, strlen(Data_stru.name)); + memcpy((Data_sum + count - 1)->procData.public_info, Data_stru.public_info, strlen(Data_stru.public_info)); + memcpy((Data_sum + count - 1)->procData.private_info, Data_stru.private_info, strlen(Data_stru.private_info)); + + (Data_sum + count - 1)->stat = 1; + (Data_sum + count - 1)->list_num = 3; + + val = proc_iter->first; + if ((proc_tcs_iter = procData->find(val)) != procData->end()) { + TcsSub_ele = proc_tcs_iter->second; + + temp = 0; + pos = 0; + len = sizeof(Data_sum->reg_info) - 1; + for (tcssub_iter = TcsSub_ele->begin(); tcssub_iter != TcsSub_ele->end(); ++tcssub_iter) { + + if (temp == 0) { + strncpy((Data_sum + count - 1)->reg_info, (*tcssub_iter).c_str(), strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str())); + pos += strlen((Data_sum + count - 1)->reg_info); + len -= strlen((Data_sum + count - 1)->reg_info); + + temp++; + } else { + + if (len > 0) { + strcat((Data_sum + count - 1)->reg_info, ","); + + pos += 1; + len -= 1; + } + + if (len > 0) { + size = strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str()); + strncpy(&(Data_sum + count - 1)->reg_info[pos], (*tcssub_iter).c_str(), size); + + pos += size; + len -= size; + } + } + } + + pos = 0; + temp = 0; + len = sizeof(Data_sum->local_info) - 1; + for (subs_iter = topic_sub_map->begin(); subs_iter != topic_sub_map->end(); ++subs_iter) { + subs_proc = subs_iter->second; + + if ((subs_proc_iter = subs_proc->find(val)) != subs_proc->end()) { + + if ((temp == 0)) { + + strncpy((Data_sum + count - 1)->local_info, subs_iter->first.c_str(), strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str())); + pos += strlen((Data_sum + count - 1)->local_info); + len -= strlen((Data_sum + count - 1)->local_info); + + temp++; + } else { + + if (len > 0) { + strcat((Data_sum + count - 1)->local_info, ","); + + pos += 1; + len -= 1; + } + + if (len > 0) { + size = strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str()); + strncpy(&(Data_sum + count - 1)->local_info[pos], subs_iter->first.c_str(), size); + + pos += size; + len -= size; + } + } + + } + } + + } + } + + temp = count * sizeof(ProcInfo_sum); + void *last_buf = malloc(temp + sizeof(int)); + if (last_buf == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + *(int *)last_buf = count; + if (count > 0) { + memcpy((char *)last_buf + sizeof(int), (char *)Data_sum, temp); + free(Data_sum); + } + + shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG); + + free(last_buf); + } else { + + char *ptr = NULL; + strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); + + data1 = atoi(buf_temp); + ptr = strstr(buf_temp, STR_MAGIC); + if (ptr != NULL) { + data2 = atoi(ptr + 1); + } + BusServerSocket::buf_data_set(data1, data2); + } +} + +int BusServerSocket::get_data(int val) { + + ProcZone::iterator proc_iter; + ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); + + pthread_mutex_lock(&gMutex); + if ((proc_iter = proc->find(val)) != proc->end()) { + pthread_mutex_unlock(&gMutex); + return true; + } + pthread_mutex_unlock(&gMutex); + + return false; + +} + +int BusServerSocket::check_proc(const int val, const void *buf, int len, void **buf_ret, int *len_ret, \ + const struct timespec *timeout, const int flag) { + int ret; + + ret = shm_sendandrecv(shm_socket, buf, len, val, buf_ret, len_ret, timeout, flag); + + return ret; +} + +void BusServerSocket::remove_proc(int val) { + BusServerSocket::_proxy_reg(NULL, 0, NULL, 0, val, PROC_UNREG); +} + +// 杩愯浠g悊 +int BusServerSocket::_run_proxy_() { int size; int key; - char * action, *topic, *topics, *buf, *content; + int flag; + char buf_temp[MAX_STR_LEN] = { 0x00 }; + char *action, *topic, *topics, *buf, *content; size_t head_len; - char resp_buf[128]; bus_head_t head; + int val; + ProcDataZone::iterator proc_que_iter; + ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET); - const char *topic_delim = ","; -// printf("run_pubsub_proxy server receive before\n"); - while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) { -//printf("run_pubsub_proxy server recv after: %s \n", buf); + int rv; + char send_buf[512] = { 0x00 }; + + const char *topic_delim = ","; + while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) { head = ShmModSocket::decode_bus_head(buf); topics = buf + BUS_HEAD_SIZE; action = head.action; - // printf("run_pubsub_proxy : %s, %s \n", action, topics); if(strcmp(action, "sub") == 0) { // 璁㈤槄鏀寔澶氫富棰樿闃� topic = strtok(topics, topic_delim); -//printf("run_pubsub_proxy topic = %s\n", topic); while(topic) { _proxy_sub(trim(topic, 0), key); topic = strtok(NULL, topic_delim); @@ -263,7 +943,6 @@ } else if(strcmp(action, "desub") == 0) { -// printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), "")); if(strcmp(trim(topics, 0), "") == 0) { // 鍙栨秷鎵�鏈夎闃� _proxy_desub_all(key); @@ -278,94 +957,131 @@ } else if(strcmp(action, "pub") == 0) { - content = topics + head.topic_size; - _proxy_pub(topics, content, head.content_size, key); + topics[head.topic_size - 1] = '\0'; + content = topics + head.topic_size; - } - else if(strcmp(action, "stop") == 0) { - - free(buf); - break; - } else { - logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action %s", action); - } - - free(buf); - } + _proxy_pub(topics, topics, head.topic_size + head.content_size, key); - logger->info( "Stopping Bus..."); - shm_sendto(shm_socket, "stop_finished", strlen( "stop_finished") +1, key); + } + else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \ + || (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \ + || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0) \ + || (strcmp(action, "bufreg") == 0)) { + content = topics + head.topic_size; + if (strcmp(action, "reg") == 0) { + + flag = PROC_REG; - return NULL; + } else if (strcmp(action, "unreg") == 0) { + + flag = PROC_UNREG; + + } else if (strcmp(action, "tcsreg") == 0) { + + flag = PROC_REG_TCS; + + } else if (strcmp(action, "tcsque") == 0) { + + flag = PROC_QUE_TCS; + + } else if (strcmp(action, "stcsque") == 0) { + + flag = PROC_QUE_STCS; + + } else if (strcmp(action, "atcsque") == 0) { + + flag = PROC_QUE_ATCS; + + } else { + + flag = PROC_REG_BUF; + + } + + if (flag == PROC_REG) { + memcpy(buf_temp, content, strlen(content) + 1); + + if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) { + + val = proc_que_iter->second; + _proxy_reg(topics, head.topic_size, content, head.content_size, val, PROC_UNREG); + } + } + + _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag); + + } + else if(strcmp(action, "stop") == 0) { + free(buf); + break; + } else { + logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action); + } + free(buf); + } + + + return rv; +} + +void BusServerSocket::_data_remove(int val) { + + int i; + LockFreeQueue<shm_packet_t> *queue = NULL; + hashtable_t *hashtable = mm_get_hashtable(); + + void *data_ptr = hashtable_get(hashtable, val); + + if (data_ptr != NULL) { + if (data_ptr != (void *)1) { + queue = (LockFreeQueue<shm_packet_t> *)data_ptr; + queue->close(); + for (i = 0; i < queue->size(); i++) { + mm_free((*queue)[i].buf); + } + sleep(1); + } + + hashtable_remove(hashtable, val); + } + +} + +void BusServerSocket::buf_data_set(int data, int val) { + recvbuf_val *val_buf; + recvbuf_data::iterator data_iter; + recvbuf_val::iterator val_iter; + + if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) { + val_buf = data_iter->second; + } else { + void *set_ptr = mm_malloc(sizeof(recvbuf_val)); + + val_buf = new(set_ptr) recvbuf_val; + recvBuf_data.insert({data, val_buf}); + } + + val_buf->insert(val); +} + +void BusServerSocket::buf_data_remove(int data) { + + int val; + recvbuf_val *val_buf; + recvbuf_data::iterator data_iter; + recvbuf_val::iterator val_iter; + + if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) { + + val_buf = data_iter->second; + for(val_iter = val_buf->begin(); val_iter != val_buf->end(); ++val_iter) { + val = *val_iter; + + BusServerSocket::_data_remove(val); + } + + recvBuf_data.erase(data); + } } - -/** - * deprecate - * @str "<**sub**>{缁忔祹}" - */ - -int BusServerSocket::parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ) { - char *ptr = str; - char *str_end_ptr = str + size; - char *action_start_ptr; - char *action_end_ptr; - size_t action_len = 0; - - char *topic_start_ptr; - char *topic_end_ptr; - size_t topic_len = 0; - - // if (strlen(identifier) > strlen(str)) { - // return 0; - // } - - if (strncmp(ptr, ACTION_LIDENTIFIER, strlen(ACTION_LIDENTIFIER)) == 0) { - ptr += strlen(ACTION_LIDENTIFIER); - action_start_ptr = ptr; - while(strncmp(++ptr, ACTION_RIDENTIFIER, strlen(ACTION_RIDENTIFIER)) != 0) { - if(ptr >= str_end_ptr) { - return 0; - } - } -// printf("%s\n", ptr); - action_end_ptr = ptr; - action_len = action_end_ptr - action_start_ptr; - ptr += strlen(ACTION_RIDENTIFIER); -// printf("%s\n", ptr); -// printf("%s\n", str_end_ptr-1); - if(strncmp(ptr, TOPIC_LIDENTIFIER, strlen(TOPIC_LIDENTIFIER)) == 0 ) { - topic_start_ptr = ptr+strlen(TOPIC_LIDENTIFIER); - - - while(strncmp(++ptr, TOPIC_RIDENTIFIER, strlen(TOPIC_RIDENTIFIER)) != 0) { - if(ptr >= str_end_ptr) { - return 0; - } - } - topic_end_ptr = ptr; - topic_len = topic_end_ptr - topic_start_ptr; - - ptr += strlen(TOPIC_RIDENTIFIER); - - } else { - return 0; - } - } else { - return 0; - } - - char *topic = (char *)malloc(topic_len+1); - strncpy(topic, topic_start_ptr, topic_len); - *(topic+topic_len) = '\0'; - *_topic = topic; - - char *action = (char *)malloc(action_len+1); - strncpy(action, action_start_ptr, action_len); - *(action+action_len) = '\0'; - *_action = action; - *head_len = ptr-str; - - return 1; -} \ No newline at end of file -- Gitblit v1.8.0