| | |
| | | } |
| | | |
| | | /** |
| | |  * Tears the bus server down. |
| | |  * |
| | |  * Order of operations: call stop(), sleep for two seconds, clear and release |
| | |  * every non-null subscriber set referenced by the topic map, clear the map and |
| | |  * release the pool entry registered under BUS_MAP_KEY, then close the socket. |
| | |  */ |
| | | BusServerSocket::~BusServerSocket() { |
| | |     stop(); |
| | |     // NOTE(review): presumably gives the proxy loop time to react to the stop |
| | |     // request before its data is released - confirm. |
| | |     sleep(2); |
| | | |
| | |     if (topic_sub_map != NULL) { |
| | |         SHMTopicSubMap::iterator entry = topic_sub_map->begin(); |
| | |         while (entry != topic_sub_map->end()) { |
| | |             SHMKeySet *subscribers = entry->second; |
| | |             if (subscribers != NULL) { |
| | |                 // Empty the set first, then hand its storage back via mm_free. |
| | |                 subscribers->clear(); |
| | |                 mm_free((void *)subscribers); |
| | |             } |
| | |             ++entry; |
| | |         } |
| | | |
| | |         topic_sub_map->clear(); |
| | |         mem_pool_free_by_key(BUS_MAP_KEY); |
| | |     } |
| | | |
| | |     shm_close_socket(shm_socket); |
| | | } |
| | | |
| | | |
| | |
| | | /** |
| | |  * Binds the underlying socket to the given key through shm_socket_force_bind. |
| | |  * |
| | |  * @param key key handed to shm_socket_force_bind together with shm_socket |
| | |  * @return whatever shm_socket_force_bind returns |
| | |  */ |
| | | int BusServerSocket::force_bind(int key) { |
| | |     const int bind_result = shm_socket_force_bind(shm_socket, key); |
| | |     return bind_result; |
| | | } |
| | | |
| | | /** |
| | | * 启动bus |
| | | * |
| | |
| | | topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY); |
| | | |
| | | run_pubsub_proxy(); |
| | | // pthread_t tid; |
| | | // pthread_create(&tid, NULL, run_accept_sub_request, _socket); |
| | | // 进程停止的时候,预留3秒资源回收的时间。否则,会发生调用close的时候,共享内存的资源还没来得及回收进程就退出了 |
| | | sleep(3); |
| | | return 0; |
| | | } |
| | | |
| | | |
| | | /** |
| | |  * Asks the running bus proxy to stop. |
| | |  * |
| | |  * Builds a bus message whose head carries the action "stop" with an empty |
| | |  * topic and empty content, sends it to this socket's own key and waits for |
| | |  * the reply. |
| | |  * |
| | |  * @return -1 when the socket key is not positive or the send buffer could not |
| | |  *         be built; otherwise the return value of shm_sendandrecv. |
| | |  */ |
| | | int BusServerSocket::stop() { |
| | |     if (shm_socket->key <= 0) { |
| | |         return -1; |
| | |     } |
| | | |
| | |     static const char stop_action[] = "stop"; |
| | | |
| | |     bus_head_t head = {}; |
| | |     // Copy no more than the literal itself (terminator included): using |
| | |     // sizeof(head.action) alone would read past the end of "stop" whenever |
| | |     // the action field is larger. The rest of the field stays zeroed by the |
| | |     // value-initialization above. |
| | |     const size_t action_len = sizeof(stop_action) < sizeof(head.action) |
| | |                                   ? sizeof(stop_action) |
| | |                                   : sizeof(head.action); |
| | |     memcpy(head.action, stop_action, action_len); |
| | |     head.topic_size = 0; |
| | |     head.content_size = 0; |
| | | |
| | |     void *send_buf = NULL; |
| | |     const int send_size = ShmModSocket::get_bus_sendbuf(head, NULL, 0, NULL, 0, &send_buf); |
| | |     if (send_size <= 0) { |
| | |         return -1; |
| | |     } |
| | | |
| | |     // Start from NULL so the free() below is a no-op if no reply buffer is |
| | |     // handed back. |
| | |     void *recv_buf = NULL; |
| | |     int recv_size = 0; |
| | |     const int ret = shm_sendandrecv(shm_socket, send_buf, send_size, shm_socket->key, &recv_buf, &recv_size); |
| | | |
| | |     free(send_buf); |
| | |     free(recv_buf); |
| | |     return ret; |
| | | } |
| | | |
| | | /* |
| | |
| | | head = ShmModSocket::decode_bus_head(buf); |
| | | topics = buf + BUS_HEAD_SIZE; |
| | | action = head.action; |
| | | // if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) { |
| | | printf("run_pubsub_proxy : %s, %s \n", action, topics); |
| | | // printf("run_pubsub_proxy : %s, %s \n", action, topics); |
| | | if(strcmp(action, "sub") == 0) { |
| | | // 订阅支持多主题订阅 |
| | | topic = strtok(topics, topic_delim); |
| | |
| | | content = topics + head.topic_size; |
| | | _proxy_pub(topics, content, head.content_size, key); |
| | | } else if(strcmp(action, "stop") == 0) { |
| | | logger->info( "Stopping Bus..."); |
| | | logger->info( "Stopping Bus..."); |
| | | // snprintf(resp_buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER); |
| | | // shm_sendto(shm_socket, resp_buf, strlen(resp_buf), key); |
| | | // free(action); |
| | | // free(topics); |
| | | free(buf); |
| | | break; |
| | | shm_sendto(shm_socket, "stop_finished", strlen( "stop_finished") +1, key); |
| | | |
| | | free(buf); |
| | | break; |
| | | } else { |
| | | logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action"); |
| | | } |
| | |
| | | |
| | | |
| | | /** |
| | | * Deprecated. |
| | | * @str "<**sub**>{经济}" |
| | | */ |
| | | |