wangzhengquan
2020-12-22 26ed48c4e616014ee760fd13d13dbdc8539c34e3
src/socket/bus_server_socket.c
@@ -1,5 +1,6 @@
#include "bus_server_socket.h"
#include "shm_mod_socket.h"
static Logger *logger = LoggerFactory::getLogger();
@@ -64,33 +65,24 @@
}
BusServerSocket::~BusServerSocket() {
// printf("BusServerSocket  destory 1\n");
   SHMKeySet *subscripter_set;
   SHMTopicSubMap::iterator map_iter;
   stop();
   sleep(2);
// printf("BusServerSocket  destory 2\n");
   if(topic_sub_map != NULL) {
      for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
         subscripter_set = map_iter->second;
// printf("BusServerSocket  destory 2-1\n");
         if(subscripter_set != NULL) {
// printf("BusServerSocket  destory 2-2\n");
            subscripter_set->clear();
// printf("BusServerSocket  destory 2-3\n");
            mm_free((void *)subscripter_set);
// printf("BusServerSocket  destory 2-4\n");
         }
      }
      topic_sub_map->clear();
      mem_pool_free_by_key(BUS_MAP_KEY);
   }
// printf("BusServerSocket  destory 3\n");
   // printf("=============close socket\n");
   shm_close_socket(shm_socket);
// printf("BusServerSocket  destory 4\n");
}
@@ -106,6 +98,7 @@
int BusServerSocket::force_bind(int key) {
   return shm_socket_force_bind(shm_socket, key);
}
/**
 * 启动bus
 * 
@@ -115,20 +108,39 @@
   topic_sub_map =   mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
 
   run_pubsub_proxy();
   // pthread_t tid;
   // pthread_create(&tid, NULL, run_accept_sub_request, _socket);
   // 进程停止的时候,预留3秒资源回收的时间。否则,会发生调用close的时候,共享内存的资源还没来得及回收进程就退出了
   sleep(3);
   return 0;
}
int  BusServerSocket::stop(){
   char buf[128];
   int ret;
    
   if( shm_socket->key <= 0) {
      return -1;
   }
   snprintf(buf, 128, "%sstop%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
   return shm_sendto(shm_socket, buf, strlen(buf), shm_socket->key, NULL, 0);
   // snprintf(buf, 128, "%sstop%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
   // return shm_sendto(shm_socket, buf, strlen(buf), shm_socket->key, NULL, 0);
   bus_head_t head = {};
   memcpy(head.action, "stop", sizeof(head.action));
   head.topic_size = 0;
   head.content_size = 0;
   void *recv_buf;
   int recv_size;
   void *buf;
   int size = ShmModSocket::get_bus_sendbuf(head, NULL, 0, NULL,  0, &buf);
   if(size > 0) {
      ret = shm_sendandrecv(shm_socket, buf, size, shm_socket->key, &recv_buf, &recv_size);
      free(buf);
      free(recv_buf);
      return ret;
   } else {
      return -1;
   }
}
/*
@@ -183,7 +195,7 @@
/*
 * 处理发布,代理转发
*/
void BusServerSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key) {
void BusServerSocket::_proxy_pub( char *topic, void *buf, size_t size, int key) {
   SHMKeySet *subscripter_set;
   SHMTopicSubMap::iterator map_iter;
@@ -200,7 +212,7 @@
      for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
         send_key = *set_iter;
 // printf("_proxy_pub send before %d \n", send_key);
         if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) {
         if (shm_sendto(shm_socket, buf, size, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) {
            //对方已关闭的连接放到待删除队列里。如果直接删除会让iter指针出现错乱
            subscripter_to_del.push_back(send_key);
         } else {
@@ -226,57 +238,61 @@
   // pthread_detach(pthread_self());
   int size;
   int key;
   char * action, *topic, *topics, *buf;
   char * action, *topic, *topics, *buf, *content;
   size_t head_len;
   char resp_buf[128];
   bus_head_t head;
   const char *topic_delim = ",";
// printf("run_pubsub_proxy server receive before\n");
   while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) {
//printf("run_pubsub_proxy server recv after: %s \n", buf);
      if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
 // printf("run_pubsub_proxy  %s %s \n", action, topics);
         if(strcmp(action, "sub") == 0) {
            // 订阅支持多主题订阅
            topic = strtok(topics, topic_delim);
      head = ShmModSocket::decode_bus_head(buf);
      topics = buf + BUS_HEAD_SIZE;
      action = head.action;
  // printf("run_pubsub_proxy : %s, %s \n", action, topics);
      if(strcmp(action, "sub") == 0) {
         // 订阅支持多主题订阅
         topic = strtok(topics, topic_delim);
//printf("run_pubsub_proxy topic = %s\n", topic);
        while(topic) {
       _proxy_sub(trim(topic, 0), key);
        topic =  strtok(NULL, topic_delim);
        }
      } else if(strcmp(action, "desub") == 0) {
// printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
         if(strcmp(trim(topics, 0), "") == 0) {
            // 取消所有订阅
            _proxy_desub_all(key);
         } else {
            topic = strtok(topics, topic_delim);
           while(topic) {
          _proxy_sub(trim(topic, 0), key);
          _proxy_desub(trim(topic, 0), key);
           topic =  strtok(NULL, topic_delim);
           }
         } else if(strcmp(action, "desub") == 0) {
// printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
            if(strcmp(trim(topics, 0), "") == 0) {
               // 取消所有订阅
               _proxy_desub_all(key);
            } else {
               topic = strtok(topics, topic_delim);
              while(topic) {
             _proxy_desub(trim(topic, 0), key);
              topic =  strtok(NULL, topic_delim);
              }
            }
         } else if(strcmp(action, "pub") == 0) {
            _proxy_pub(topics, head_len, buf, size, key);
         }  else if(strcmp(action, "stop") == 0) {
             logger->info( "Stopping Bus...");
             // snprintf(buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
             // shm_sendto(shm_socket, const void *buf, const int size, key);
             free(action);
             free(topics);
             free(buf);
             break;
         } else {
            logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action");
         }
         
         free(action);
         free(topics);
      } else if(strcmp(action, "pub") == 0) {
          content = topics + head.topic_size;
         _proxy_pub(topics, content, head.content_size, key);
      }  else if(strcmp(action, "stop") == 0) {
         logger->info( "Stopping Bus...");
          // snprintf(resp_buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
         shm_sendto(shm_socket, "stop_finished", strlen( "stop_finished") +1, key);
         free(buf);
         break;
      } else {
         logger->error( "BusServerSocket::run_pubsub_proxy : incorrect format msg");
         logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action %s", action);
      }
      // free(action);
      // free(topics);
      // } else {
      //    logger->error( "BusServerSocket::run_pubsub_proxy : incorrect format msg");
      // }
      free(buf);
   }
   return NULL;
@@ -285,6 +301,7 @@
/**
 * deprecate
 * @str "<**sub**>{经济}"
 */