wangzhengquan
2020-12-21 b029ff78a59bfe8af4e66f844644a776f7678eef
bus 加停止功能
5个文件已修改
61 ■■■■■ 已修改文件
src/socket/bus_server_socket.c 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_server_socket.c 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.c 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.h 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_net_mod_socket.c 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.c
@@ -65,33 +65,24 @@
}
BusServerSocket::~BusServerSocket() {
// printf("BusServerSocket  destory 1\n");
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
    stop();
    sleep(2);
// printf("BusServerSocket  destory 2\n");
    if(topic_sub_map != NULL) {
        for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
            subscripter_set = map_iter->second;
// printf("BusServerSocket  destory 2-1\n");
            if(subscripter_set != NULL) {
// printf("BusServerSocket  destory 2-2\n");
                subscripter_set->clear();
// printf("BusServerSocket  destory 2-3\n");
                mm_free((void *)subscripter_set);
// printf("BusServerSocket  destory 2-4\n");
            }
        }
        topic_sub_map->clear();
        mem_pool_free_by_key(BUS_MAP_KEY);
    }
// printf("BusServerSocket  destory 3\n");
    // printf("=============close socket\n");
    shm_close_socket(shm_socket);
// printf("BusServerSocket  destory 4\n");
}
@@ -107,6 +98,7 @@
int BusServerSocket::force_bind(int key) {
    return shm_socket_force_bind(shm_socket, key);
}
/**
 * 启动bus
 * 
@@ -116,20 +108,39 @@
    topic_sub_map =    mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
 
    run_pubsub_proxy();
    // pthread_t tid;
    // pthread_create(&tid, NULL, run_accept_sub_request, _socket);
    // 进程停止的时候,预留3秒资源回收的时间。否则,会发生调用close的时候,共享内存的资源还没来得及回收进程就退出了
    sleep(3);
    return 0;
}
int  BusServerSocket::stop(){
    char buf[128];
    int ret;
     
    if( shm_socket->key <= 0) {
        return -1;
    }
    snprintf(buf, 128, "%sstop%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
    return shm_sendto(shm_socket, buf, strlen(buf), shm_socket->key, NULL, 0);
    // snprintf(buf, 128, "%sstop%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
    // return shm_sendto(shm_socket, buf, strlen(buf), shm_socket->key, NULL, 0);
    bus_head_t head = {};
    memcpy(head.action, "stop", sizeof(head.action));
    head.topic_size = 0;
    head.content_size = 0;
    void *recv_buf;
    int recv_size;
    void *buf;
    int size = ShmModSocket::get_bus_sendbuf(head, NULL, 0, NULL,  0, &buf);
    if(size > 0) {
        ret = shm_sendandrecv(shm_socket, buf, size, shm_socket->key, &recv_buf, &recv_size);
        free(buf);
        free(recv_buf);
        return ret;
    } else {
        return -1;
    }
}
/*
@@ -239,8 +250,7 @@
        head = ShmModSocket::decode_bus_head(buf);
        topics = buf + BUS_HEAD_SIZE;
        action = head.action;
        // if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
  printf("run_pubsub_proxy : %s, %s \n", action, topics);
  // printf("run_pubsub_proxy : %s, %s \n", action, topics);
        if(strcmp(action, "sub") == 0) {
            // 订阅支持多主题订阅
            topic = strtok(topics, topic_delim);
@@ -270,9 +280,8 @@
        }  else if(strcmp(action, "stop") == 0) {
             logger->info( "Stopping Bus...");
             // snprintf(resp_buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
             // shm_sendto(shm_socket, resp_buf, strlen(resp_buf), key);
             // free(action);
             // free(topics);
            shm_sendto(shm_socket, "stop_finished", strlen( "stop_finished") +1, key);
             free(buf);
             break;
        } else {
@@ -292,6 +301,7 @@
/**
 * deprecate
 * @str "<**sub**>{经济}"
 */
src/socket/net_mod_server_socket.c
@@ -244,7 +244,6 @@
    else if(request_head.timeout == -1) {
      ret = shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, BUS_KEY);
    }
printf("bus server pub ret=%d\n", ret);
    response_head.code = ret;
    response_head.content_length = 0;
    if( rio_writen(connfd, NetModSocket::encode_response_head(response_head), NET_MODE_RESPONSE_HEAD_LENGTH) != NET_MODE_RESPONSE_HEAD_LENGTH )
src/socket/net_mod_socket.c
@@ -287,6 +287,12 @@
    }
  }
  // 本地发送
  if(node_arr == NULL || arrlen == 0) {
    if(shmModSocket.pub(topic, topic_size, content, content_size, node->key) == 0 ) {
      n_pub_suc++;
    }
  }
  
  for (i = 0; i < arrlen; i++) {
src/socket/shm_mod_socket.h
@@ -12,6 +12,7 @@
#include "socket_def.h"
#define BUS_HEAD_SIZE (64 + 2 * sizeof(uint32_t))
class BusServerSocket;
struct bus_head_t
{
@@ -22,6 +23,7 @@
class ShmModSocket {
friend class BusServerSocket;
private:
    shm_socket_t *shm_socket;
  socket_mod_t mod;
test_net_socket/test_net_mod_socket.c
@@ -83,7 +83,7 @@
  void * server_socket = bus_server_socket_wrapper_open();
  pthread_t tid;
  // 创建一个线程,可以关闭bus
  pthread_create(&tid, NULL, bus_handler, server_socket);
  // pthread_create(&tid, NULL, bus_handler, server_socket);
  if(bus_server_socket_wrapper_start_bus(server_socket) != 0) {
    printf("start bus failed\n");
    exit(1);