wangzhengquan
2020-12-21 ab9d762e22875cec0cecf7783b9d76995562bebb
src/socket/bus_server_socket.c
@@ -1,5 +1,6 @@
#include "bus_server_socket.h"
#include "shm_mod_socket.h"
static Logger *logger = LoggerFactory::getLogger();
@@ -183,7 +184,7 @@
/*
 * Handle a publish request: forward the message to subscribers (proxy).
 */
void BusServerSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key) {
void BusServerSocket::_proxy_pub( char *topic, void *buf, size_t size, int key) {
   SHMKeySet *subscripter_set;
   SHMTopicSubMap::iterator map_iter;
@@ -200,7 +201,7 @@
      for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
         send_key = *set_iter;
 // printf("_proxy_pub send before %d \n", send_key);
         if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) {
         if (shm_sendto(shm_socket, buf, size, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) {
            //对方已关闭的连接放到待删除队列里。如果直接删除会让iter指针出现错乱
            subscripter_to_del.push_back(send_key);
         } else {
@@ -226,57 +227,63 @@
   // pthread_detach(pthread_self());
   int size;
   int key;
   char * action, *topic, *topics, *buf;
   char * action, *topic, *topics, *buf, *content;
   size_t head_len;
   char resp_buf[128];
   bus_head_t head;
   const char *topic_delim = ",";
// printf("run_pubsub_proxy server receive before\n");
   while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) {
//printf("run_pubsub_proxy server recv after: %s \n", buf);
      if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
 // printf("run_pubsub_proxy  %s %s \n", action, topics);
         if(strcmp(action, "sub") == 0) {
            // 订阅支持多主题订阅
            topic = strtok(topics, topic_delim);
      head = ShmModSocket::decode_bus_head(buf);
      topics = buf + BUS_HEAD_SIZE;
      action = head.action;
      // if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
  printf("run_pubsub_proxy : %s, %s \n", action, topics);
      if(strcmp(action, "sub") == 0) {
         // 订阅支持多主题订阅
         topic = strtok(topics, topic_delim);
//printf("run_pubsub_proxy topic = %s\n", topic);
        while(topic) {
       _proxy_sub(trim(topic, 0), key);
        topic =  strtok(NULL, topic_delim);
        }
      } else if(strcmp(action, "desub") == 0) {
// printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
         if(strcmp(trim(topics, 0), "") == 0) {
            // 取消所有订阅
            _proxy_desub_all(key);
         } else {
            topic = strtok(topics, topic_delim);
           while(topic) {
          _proxy_sub(trim(topic, 0), key);
          _proxy_desub(trim(topic, 0), key);
           topic =  strtok(NULL, topic_delim);
           }
         } else if(strcmp(action, "desub") == 0) {
// printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
            if(strcmp(trim(topics, 0), "") == 0) {
               // 取消所有订阅
               _proxy_desub_all(key);
            } else {
               topic = strtok(topics, topic_delim);
              while(topic) {
             _proxy_desub(trim(topic, 0), key);
              topic =  strtok(NULL, topic_delim);
              }
            }
         } else if(strcmp(action, "pub") == 0) {
            _proxy_pub(topics, head_len, buf, size, key);
         }  else if(strcmp(action, "stop") == 0) {
             logger->info( "Stopping Bus...");
             // snprintf(buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
             // shm_sendto(shm_socket, const void *buf, const int size, key);
             free(action);
             free(topics);
             free(buf);
             break;
         } else {
            logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action");
         }
         
         free(action);
         free(topics);
      } else if(strcmp(action, "pub") == 0) {
          content = topics + head.topic_size;
         _proxy_pub(topics, content, head.content_size, key);
      }  else if(strcmp(action, "stop") == 0) {
          logger->info( "Stopping Bus...");
          // snprintf(resp_buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
          // shm_sendto(shm_socket, resp_buf, strlen(resp_buf), key);
          // free(action);
          // free(topics);
          free(buf);
          break;
      } else {
         logger->error( "BusServerSocket::run_pubsub_proxy : incorrect format msg");
         logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action");
      }
      // free(action);
      // free(topics);
      // } else {
      //    logger->error( "BusServerSocket::run_pubsub_proxy : incorrect format msg");
      // }
      free(buf);
   }
   return NULL;