| | |
| | | |
| | | #include "bus_server_socket.h" |
| | | #include "shm_mod_socket.h" |
| | | #include "shm_socket.h" |
| | | #include "msg_mgr.h" |
| | | #include "bus_error.h" |
| | | |
| | | static Logger *logger = LoggerFactory::getLogger(); |
| | | static pthread_mutex_t gMutex; |
| | | |
| | | list gLinkedList; |
| | | void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) { |
| | | SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); |
| | | SHMKeySet *subscripter_set; |
| | |
| | | SHMTopicSubMap::iterator map_iter; |
| | | |
| | | if(topic_sub_map != NULL) { |
| | | for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { |
| | | for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { |
| | | subscripter_set = map_iter->second; |
| | | if(subscripter_set != NULL) { |
| | | for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { |
| | | for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) { |
| | | cb(subscripter_set, *set_iter); |
| | | } |
| | | } |
| | |
| | | SHMTopicSubMap::iterator map_iter; |
| | | |
| | | if(topic_sub_map != NULL) { |
| | | for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { |
| | | for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { |
| | | subscripter_set = map_iter->second; |
| | | if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) { |
| | | subscripter_set->erase(set_iter); |
| | | // printf("remove_subscripter %s, %d\n", map_iter->first, key); |
| | | count++; |
| | | } |
| | | } |
| | |
| | | |
| | | |
| | | BusServerSocket::BusServerSocket() { |
| | | logger->debug("BusServerSocket Init"); |
| | | shm_socket = shm_socket_open(SHM_SOCKET_DGRAM); |
| | | topic_sub_map = NULL; |
| | | |
| | |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int BusServerSocket::start(){ |
| | | topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); |
| | | |
| | | _run_proxy_(); |
| | | return 0; |
| | | int rv; |
| | | |
| | | topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); |
| | | |
| | | pthread_mutex_init(&gMutex, NULL); |
| | | |
| | | rv = _run_proxy_(); |
| | | |
| | | return rv; |
| | | } |
| | | |
| | | |
| | |
| | | SHMKeySet *subscripter_set; |
| | | SHMTopicSubMap::iterator map_iter; |
| | | if(topic_sub_map != NULL) { |
| | | for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { |
| | | for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { |
| | | subscripter_set = map_iter->second; |
| | | if(subscripter_set != NULL) { |
| | | subscripter_set->clear(); |
| | |
| | | shm_mm_free_by_key(SHM_BUS_MAP_KEY); |
| | | } |
| | | shm_socket_close(shm_socket); |
| | | logger->debug("BusServerSocket destory 3"); |
| | | return 0; |
| | | |
| | | return 0; |
| | | } |
| | | |
| | | /* |
| | |
| | | */ |
| | | void BusServerSocket::_proxy_sub( char *topic, int key) { |
| | | SHMKeySet *subscripter_set; |
| | | |
| | | |
| | | struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0}; |
| | | SHMTopicSubMap::iterator map_iter; |
| | | SHMKeySet::iterator set_iter; |
| | | //printf("_proxy_sub topic = %s\n", topic); |
| | | if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { |
| | | subscripter_set = map_iter->second; |
| | | } else { |
| | |
| | | topic_sub_map->insert({topic, subscripter_set}); |
| | | } |
| | | subscripter_set->insert(key); |
| | | |
| | | } |
| | | |
| | | /* |
| | |
| | | |
| | | SHMTopicSubMap::iterator map_iter; |
| | | // SHMKeySet::iterator set_iter; |
| | | for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { |
| | | for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { |
| | | subscripter_set = map_iter->second; |
| | | subscripter_set->erase(key); |
| | | } |
| | |
| | | /* |
| | | * 处理发布,代理转发 |
| | | */ |
| | | void BusServerSocket::_proxy_pub( char *topic, void *buf, size_t size, int key) { |
| | | void BusServerSocket::_proxy_pub( char *topic, char *buf, size_t size, int key) { |
| | | SHMKeySet *subscripter_set; |
| | | |
| | | SHMTopicSubMap::iterator map_iter; |
| | |
| | | int rv; |
| | | struct timespec timeout = {1,0}; |
| | | |
| | | if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { |
| | | if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { |
| | | |
| | | subscripter_set = map_iter->second; |
| | | for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { |
| | | for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) { |
| | | send_key = *set_iter; |
| | | // logger->debug("_proxy_pub send before %d \n", send_key); |
| | | rv = shm_sendto(shm_socket, buf, size, send_key, &timeout, BUS_TIMEOUT_FLAG); |
| | | if(rv == 0) { |
| | | continue; |
| | |
| | | } |
| | | |
| | | // 删除已关闭的端 |
| | | for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) { |
| | | for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); ++vector_iter) { |
| | | if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) { |
| | | subscripter_set->erase(set_iter); |
| | | logger->debug("remove closed subscripter %d \n", send_key); |
| | |
| | | subscripter_to_del.clear(); |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | ProcInfo_query *Qurey_object(const char *object, int *length) { |
| | | int flag = 0; |
| | | int val; |
| | | int len; |
| | | int total = 0; |
| | | ProcInfo *Proc_ptr = NULL; |
| | | ProcInfo Data_stru; |
| | | ProcInfo_query *dataBuf = NULL; |
| | | SvrProc *SvrSub_ele; |
| | | SvrTcs::iterator svr_tcs_iter; |
| | | SvrProc::iterator svr_proc_iter; |
| | | ProcZone::iterator proc_iter; |
| | | SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); |
| | | ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); |
| | | |
| | | if ((svr_tcs_iter = SvrData->find(object)) != SvrData->end()) { |
| | | SvrSub_ele = svr_tcs_iter->second; |
| | | for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { |
| | | val = *svr_proc_iter; |
| | | |
| | | if ((proc_iter = proc->find(val)) != proc->end()) { |
| | | |
| | | if (dataBuf == NULL) { |
| | | dataBuf = (ProcInfo_query *)malloc(sizeof(ProcInfo_query)); |
| | | if (dataBuf == NULL) { |
| | | logger->error("in proxy_reg: Out of memory!\n"); |
| | | exit(1); |
| | | } |
| | | |
| | | total = sizeof(ProcInfo_query); |
| | | } |
| | | |
| | | if (flag == 0) { |
| | | memset(dataBuf, 0x00, sizeof(ProcInfo_query)); |
| | | |
| | | dataBuf->num = 1; |
| | | strncpy(dataBuf->name, object, sizeof(dataBuf->name) - 1); |
| | | |
| | | flag = 1; |
| | | |
| | | } else { |
| | | dataBuf->num++; |
| | | len = sizeof(ProcInfo_query) + sizeof(ProcInfo) * (dataBuf->num - 1); |
| | | dataBuf = (ProcInfo_query *)realloc(dataBuf, len); |
| | | if (dataBuf == NULL) { |
| | | logger->error("in proxy_reg: Out of memory!\n"); |
| | | exit(1); |
| | | } |
| | | |
| | | total += sizeof(ProcInfo); |
| | | memset((char *)dataBuf + len - sizeof(ProcInfo), 0x00, sizeof(ProcInfo)); |
| | | } |
| | | |
| | | memset(&Data_stru, 0x00, sizeof(ProcInfo)); |
| | | Data_stru = proc_iter->second; |
| | | |
| | | Proc_ptr = &(dataBuf->procData) + dataBuf->num - 1; |
| | | strncpy(Proc_ptr->proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id) + 1); |
| | | strncpy(Proc_ptr->name, Data_stru.name, strlen(Data_stru.name) + 1); |
| | | strncpy(Proc_ptr->public_info, Data_stru.public_info, strlen(Data_stru.public_info) + 1); |
| | | strncpy(Proc_ptr->private_info, Data_stru.private_info, strlen(Data_stru.private_info) + 1); |
| | | |
| | | if (length != NULL) |
| | | *length = total; |
| | | } |
| | | } |
| | | } |
| | | |
| | | return dataBuf; |
| | | } |
| | | |
| | | void list::Insert(int aData, int bData) |
| | | { |
| | | LinkNode *pHead = NULL; |
| | | LinkNode *pNew = NULL; |
| | | LinkNode *pCur = NULL; |
| | | |
| | | pNew = (LinkNode *)malloc(sizeof(LinkNode)); |
| | | pNew->data = aData; |
| | | pNew->data_fix = bData; |
| | | pNew->count = 0; |
| | | |
| | | pHead = head; |
| | | pCur = pHead; |
| | | if(pHead == NULL) { |
| | | head = pNew; |
| | | |
| | | pNew->next = NULL; |
| | | |
| | | } else { |
| | | while(pCur->next != NULL) { |
| | | pCur = pCur->next; |
| | | } |
| | | |
| | | pCur->next = pNew; |
| | | pNew->next = NULL; |
| | | } |
| | | } |
| | | |
| | | void list::Delete(int data) |
| | | { |
| | | LinkNode *pHead; |
| | | LinkNode *pCur; |
| | | LinkNode *pNext; |
| | | |
| | | pHead = head; |
| | | pCur = pHead; |
| | | if(pHead == NULL) |
| | | return; |
| | | |
| | | while((pCur != NULL) && (pCur->data == data)) { |
| | | |
| | | head = pCur->next; |
| | | |
| | | free(pCur); |
| | | |
| | | pCur = head; |
| | | |
| | | } |
| | | |
| | | while((pCur != NULL) && (pCur->next != NULL)) { |
| | | pNext = pCur->next; |
| | | |
| | | if(pNext->data == data) { |
| | | pCur->next = pNext->next; |
| | | pCur = pNext->next; |
| | | |
| | | free(pNext); |
| | | } else { |
| | | |
| | | pCur = pNext; |
| | | |
| | | } |
| | | } |
| | | } |
| | | |
| | | void list::dataSet(int data, int val) |
| | | { |
| | | LinkNode *pCur; |
| | | |
| | | pCur = head; |
| | | if(pCur == NULL) |
| | | return; |
| | | |
| | | while(pCur != NULL) { |
| | | |
| | | if(pCur->data == data) { |
| | | pCur->count = val; |
| | | } |
| | | |
| | | pCur = pCur->next; |
| | | } |
| | | } |
| | | |
| | | int list::dataGet(int data) |
| | | { |
| | | LinkNode *pCur; |
| | | |
| | | pCur = head; |
| | | if(pCur == NULL) |
| | | return 0; |
| | | |
| | | while(pCur != NULL) { |
| | | |
| | | if(pCur->data == data) { |
| | | return pCur->count; |
| | | } |
| | | |
| | | pCur = pCur->next; |
| | | } |
| | | |
| | | return 0; |
| | | } |
| | | |
| | | int list::dataFixGet(int data) |
| | | { |
| | | LinkNode *pCur; |
| | | |
| | | pCur = head; |
| | | if(pCur == NULL) |
| | | return 0; |
| | | |
| | | while(pCur != NULL) { |
| | | |
| | | if(pCur->data == data) { |
| | | return pCur->data_fix; |
| | | } |
| | | |
| | | pCur = pCur->next; |
| | | } |
| | | |
| | | return 0; |
| | | } |
| | | |
| | | int list::NodeNum(void) |
| | | { |
| | | int count = 0; |
| | | LinkNode *pCur = head; |
| | | |
| | | if (pCur == NULL) { |
| | | return 0; |
| | | } |
| | | |
| | | while(pCur != NULL) { |
| | | |
| | | ++count; |
| | | pCur = pCur->next; |
| | | } |
| | | |
| | | return count; |
| | | } |
| | | |
| | | int list::nodeGet(int index) |
| | | { |
| | | int count = 0; |
| | | LinkNode *pCur = head; |
| | | |
| | | if (pCur == NULL) { |
| | | return 0; |
| | | } |
| | | |
| | | while((pCur != NULL) && (count <= index)) { |
| | | |
| | | if (count == index) { |
| | | return pCur->data; |
| | | } |
| | | |
| | | ++count; |
| | | pCur = pCur->next; |
| | | } |
| | | |
| | | return 0; |
| | | } |
| | | |
| | | void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag) |
| | | { |
| | | char data_buf[MAX_STR_LEN] = { 0x00 }; |
| | | char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 }; |
| | | int count = 0; |
| | | int i = 0; |
| | | int len = 0; |
| | | int data1, data2; |
| | | char *data_ptr; |
| | | ProcInfo Data_stru; |
| | | ProcZone::iterator proc_iter; |
| | | TcsZone *TcsSub_ele; |
| | | ProcDataZone::iterator proc_que_iter; |
| | | ProcTcsMap::iterator proc_tcs_iter; |
| | | SvrProc *SvrSub_ele; |
| | | SvrProc::iterator svr_proc_iter; |
| | | SvrTcs::iterator svr_tcs_iter; |
| | | TcsZone::iterator tcssub_iter; |
| | | ProcPartZone::iterator proc_part_iter; |
| | | |
| | | struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0}; |
| | | |
| | | if ((flag == PROC_REG) || (flag == PROC_UNREG)) { |
| | | |
| | | memset(&Data_stru, 0x00, sizeof(ProcInfo)); |
| | | |
| | | if (buf != NULL) { |
| | | |
| | | memcpy(Data_stru.proc_id, buf, strlen(buf) + 1); |
| | | count = strlen(buf) + 1; |
| | | |
| | | memcpy(Data_stru.name, buf + count, strlen(buf + count) + 1); |
| | | count += strlen(buf + count) + 1; |
| | | |
| | | memcpy(Data_stru.public_info, buf + count, strlen(buf + count) + 1); |
| | | count += strlen(buf + count) + 1; |
| | | |
| | | memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1); |
| | | count += strlen(buf + count) + 1; |
| | | |
| | | memcpy(Data_stru.int_info, buf + count, strlen(buf + count) + 1); |
| | | count += strlen(buf + count) + 1; |
| | | |
| | | memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1); |
| | | count += strlen(buf + count) + 1; |
| | | |
| | | if (flag == PROC_REG) { |
| | | gLinkedList.Insert(key, atoi(Data_stru.int_info)); |
| | | } |
| | | } |
| | | |
| | | ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); |
| | | ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET); |
| | | ProcPartZone *procPart = shm_mm_attach<ProcPartZone>(SHM_BUS_PROC_PART_MAP_KEY); |
| | | if (flag == PROC_REG) { |
| | | pthread_mutex_lock(&gMutex); |
| | | if ((proc_iter = proc->find(key)) == proc->end()) { |
| | | proc->insert({key, Data_stru}); |
| | | } |
| | | pthread_mutex_unlock(&gMutex); |
| | | |
| | | if ((proc_part_iter = procPart->find(key)) == procPart->end()) { |
| | | procPart->insert({key, Data_stru.proc_id}); |
| | | } |
| | | |
| | | if ((proc_que_iter = procQuePart->find(Data_stru.proc_id)) == procQuePart->end()) { |
| | | procQuePart->insert({Data_stru.proc_id, key}); |
| | | } |
| | | |
| | | } else { |
| | | SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); |
| | | |
| | | for (svr_tcs_iter = SvrData->begin(); svr_tcs_iter != SvrData->end(); ++svr_tcs_iter) { |
| | | SvrSub_ele = svr_tcs_iter->second; |
| | | |
| | | SvrSub_ele->erase(key); |
| | | } |
| | | |
| | | pthread_mutex_lock(&gMutex); |
| | | if ((proc_iter = proc->find(key)) != proc->end()) { |
| | | |
| | | data1 = atoi((proc_iter->second).int_info); |
| | | data2 = atoi((proc_iter->second).svr_info); |
| | | BusServerSocket::_data_remove(data1); |
| | | BusServerSocket::_data_remove(data2); |
| | | BusServerSocket::_data_remove(key); |
| | | len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1); |
| | | strncpy(buf_temp, (proc_iter->second).proc_id, len); |
| | | proc->erase(key); |
| | | |
| | | } |
| | | pthread_mutex_unlock(&gMutex); |
| | | |
| | | if ((proc_part_iter = procPart->find(key)) != procPart->end()) { |
| | | |
| | | procPart->erase(key); |
| | | } |
| | | |
| | | if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) { |
| | | |
| | | procQuePart->erase(buf_temp); |
| | | } |
| | | |
| | | pthread_mutex_lock(&gMutex); |
| | | BusServerSocket::buf_data_remove(key); |
| | | pthread_mutex_unlock(&gMutex); |
| | | |
| | | find_mm_data(key); |
| | | } |
| | | |
| | | } else if (flag == PROC_REG_TCS) { |
| | | ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY); |
| | | SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); |
| | | |
| | | if ((proc_tcs_iter = proc->find(key)) != proc->end()) { |
| | | TcsSub_ele = proc_tcs_iter->second; |
| | | } else { |
| | | |
| | | void *ptr_set = mm_malloc(sizeof(TcsZone)); |
| | | TcsSub_ele = new(ptr_set) TcsZone; |
| | | proc->insert({key, TcsSub_ele}); |
| | | } |
| | | |
| | | strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); |
| | | data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC); |
| | | while(data_ptr) { |
| | | data_ptr = trim(data_ptr, 0); |
| | | TcsSub_ele->insert(data_ptr); |
| | | if ((svr_tcs_iter = SvrData->find(data_ptr)) != SvrData->end()) { |
| | | SvrSub_ele = svr_tcs_iter->second; |
| | | } else { |
| | | |
| | | void *ptr_set = mm_malloc(sizeof(SvrProc)); |
| | | SvrSub_ele = new(ptr_set) SvrProc; |
| | | SvrData->insert({data_ptr, SvrSub_ele}); |
| | | } |
| | | SvrSub_ele->insert(key); |
| | | data_ptr = strtok(NULL, STR_MAGIC); |
| | | } |
| | | |
| | | } else if (flag == PROC_QUE_TCS) { |
| | | |
| | | struct _temp_store { |
| | | void *ptr; |
| | | int total; |
| | | } *temp_store = NULL; |
| | | |
| | | int num = 0; |
| | | int sum = 0; |
| | | |
| | | ProcInfo_query *ret = NULL; |
| | | ProcInfo_query *ret_store = NULL; |
| | | |
| | | strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); |
| | | data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC); |
| | | while(data_ptr) { |
| | | data_ptr = trim(data_ptr, 0); |
| | | ret = Qurey_object(data_ptr, &len); |
| | | if (ret != NULL) { |
| | | |
| | | if (temp_store == NULL) { |
| | | temp_store = (_temp_store *)malloc(sizeof(_temp_store)); |
| | | if (temp_store == NULL) { |
| | | logger->error("in proxy_reg: Out of memory!\n"); |
| | | exit(1); |
| | | } |
| | | |
| | | temp_store->ptr = ret; |
| | | temp_store->total = len; |
| | | num = 1; |
| | | |
| | | } else { |
| | | num++; |
| | | temp_store = (_temp_store *)realloc(temp_store, sizeof(_temp_store) * num); |
| | | if (temp_store == NULL) { |
| | | logger->error("in proxy_reg: Out of memory!\n"); |
| | | exit(1); |
| | | } |
| | | |
| | | (temp_store + num - 1)->ptr = ret; |
| | | (temp_store + num - 1)->total = len; |
| | | } |
| | | |
| | | } |
| | | data_ptr = strtok(NULL, STR_MAGIC); |
| | | } |
| | | |
| | | if (num > 0) { |
| | | for (count = 0; count < num; count++) { |
| | | |
| | | if (ret_store == NULL) { |
| | | ret_store = (ProcInfo_query *)malloc((temp_store + count)->total); |
| | | if (ret_store == NULL) { |
| | | logger->error("in proxy_reg: Out of memory!\n"); |
| | | exit(1); |
| | | } |
| | | |
| | | sum = (temp_store + count)->total; |
| | | memcpy(ret_store, (temp_store + count)->ptr, (temp_store +count)->total); |
| | | |
| | | } else { |
| | | |
| | | ret_store = (ProcInfo_query *)realloc(ret_store, sum + (temp_store + count)->total); |
| | | if (ret_store == NULL) { |
| | | logger->error("in proxy_reg: Out of memory!\n"); |
| | | exit(1); |
| | | } |
| | | |
| | | memcpy((char *)ret_store + sum, (temp_store + count)->ptr, (temp_store + count)->total); |
| | | |
| | | sum += (temp_store + count)->total; |
| | | |
| | | } |
| | | |
| | | free((temp_store + count)->ptr); |
| | | |
| | | } |
| | | |
| | | free(temp_store); |
| | | } |
| | | |
| | | void *last_buf = malloc(sum + sizeof(int)); |
| | | if (last_buf == NULL) { |
| | | logger->error("in proxy_reg: Out of memory!\n"); |
| | | exit(1); |
| | | } |
| | | |
| | | *(int *)last_buf = num; |
| | | if (num > 0) { |
| | | memcpy((char *)last_buf + sizeof(int), (char *)ret_store, sum); |
| | | free(ret_store); |
| | | } |
| | | |
| | | shm_sendto(shm_socket, last_buf, sum + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG); |
| | | |
| | | free(last_buf); |
| | | } else if (flag == PROC_QUE_STCS) { |
| | | |
| | | SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); |
| | | ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); |
| | | |
| | | strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); |
| | | if ((svr_tcs_iter = SvrData->find(trim(buf_temp, 0))) != SvrData->end()) { |
| | | SvrSub_ele = svr_tcs_iter->second; |
| | | |
| | | for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { |
| | | count = *svr_proc_iter; |
| | | if ((proc_iter = proc->find(count)) != proc->end()) { |
| | | count = atoi((proc_iter->second).svr_info); |
| | | } |
| | | |
| | | break; |
| | | } |
| | | } else { |
| | | count = 0; |
| | | } |
| | | |
| | | memset(data_buf, 0x00, sizeof(data_buf)); |
| | | sprintf(data_buf, "%d", count); |
| | | shm_sendto(shm_socket, data_buf, strlen(data_buf), key, &timeout, BUS_TIMEOUT_FLAG); |
| | | |
| | | } else if (flag == PROC_QUE_ATCS) { |
| | | |
| | | int val; |
| | | int temp = 0; |
| | | int pos = 0; |
| | | int size = 0; |
| | | ProcInfo_sum *Data_sum = NULL; |
| | | SHMKeySet *subs_proc; |
| | | SHMKeySet::iterator subs_proc_iter; |
| | | SHMTopicSubMap::iterator subs_iter; |
| | | |
| | | ProcTcsMap *procData = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY); |
| | | ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); |
| | | |
| | | for (proc_iter = proc->begin(); proc_iter != proc->end(); ++proc_iter) { |
| | | |
| | | memset(&Data_stru, 0x00, sizeof(Data_stru)); |
| | | |
| | | if (count == 0) { |
| | | Data_sum = (ProcInfo_sum *)malloc(sizeof(ProcInfo_sum)); |
| | | if (Data_sum == NULL) { |
| | | |
| | | logger->error("in proxy_reg: Out of memory!\n"); |
| | | |
| | | exit(1); |
| | | } |
| | | |
| | | count++; |
| | | |
| | | memset(Data_sum, 0x00, sizeof(ProcInfo_sum)); |
| | | |
| | | } else { |
| | | |
| | | count++; |
| | | len = sizeof(ProcInfo_sum) * count; |
| | | Data_sum = (ProcInfo_sum *)realloc(Data_sum, len); |
| | | if (Data_sum == NULL) { |
| | | logger->error("in proxy_reg: Out of memory!\n"); |
| | | |
| | | exit(1); |
| | | } |
| | | |
| | | memset(Data_sum + count - 1, 0x00, sizeof(ProcInfo_sum)); |
| | | } |
| | | |
| | | Data_stru = proc_iter->second; |
| | | |
| | | memcpy((Data_sum + count - 1)->procData.proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id)); |
| | | memcpy((Data_sum + count - 1)->procData.name, Data_stru.name, strlen(Data_stru.name)); |
| | | memcpy((Data_sum + count - 1)->procData.public_info, Data_stru.public_info, strlen(Data_stru.public_info)); |
| | | memcpy((Data_sum + count - 1)->procData.private_info, Data_stru.private_info, strlen(Data_stru.private_info)); |
| | | |
| | | (Data_sum + count - 1)->stat = 1; |
| | | (Data_sum + count - 1)->list_num = 3; |
| | | |
| | | val = proc_iter->first; |
| | | if ((proc_tcs_iter = procData->find(val)) != procData->end()) { |
| | | TcsSub_ele = proc_tcs_iter->second; |
| | | |
| | | temp = 0; |
| | | pos = 0; |
| | | len = sizeof(Data_sum->reg_info) - 1; |
| | | for (tcssub_iter = TcsSub_ele->begin(); tcssub_iter != TcsSub_ele->end(); ++tcssub_iter) { |
| | | |
| | | if (temp == 0) { |
| | | strncpy((Data_sum + count - 1)->reg_info, (*tcssub_iter).c_str(), strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str())); |
| | | pos += strlen((Data_sum + count - 1)->reg_info); |
| | | len -= strlen((Data_sum + count - 1)->reg_info); |
| | | |
| | | temp++; |
| | | } else { |
| | | |
| | | if (len > 0) { |
| | | strcat((Data_sum + count - 1)->reg_info, ","); |
| | | |
| | | pos += 1; |
| | | len -= 1; |
| | | } |
| | | |
| | | if (len > 0) { |
| | | size = strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str()); |
| | | strncpy(&(Data_sum + count - 1)->reg_info[pos], (*tcssub_iter).c_str(), size); |
| | | |
| | | pos += size; |
| | | len -= size; |
| | | } |
| | | } |
| | | } |
| | | |
| | | pos = 0; |
| | | temp = 0; |
| | | len = sizeof(Data_sum->local_info) - 1; |
| | | for (subs_iter = topic_sub_map->begin(); subs_iter != topic_sub_map->end(); ++subs_iter) { |
| | | subs_proc = subs_iter->second; |
| | | |
| | | if ((subs_proc_iter = subs_proc->find(val)) != subs_proc->end()) { |
| | | |
| | | if ((temp == 0)) { |
| | | |
| | | strncpy((Data_sum + count - 1)->local_info, subs_iter->first.c_str(), strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str())); |
| | | pos += strlen((Data_sum + count - 1)->local_info); |
| | | len -= strlen((Data_sum + count - 1)->local_info); |
| | | |
| | | temp++; |
| | | } else { |
| | | |
| | | if (len > 0) { |
| | | strcat((Data_sum + count - 1)->local_info, ","); |
| | | |
| | | pos += 1; |
| | | len -= 1; |
| | | } |
| | | |
| | | if (len > 0) { |
| | | size = strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str()); |
| | | strncpy(&(Data_sum + count - 1)->local_info[pos], subs_iter->first.c_str(), size); |
| | | |
| | | pos += size; |
| | | len -= size; |
| | | } |
| | | } |
| | | |
| | | } |
| | | } |
| | | |
| | | } |
| | | } |
| | | |
| | | temp = count * sizeof(ProcInfo_sum); |
| | | void *last_buf = malloc(temp + sizeof(int)); |
| | | if (last_buf == NULL) { |
| | | logger->error("in proxy_reg: Out of memory!\n"); |
| | | exit(1); |
| | | } |
| | | |
| | | *(int *)last_buf = count; |
| | | if (count > 0) { |
| | | memcpy((char *)last_buf + sizeof(int), (char *)Data_sum, temp); |
| | | free(Data_sum); |
| | | } |
| | | |
| | | shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG); |
| | | |
| | | free(last_buf); |
| | | } else { |
| | | |
| | | char *ptr = NULL; |
| | | strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); |
| | | |
| | | data1 = atoi(buf_temp); |
| | | ptr = strstr(buf_temp, STR_MAGIC); |
| | | if (ptr != NULL) { |
| | | data2 = atoi(ptr + 1); |
| | | } |
| | | BusServerSocket::buf_data_set(data1, data2); |
| | | } |
| | | } |
| | | |
| | | int BusServerSocket::get_data(int val) { |
| | | |
| | | ProcZone::iterator proc_iter; |
| | | ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); |
| | | |
| | | pthread_mutex_lock(&gMutex); |
| | | if ((proc_iter = proc->find(val)) != proc->end()) { |
| | | pthread_mutex_unlock(&gMutex); |
| | | return true; |
| | | } |
| | | pthread_mutex_unlock(&gMutex); |
| | | |
| | | return false; |
| | | |
| | | } |
| | | |
| | | int BusServerSocket::check_proc(const int val, const void *buf, int len, void **buf_ret, int *len_ret, \ |
| | | const struct timespec *timeout, const int flag) { |
| | | int ret; |
| | | |
| | | ret = shm_sendandrecv(shm_socket, buf, len, val, buf_ret, len_ret, timeout, flag); |
| | | |
| | | return ret; |
| | | } |
| | | |
| | | void BusServerSocket::remove_proc(int val) { |
| | | BusServerSocket::_proxy_reg(NULL, 0, NULL, 0, val, PROC_UNREG); |
| | | } |
| | | |
| | | // 运行代理 |
| | | void * BusServerSocket::_run_proxy_() { |
| | | int BusServerSocket::_run_proxy_() { |
| | | int size; |
| | | int key; |
| | | char * action, *topic, *topics, *buf, *content; |
| | | int flag; |
| | | char buf_temp[MAX_STR_LEN] = { 0x00 }; |
| | | char *action, *topic, *topics, *buf, *content; |
| | | size_t head_len; |
| | | char resp_buf[128]; |
| | | bus_head_t head; |
| | | int val; |
| | | ProcDataZone::iterator proc_que_iter; |
| | | ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET); |
| | | |
| | | const char *topic_delim = ","; |
| | | // logger.debug("_run_proxy_ server receive before\n"); |
| | | while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) { |
| | | // logger.debug("_run_proxy_ server recvfrom %d after: %s \n", key, buf); |
| | | int rv; |
| | | char send_buf[512] = { 0x00 }; |
| | | |
| | | const char *topic_delim = ","; |
| | | while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) { |
| | | head = ShmModSocket::decode_bus_head(buf); |
| | | topics = buf + BUS_HEAD_SIZE; |
| | | action = head.action; |
| | | // logger.debug("_run_proxy_ : %s\n", action); |
| | | if(strcmp(action, "sub") == 0) { |
| | | // 订阅支持多主题订阅 |
| | | topic = strtok(topics, topic_delim); |
| | | // logger.debug("_run_proxy_ topic = %s\n", topic); |
| | | while(topic) { |
| | | _proxy_sub(trim(topic, 0), key); |
| | | topic = strtok(NULL, topic_delim); |
| | |
| | | |
| | | } |
| | | else if(strcmp(action, "desub") == 0) { |
| | | // logger.debug("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), "")); |
| | | if(strcmp(trim(topics, 0), "") == 0) { |
| | | // 取消所有订阅 |
| | | _proxy_desub_all(key); |
| | |
| | | |
| | | } |
| | | else if(strcmp(action, "pub") == 0) { |
| | | content = topics + head.topic_size; |
| | | _proxy_pub(topics, content, head.content_size, key); |
| | | topics[head.topic_size - 1] = '\0'; |
| | | content = topics + head.topic_size; |
| | | |
| | | } |
| | | else if(strcmp(action, "stop") == 0) { |
| | | logger->info( "Stopping Bus..."); |
| | | free(buf); |
| | | break; |
| | | } else { |
| | | logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action); |
| | | } |
| | | free(buf); |
| | | } |
| | | _proxy_pub(topics, topics, head.topic_size + head.content_size, key); |
| | | |
| | | } |
| | | else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \ |
| | | || (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \ |
| | | || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0) \ |
| | | || (strcmp(action, "bufreg") == 0)) { |
| | | content = topics + head.topic_size; |
| | | if (strcmp(action, "reg") == 0) { |
| | | |
| | | flag = PROC_REG; |
| | | |
| | | } else if (strcmp(action, "unreg") == 0) { |
| | | |
| | | flag = PROC_UNREG; |
| | | |
| | | } else if (strcmp(action, "tcsreg") == 0) { |
| | | |
| | | flag = PROC_REG_TCS; |
| | | |
| | | } else if (strcmp(action, "tcsque") == 0) { |
| | | |
| | | flag = PROC_QUE_TCS; |
| | | |
| | | } else if (strcmp(action, "stcsque") == 0) { |
| | | |
| | | flag = PROC_QUE_STCS; |
| | | |
| | | } else if (strcmp(action, "atcsque") == 0) { |
| | | |
| | | flag = PROC_QUE_ATCS; |
| | | |
| | | } else { |
| | | |
| | | flag = PROC_REG_BUF; |
| | | |
| | | } |
| | | |
| | | if (flag == PROC_REG) { |
| | | memcpy(buf_temp, content, strlen(content) + 1); |
| | | |
| | | if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) { |
| | | |
| | | val = proc_que_iter->second; |
| | | _proxy_reg(topics, head.topic_size, content, head.content_size, val, PROC_UNREG); |
| | | } |
| | | } |
| | | |
| | | _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag); |
| | | |
| | | } |
| | | else if(strcmp(action, "stop") == 0) { |
| | | free(buf); |
| | | break; |
| | | } else { |
| | | logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action); |
| | | } |
| | | free(buf); |
| | | } |
| | | |
| | | |
| | | return NULL; |
| | | return rv; |
| | | } |
| | | |
| | | void BusServerSocket::_data_remove(int val) { |
| | | |
| | | int i; |
| | | LockFreeQueue<shm_packet_t> *queue = NULL; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | |
| | | void *data_ptr = hashtable_get(hashtable, val); |
| | | |
| | | if (data_ptr != NULL) { |
| | | if (data_ptr != (void *)1) { |
| | | queue = (LockFreeQueue<shm_packet_t> *)data_ptr; |
| | | queue->close(); |
| | | for (i = 0; i < queue->size(); i++) { |
| | | mm_free((*queue)[i].buf); |
| | | } |
| | | sleep(1); |
| | | } |
| | | |
| | | hashtable_remove(hashtable, val); |
| | | } |
| | | |
| | | } |
| | | |
| | | void BusServerSocket::buf_data_set(int data, int val) { |
| | | recvbuf_val *val_buf; |
| | | recvbuf_data::iterator data_iter; |
| | | recvbuf_val::iterator val_iter; |
| | | |
| | | if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) { |
| | | val_buf = data_iter->second; |
| | | } else { |
| | | void *set_ptr = mm_malloc(sizeof(recvbuf_val)); |
| | | |
| | | val_buf = new(set_ptr) recvbuf_val; |
| | | recvBuf_data.insert({data, val_buf}); |
| | | } |
| | | |
| | | val_buf->insert(val); |
| | | } |
| | | |
| | | void BusServerSocket::buf_data_remove(int data) { |
| | | |
| | | int val; |
| | | recvbuf_val *val_buf; |
| | | recvbuf_data::iterator data_iter; |
| | | recvbuf_val::iterator val_iter; |
| | | |
| | | if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) { |
| | | |
| | | val_buf = data_iter->second; |
| | | for(val_iter = val_buf->begin(); val_iter != val_buf->end(); ++val_iter) { |
| | | val = *val_iter; |
| | | |
| | | BusServerSocket::_data_remove(val); |
| | | } |
| | | |
| | | recvBuf_data.erase(data); |
| | | } |
| | | } |
| | | |
| | | |