wangzhengquan
2020-12-21 b029ff78a59bfe8af4e66f844644a776f7678eef
src/socket/bus_server_socket.c
@@ -65,33 +65,24 @@
}
BusServerSocket::~BusServerSocket() {
// printf("BusServerSocket  destory 1\n");
   SHMKeySet *subscripter_set;
   SHMTopicSubMap::iterator map_iter;
   stop();
   sleep(2);
// printf("BusServerSocket  destory 2\n");
   if(topic_sub_map != NULL) {
      for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
         subscripter_set = map_iter->second;
// printf("BusServerSocket  destory 2-1\n");
         if(subscripter_set != NULL) {
// printf("BusServerSocket  destory 2-2\n");
            subscripter_set->clear();
// printf("BusServerSocket  destory 2-3\n");
            mm_free((void *)subscripter_set);
// printf("BusServerSocket  destory 2-4\n");
         }
      }
      topic_sub_map->clear();
      mem_pool_free_by_key(BUS_MAP_KEY);
   }
// printf("BusServerSocket  destory 3\n");
   // printf("=============close socket\n");
   shm_close_socket(shm_socket);
// printf("BusServerSocket  destory 4\n");
}
@@ -107,6 +98,7 @@
int BusServerSocket::force_bind(int key) {
   return shm_socket_force_bind(shm_socket, key);
}
/**
 * 启动bus
 * 
@@ -116,20 +108,39 @@
   topic_sub_map =   mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
 
   run_pubsub_proxy();
   // pthread_t tid;
   // pthread_create(&tid, NULL, run_accept_sub_request, _socket);
   // 进程停止的时候,预留3秒资源回收的时间。否则,会发生调用close的时候,共享内存的资源还没来得及回收进程就退出了
   sleep(3);
   return 0;
}
int  BusServerSocket::stop(){
   char buf[128];
   int ret;
    
   if( shm_socket->key <= 0) {
      return -1;
   }
   snprintf(buf, 128, "%sstop%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
   return shm_sendto(shm_socket, buf, strlen(buf), shm_socket->key, NULL, 0);
   // snprintf(buf, 128, "%sstop%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
   // return shm_sendto(shm_socket, buf, strlen(buf), shm_socket->key, NULL, 0);
   bus_head_t head = {};
   memcpy(head.action, "stop", sizeof(head.action));
   head.topic_size = 0;
   head.content_size = 0;
   void *recv_buf;
   int recv_size;
   void *buf;
   int size = ShmModSocket::get_bus_sendbuf(head, NULL, 0, NULL,  0, &buf);
   if(size > 0) {
      ret = shm_sendandrecv(shm_socket, buf, size, shm_socket->key, &recv_buf, &recv_size);
      free(buf);
      free(recv_buf);
      return ret;
   } else {
      return -1;
   }
}
/*
@@ -239,8 +250,7 @@
      head = ShmModSocket::decode_bus_head(buf);
      topics = buf + BUS_HEAD_SIZE;
      action = head.action;
      // if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
  printf("run_pubsub_proxy : %s, %s \n", action, topics);
  // printf("run_pubsub_proxy : %s, %s \n", action, topics);
      if(strcmp(action, "sub") == 0) {
         // 订阅支持多主题订阅
         topic = strtok(topics, topic_delim);
@@ -268,13 +278,12 @@
          content = topics + head.topic_size;
         _proxy_pub(topics, content, head.content_size, key);
      }  else if(strcmp(action, "stop") == 0) {
          logger->info( "Stopping Bus...");
         logger->info( "Stopping Bus...");
          // snprintf(resp_buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
          // shm_sendto(shm_socket, resp_buf, strlen(resp_buf), key);
          // free(action);
          // free(topics);
          free(buf);
          break;
         shm_sendto(shm_socket, "stop_finished", strlen( "stop_finished") +1, key);
         free(buf);
         break;
      } else {
         logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action");
      }
@@ -292,6 +301,7 @@
/**
 * deprecate
 * @str "<**sub**>{经济}"
 */