#include "bus_server_socket.h" #include "shm_mod_socket.h" #include "shm_socket.h" #include "bus_error.h" static Logger *logger = LoggerFactory::getLogger(); list gLinkedList; void BusServerSocket::foreach_subscripters(std::function cb) { SHMTopicSubMap *topic_sub_map = shm_mm_attach(SHM_BUS_MAP_KEY); SHMKeySet *subscripter_set; SHMKeySet::iterator set_iter; SHMTopicSubMap::iterator map_iter; if(topic_sub_map != NULL) { for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { subscripter_set = map_iter->second; if(subscripter_set != NULL) { for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) { cb(subscripter_set, *set_iter); } } } } } size_t BusServerSocket::remove_subscripters(int keys[], size_t length) { size_t count = 0; int key; for(int i = 0; i < length; i++) { key = keys[i]; SHMTopicSubMap *topic_sub_map = shm_mm_attach(SHM_BUS_MAP_KEY); SHMKeySet *subscripter_set; SHMKeySet::iterator set_iter; SHMTopicSubMap::iterator map_iter; if(topic_sub_map != NULL) { for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { subscripter_set = map_iter->second; if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) { subscripter_set->erase(set_iter); count++; } } } } return count; } BusServerSocket::BusServerSocket() { shm_socket = shm_socket_open(SHM_SOCKET_DGRAM); topic_sub_map = NULL; } BusServerSocket::~BusServerSocket() { destroy(); } int BusServerSocket::bind(int key) { return shm_socket_bind(shm_socket, key); } /** * 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key * @return 0 成功, 其他值 失败的错误码 */ int BusServerSocket::force_bind(int key) { return shm_socket_force_bind(shm_socket, key); } /** * 启动bus * * @return 0 成功, 其他值 失败的错误码 */ int BusServerSocket::start(){ int rv; topic_sub_map = shm_mm_attach(SHM_BUS_MAP_KEY); rv = _run_proxy_(); return rv; } int BusServerSocket::stop(){ int ret; if( shm_socket->key <= 0) { return -1; } bus_head_t head = {}; memcpy(head.action, "stop", sizeof(head.action)); head.topic_size = 0; head.content_size = 0; ShmModSocket client; void *buf; int size = ShmModSocket::get_bus_sendbuf(head, NULL, 0, NULL, 0, &buf); if(size > 0) { ret = client.sendto( buf, size, shm_socket->key); free(buf); return ret; } else { return -1; } } int BusServerSocket::destroy() { SHMKeySet *subscripter_set; SHMTopicSubMap::iterator map_iter; if(topic_sub_map != NULL) { for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { subscripter_set = map_iter->second; if(subscripter_set != NULL) { subscripter_set->clear(); mm_free((void *)subscripter_set); } } topic_sub_map->clear(); shm_mm_free_by_key(SHM_BUS_MAP_KEY); } shm_socket_close(shm_socket); return 0; } /* * 处理订阅 */ void BusServerSocket::_proxy_sub( char *topic, int key) { SHMKeySet *subscripter_set; struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0}; SHMTopicSubMap::iterator map_iter; SHMKeySet::iterator set_iter; if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { subscripter_set = map_iter->second; } else { void *set_ptr = mm_malloc(sizeof(SHMKeySet)); subscripter_set = new(set_ptr) SHMKeySet; topic_sub_map->insert({topic, subscripter_set}); } subscripter_set->insert(key); } /* * 处理取消订阅 */ void BusServerSocket::_proxy_desub( char *topic, int key) { SHMKeySet *subscripter_set; SHMTopicSubMap::iterator map_iter; // SHMKeySet::iterator set_iter; if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { subscripter_set = map_iter->second; subscripter_set->erase(key); } } /* * 处理取消所有订阅 */ void BusServerSocket::_proxy_desub_all(int key) { SHMKeySet *subscripter_set; SHMTopicSubMap::iterator map_iter; // SHMKeySet::iterator set_iter; for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { subscripter_set = map_iter->second; subscripter_set->erase(key); } } /* * 处理发布,代理转发 */ void BusServerSocket::_proxy_pub( char *topic, char *buf, size_t size, int key) { SHMKeySet *subscripter_set; SHMTopicSubMap::iterator map_iter; SHMKeySet::iterator set_iter; std::vector subscripter_to_del; std::vector::iterator vector_iter; int send_key; int rv; struct timespec timeout = {1,0}; if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { subscripter_set = map_iter->second; for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) { send_key = *set_iter; rv = shm_sendto(shm_socket, buf, size, send_key, &timeout, BUS_TIMEOUT_FLAG); if(rv == 0) { continue; } //对方已关闭的或者对应的进程被kill掉的连接放到待删除队列里。如果直接删除会让iter指针出现错乱 subscripter_to_del.push_back(send_key); } // 删除已关闭的端 for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); ++vector_iter) { if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) { subscripter_set->erase(set_iter); logger->debug("remove closed subscripter %d \n", send_key); } } subscripter_to_del.clear(); } } ProcInfo_query *Qurey_object(const char *object, int *length) { int flag = 0; int val; int len; int total = 0; ProcInfo *Proc_ptr = NULL; ProcInfo Data_stru; ProcInfo_query *dataBuf = NULL; SvrProc *SvrSub_ele; SvrTcs::iterator svr_tcs_iter; SvrProc::iterator svr_proc_iter; ProcZone::iterator proc_iter; SvrTcs *SvrData = shm_mm_attach(SHM_BUS_TCS_MAP_KEY); ProcZone *proc = shm_mm_attach(SHM_BUS_PROC_MAP_KEY); if ((svr_tcs_iter = SvrData->find(object)) != SvrData->end()) { SvrSub_ele = svr_tcs_iter->second; for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { val = *svr_proc_iter; if ((proc_iter = proc->find(val)) != proc->end()) { if (dataBuf == NULL) { dataBuf = (ProcInfo_query *)malloc(sizeof(ProcInfo_query)); if (dataBuf == NULL) { logger->error("in proxy_reg: Out of memory!\n"); exit(1); } total = sizeof(ProcInfo_query); } if (flag == 0) { memset(dataBuf, 0x00, sizeof(ProcInfo_query)); dataBuf->num = 1; strncpy(dataBuf->name, object, sizeof(dataBuf->name) - 1); flag = 1; } else { dataBuf->num++; len = sizeof(ProcInfo_query) + sizeof(ProcInfo) * (dataBuf->num - 1); dataBuf = (ProcInfo_query *)realloc(dataBuf, len); if (dataBuf == NULL) { logger->error("in proxy_reg: Out of memory!\n"); exit(1); } total += sizeof(ProcInfo); memset((char *)dataBuf + len - sizeof(ProcInfo), 0x00, sizeof(ProcInfo)); } memset(&Data_stru, 0x00, sizeof(ProcInfo)); Data_stru = proc_iter->second; Proc_ptr = &(dataBuf->procData) + dataBuf->num - 1; strncpy(Proc_ptr->proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id) + 1); strncpy(Proc_ptr->name, Data_stru.name, strlen(Data_stru.name) + 1); strncpy(Proc_ptr->public_info, Data_stru.public_info, strlen(Data_stru.public_info) + 1); strncpy(Proc_ptr->private_info, Data_stru.private_info, strlen(Data_stru.private_info) + 1); if (length != NULL) *length = total; } } } return dataBuf; } void list::Insert(int aData, int bData) { LinkNode *pHead = NULL; LinkNode *pNew = NULL; LinkNode *pCur = NULL; pNew = new(LinkNode); pNew->data = aData; pNew->data_fix = bData; pNew->count = 0; pHead = head; pCur = pHead; if(pHead == NULL) { head = pNew; pNew->next = NULL; } else { while(pCur->next != NULL) { pCur = pCur->next; } pCur->next = pNew; pNew->next = NULL; } } void list::Delete(int data) { LinkNode *pHead; LinkNode *pCur; LinkNode *pNext; pHead = head; pCur = pHead; if(pHead == NULL) return; while((pCur != NULL) && (pCur->data == data)) { head = pCur->next; delete(pCur); pCur = head; } while((pCur != NULL) && (pCur->next != NULL)) { pNext = pCur->next; if(pNext->data == data) { pCur->next = pNext->next; pCur = pNext->next; delete(pNext); } else { pCur = pNext; } } } void list::dataSet(int data, int val) { LinkNode *pCur; pCur = head; if(pCur == NULL) return; while(pCur != NULL) { if(pCur->data == data) { pCur->count = val; } pCur = pCur->next; } } int list::dataGet(int data) { LinkNode *pCur; pCur = head; if(pCur == NULL) return 0; while(pCur != NULL) { if(pCur->data == data) { return pCur->count; } pCur = pCur->next; } return 0; } int list::dataFixGet(int data) { LinkNode *pCur; pCur = head; if(pCur == NULL) return 0; while(pCur != NULL) { if(pCur->data == data) { return pCur->data_fix; } pCur = pCur->next; } return 0; } int list::NodeNum(void) { int count = 0; LinkNode *pCur = head; if (pCur == NULL) { return 0; } while(pCur != NULL) { ++count; pCur = pCur->next; } return count; } int list::nodeGet(int index) { int count = 0; LinkNode *pCur = head; if (pCur == NULL) { return 0; } while((pCur != NULL) && (count <= index)) { if (count == index) { return pCur->data; } ++count; pCur = pCur->next; } return 0; } void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag) { char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 }; int count = 0; int i = 0; int len = 0; int data1, data2; char *data_ptr; ProcInfo Data_stru; ProcZone::iterator proc_iter; TcsZone *TcsSub_ele; ProcDataZone::iterator proc_que_iter; ProcTcsMap::iterator proc_tcs_iter; SvrProc *SvrSub_ele; SvrProc::iterator svr_proc_iter; SvrTcs::iterator svr_tcs_iter; TcsZone::iterator tcssub_iter; ProcPartZone::iterator proc_part_iter; struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0}; if ((flag == PROC_REG) || (flag == PROC_UNREG)) { memset(&Data_stru, 0x00, sizeof(ProcInfo)); if (buf != NULL) { memcpy(Data_stru.proc_id, buf, strlen(buf) + 1); count = strlen(buf) + 1; memcpy(Data_stru.name, buf + count, strlen(buf + count) + 1); count += strlen(buf + count) + 1; memcpy(Data_stru.public_info, buf + count, strlen(buf + count) + 1); count += strlen(buf + count) + 1; memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1); count += strlen(buf + count) + 1; memcpy(Data_stru.int_info, buf + count, strlen(buf + count) + 1); count += strlen(buf + count) + 1; memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1); count += strlen(buf + count) + 1; if (flag == PROC_REG) { gLinkedList.Insert(key, atoi(Data_stru.int_info)); } } ProcZone *proc = shm_mm_attach(SHM_BUS_PROC_MAP_KEY); ProcDataZone *procQuePart = shm_mm_attach(SHM_QUEUE_ST_SET); ProcPartZone *procPart = shm_mm_attach(SHM_BUS_PROC_PART_MAP_KEY); if (flag == PROC_REG) { if ((proc_iter = proc->find(key)) == proc->end()) { proc->insert({key, Data_stru}); } if ((proc_part_iter = procPart->find(key)) == procPart->end()) { procPart->insert({key, Data_stru.proc_id}); } if ((proc_que_iter = procQuePart->find(Data_stru.proc_id)) == procQuePart->end()) { procQuePart->insert({Data_stru.proc_id, key}); } } else { SvrTcs *SvrData = shm_mm_attach(SHM_BUS_TCS_MAP_KEY); for (svr_tcs_iter = SvrData->begin(); svr_tcs_iter != SvrData->end(); ++svr_tcs_iter) { SvrSub_ele = svr_tcs_iter->second; SvrSub_ele->erase(key); } if ((proc_iter = proc->find(key)) != proc->end()) { data1 = atoi((proc_iter->second).int_info); data2 = atoi((proc_iter->second).svr_info); BusServerSocket::_data_remove(data1, data2); len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1); strncpy(buf_temp, (proc_iter->second).proc_id, len); proc->erase(proc_iter); } if ((proc_part_iter = procPart->find(key)) != procPart->end()) { procPart->erase(key); } if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) { procQuePart->erase(buf_temp); } } } else if (flag == PROC_REG_TCS) { ProcTcsMap *proc = shm_mm_attach(SHM_BUS_PROC_TCS_MAP_KEY); SvrTcs *SvrData = shm_mm_attach(SHM_BUS_TCS_MAP_KEY); if ((proc_tcs_iter = proc->find(key)) != proc->end()) { TcsSub_ele = proc_tcs_iter->second; } else { void *ptr_set = mm_malloc(sizeof(TcsZone)); TcsSub_ele = new(ptr_set) TcsZone; proc->insert({key, TcsSub_ele}); } strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); data_ptr = strtok(const_cast(buf_temp), STR_MAGIC); while(data_ptr) { TcsSub_ele->insert(data_ptr); if ((svr_tcs_iter = SvrData->find(data_ptr)) != SvrData->end()) { SvrSub_ele = svr_tcs_iter->second; } else { void *ptr_set = mm_malloc(sizeof(SvrProc)); SvrSub_ele = new(ptr_set) SvrProc; SvrData->insert({data_ptr, SvrSub_ele}); } SvrSub_ele->insert(key); data_ptr = strtok(NULL, STR_MAGIC); } } else if (flag == PROC_QUE_TCS) { struct _temp_store { void *ptr; int total; } *temp_store = NULL; int num = 0; int sum = 0; ProcInfo_query *ret = NULL; ProcInfo_query *ret_store = NULL; strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); data_ptr = strtok(const_cast(buf_temp), STR_MAGIC); while(data_ptr) { ret = Qurey_object(data_ptr, &len); if (ret != NULL) { if (temp_store == NULL) { temp_store = (_temp_store *)malloc(sizeof(_temp_store)); if (temp_store == NULL) { logger->error("in proxy_reg: Out of memory!\n"); exit(1); } temp_store->ptr = ret; temp_store->total = len; num = 1; } else { num++; temp_store = (_temp_store *)realloc(temp_store, sizeof(_temp_store) * num); if (temp_store == NULL) { logger->error("in proxy_reg: Out of memory!\n"); exit(1); } (temp_store + num - 1)->ptr = ret; (temp_store + num - 1)->total = len; } } data_ptr = strtok(NULL, STR_MAGIC); } if (num > 0) { for (count = 0; count < num; count++) { if (ret_store == NULL) { ret_store = (ProcInfo_query *)malloc((temp_store + count)->total); if (ret_store == NULL) { logger->error("in proxy_reg: Out of memory!\n"); exit(1); } sum = (temp_store + count)->total; memcpy(ret_store, (temp_store + count)->ptr, (temp_store +count)->total); } else { ret_store = (ProcInfo_query *)realloc(ret_store, sum + (temp_store + count)->total); if (ret_store == NULL) { logger->error("in proxy_reg: Out of memory!\n"); exit(1); } memcpy((char *)ret_store + sum, (temp_store + count)->ptr, (temp_store + count)->total); sum += (temp_store + count)->total; } free((temp_store + count)->ptr); } free(temp_store); } void *last_buf = malloc(sum + sizeof(int)); if (last_buf == NULL) { logger->error("in proxy_reg: Out of memory!\n"); exit(1); } *(int *)last_buf = num; if (num > 0) { memcpy((char *)last_buf + sizeof(int), (char *)ret_store, sum); free(ret_store); } shm_sendto(shm_socket, last_buf, sum + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG); free(last_buf); } else if (flag == PROC_QUE_STCS) { SvrTcs *SvrData = shm_mm_attach(SHM_BUS_TCS_MAP_KEY); ProcZone *proc = shm_mm_attach(SHM_BUS_PROC_MAP_KEY); strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) { SvrSub_ele = svr_tcs_iter->second; for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { count = *svr_proc_iter; if ((proc_iter = proc->find(count)) != proc->end()) { count = atoi((proc_iter->second).svr_info); } break; } } else { count = 0; } memset(buf_temp, 0x00, sizeof(buf_temp)); sprintf(buf_temp, "%d", count); shm_sendto(shm_socket, buf_temp, strlen(buf_temp), key, &timeout, BUS_TIMEOUT_FLAG); } else { int val; int temp = 0; int pos = 0; int size = 0; ProcInfo_sum *Data_sum = NULL; SHMKeySet *subs_proc; SHMKeySet::iterator subs_proc_iter; SHMTopicSubMap::iterator subs_iter; ProcTcsMap *procData = shm_mm_attach(SHM_BUS_PROC_TCS_MAP_KEY); ProcZone *proc = shm_mm_attach(SHM_BUS_PROC_MAP_KEY); for (proc_iter = proc->begin(); proc_iter != proc->end(); ++proc_iter) { memset(&Data_stru, 0x00, sizeof(Data_stru)); if (count == 0) { Data_sum = (ProcInfo_sum *)malloc(sizeof(ProcInfo_sum)); if (Data_sum == NULL) { logger->error("in proxy_reg: Out of memory!\n"); exit(1); } count++; memset(Data_sum, 0x00, sizeof(ProcInfo_sum)); } else { count++; len = sizeof(ProcInfo_sum) * count; Data_sum = (ProcInfo_sum *)realloc(Data_sum, len); if (Data_sum == NULL) { logger->error("in proxy_reg: Out of memory!\n"); exit(1); } memset(Data_sum + count - 1, 0x00, sizeof(ProcInfo_sum)); } Data_stru = proc_iter->second; memcpy((Data_sum + count - 1)->procData.proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id)); memcpy((Data_sum + count - 1)->procData.name, Data_stru.name, strlen(Data_stru.name)); memcpy((Data_sum + count - 1)->procData.public_info, Data_stru.public_info, strlen(Data_stru.public_info)); memcpy((Data_sum + count - 1)->procData.private_info, Data_stru.private_info, strlen(Data_stru.private_info)); (Data_sum + count - 1)->stat = 1; (Data_sum + count - 1)->list_num = 3; val = proc_iter->first; if ((proc_tcs_iter = procData->find(val)) != procData->end()) { TcsSub_ele = proc_tcs_iter->second; temp = 0; pos = 0; len = sizeof(Data_sum->reg_info) - 1; for (tcssub_iter = TcsSub_ele->begin(); tcssub_iter != TcsSub_ele->end(); ++tcssub_iter) { if (temp == 0) { strncpy((Data_sum + count - 1)->reg_info, (*tcssub_iter).c_str(), strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str())); pos += strlen((Data_sum + count - 1)->reg_info); len -= strlen((Data_sum + count - 1)->reg_info); temp++; } else { if (len > 0) { strcat((Data_sum + count - 1)->reg_info, ","); pos += 1; len -= 1; } if (len > 0) { size = strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str()); strncpy(&(Data_sum + count - 1)->reg_info[pos], (*tcssub_iter).c_str(), size); pos += size; len -= size; } } } pos = 0; temp = 0; len = sizeof(Data_sum->local_info) - 1; for (subs_iter = topic_sub_map->begin(); subs_iter != topic_sub_map->end(); ++subs_iter) { subs_proc = subs_iter->second; if ((subs_proc_iter = subs_proc->find(val)) != subs_proc->end()) { if ((temp == 0)) { strncpy((Data_sum + count - 1)->local_info, subs_iter->first.c_str(), strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str())); pos += strlen((Data_sum + count - 1)->local_info); len -= strlen((Data_sum + count - 1)->local_info); temp++; } else { if (len > 0) { strcat((Data_sum + count - 1)->local_info, ","); pos += 1; len -= 1; } if (len > 0) { size = strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str()); strncpy(&(Data_sum + count - 1)->local_info[pos], subs_iter->first.c_str(), size); pos += size; len -= size; } } } } } } temp = count * sizeof(ProcInfo_sum); void *last_buf = malloc(temp + sizeof(int)); if (last_buf == NULL) { logger->error("in proxy_reg: Out of memory!\n"); exit(1); } *(int *)last_buf = count; if (count > 0) { memcpy((char *)last_buf + sizeof(int), (char *)Data_sum, temp); free(Data_sum); } shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG); free(last_buf); } } int BusServerSocket::get_data(int val) { ProcZone::iterator proc_iter; ProcZone *proc = shm_mm_attach(SHM_BUS_PROC_MAP_KEY); if ((proc_iter = proc->find(val)) != proc->end()) { return true; } return false; } int BusServerSocket::check_proc(const int val, const void *buf, int len, void **buf_ret, int *len_ret, \ const struct timespec *timeout, const int flag) { int ret; ret = shm_sendandrecv(shm_socket, buf, len, val, buf_ret, len_ret, timeout, flag); return ret; } void BusServerSocket::remove_proc(int val) { BusServerSocket::_proxy_reg(NULL, 0, NULL, 0, val, PROC_UNREG); } // 运行代理 int BusServerSocket::_run_proxy_() { int size; int key; int flag; char buf_temp[MAX_STR_LEN] = { 0x00 }; char * action, *topic, *topics, *buf, *content; size_t head_len; bus_head_t head; int val; ProcDataZone::iterator proc_que_iter; ProcDataZone *procQuePart = shm_mm_attach(SHM_QUEUE_ST_SET); int rv; char send_buf[512] = { 0x00 }; const char *topic_delim = ","; while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) { head = ShmModSocket::decode_bus_head(buf); topics = buf + BUS_HEAD_SIZE; action = head.action; if(strcmp(action, "sub") == 0) { // 订阅支持多主题订阅 topic = strtok(topics, topic_delim); while(topic) { _proxy_sub(trim(topic, 0), key); topic = strtok(NULL, topic_delim); } } else if(strcmp(action, "desub") == 0) { if(strcmp(trim(topics, 0), "") == 0) { // 取消所有订阅 _proxy_desub_all(key); } else { topic = strtok(topics, topic_delim); while(topic) { _proxy_desub(trim(topic, 0), key); topic = strtok(NULL, topic_delim); } } } else if(strcmp(action, "pub") == 0) { topics[head.topic_size - 1] = '\0'; content = topics + head.topic_size; _proxy_pub(topics, topics, head.topic_size + head.content_size, key); } else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \ || (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \ || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0)) { content = topics + head.topic_size; if (strcmp(action, "reg") == 0) { flag = PROC_REG; } else if (strcmp(action, "unreg") == 0) { flag = PROC_UNREG; } else if (strcmp(action, "tcsreg") == 0) { flag = PROC_REG_TCS; } else if (strcmp(action, "tcsque") == 0) { flag = PROC_QUE_TCS; } else if (strcmp(action, "stcsque") == 0) { flag = PROC_QUE_STCS; } else { flag = PROC_QUE_ATCS; } if (flag == PROC_REG) { memcpy(buf_temp, content, strlen(content) + 1); if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) { val = proc_que_iter->second; _proxy_reg(topics, head.topic_size, content, head.content_size, val, PROC_UNREG); } } _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag); } else if (strncmp(buf, "request", strlen("request")) == 0) { sprintf(send_buf, "%4d", key); strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4)); rv = shm_sendto(shm_socket, send_buf, strlen(send_buf) + 1, key); if(rv != 0) { logger->error( "BusServerSocket::_run_proxy_ : requst answer fail!\n"); } } else if(strcmp(action, "stop") == 0) { free(buf); break; } else { logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action); } free(buf); } return rv; } void BusServerSocket::_data_remove(int val1, int val2) { int i; LockFreeQueue *queue = NULL; hashtable_t *hashtable = mm_get_hashtable(); void *data_ptr1 = hashtable_get(hashtable, val1); void *data_ptr2 = hashtable_get(hashtable, val2); if (data_ptr1 != NULL) { if (data_ptr1 != (void *)1) { queue = (LockFreeQueue *)data_ptr1; queue->close(); for (i = 0; i < queue->size(); i++) { mm_free((*queue)[i].buf); } sleep(1); } hashtable_remove(hashtable, val1); } if (data_ptr2 != NULL) { if (data_ptr2 != (void *)1) { queue = (LockFreeQueue *)data_ptr2; queue->close(); for (i = 0; i < queue->size(); i++) { mm_free((*queue)[i].buf); } sleep(1); } hashtable_remove(hashtable, val2); } }