wangzhengquan
2020-12-22 fb8aef5a4908a50d415cf5ed33a10699fdfa9c98
src/socket/shm_mod_socket.c
@@ -1,63 +1,10 @@
#include "shm_mod_socket.h"
void ShmModSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb) {
   SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
   SHMKeySet *subscripter_set;
   SHMKeySet::iterator set_iter;
   SHMTopicSubMap::iterator map_iter;
   if(topic_sub_map != NULL) {
      for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
         subscripter_set = map_iter->second;
         if(subscripter_set != NULL) {
            for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
               cb(subscripter_set, *set_iter);
            }
         }
      }
   }
}
bool ShmModSocket::include_in_keys(int key, int keys[], size_t length) {
   if(length == 0) {
      return false;
   }
   for(int i = 0; i < length; i++) {
      if(keys[i] == key)
         return true;
   }
   return false;
}
size_t ShmModSocket::remove_subscripters(int keys[], size_t length) {
   size_t count = 0;
   int key;
   for(int i = 0; i < length; i++) {
      key = keys[i];
      SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
      SHMKeySet *subscripter_set;
      SHMKeySet::iterator set_iter;
      SHMTopicSubMap::iterator map_iter;
      if(topic_sub_map != NULL) {
         for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
            subscripter_set = map_iter->second;
            if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) {
               subscripter_set->erase(set_iter);
// printf("remove_subscripter %s, %d\n", map_iter->first, key);
               count++;
            }
         }
      }
   }
   return count;
}
#include "bus_server_socket.h"
static Logger *logger = LoggerFactory::getLogger();
size_t ShmModSocket::remove_keys(int keys[], size_t length) {
   remove_subscripters(keys, length);
   BusServerSocket::remove_subscripters(keys, length);
   return shm_socket_remove_keys(keys, length);
}
@@ -65,42 +12,18 @@
   mod = (socket_mod_t)0;
   shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
   bus_set = new std::set<int>;
   topic_sub_map = NULL;
}
ShmModSocket::~ShmModSocket() {
// printf("ShmModSocket  destory 1\n");
   SHMKeySet *subscripter_set;
   SHMTopicSubMap::iterator map_iter;
  logger->debug("Destory ShmModSocket...\n");
   struct timespec timeout = {1, 0};
   if(bus_set != NULL) {
      for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) {
// printf("ShmModSocket  desub_timeout before");
         desub_timeout(NULL, 0, *bus_iter, &timeout);
// printf("ShmModSocket  desub_timeout after %d\n", *bus_iter);
      }
      delete bus_set;
   }
// printf("ShmModSocket  destory 2\n");
   if(topic_sub_map != NULL) {
      for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
         subscripter_set = map_iter->second;
// printf("ShmModSocket  destory 2-1\n");
         if(subscripter_set != NULL) {
// printf("ShmModSocket  destory 2-2\n");
            subscripter_set->clear();
// printf("ShmModSocket  destory 2-3\n");
            mm_free((void *)subscripter_set);
// printf("ShmModSocket  destory 2-4\n");
         }
      }
      topic_sub_map->clear();
      mem_pool_free_by_key(BUS_MAP_KEY);
   }
// printf("ShmModSocket  destory 3\n");
   // printf("=============close socket\n");
   shm_close_socket(shm_socket);
// printf("ShmModSocket  destory 4\n");   
}
@@ -108,8 +31,6 @@
int ShmModSocket::bind(int key) {
   return  shm_socket_bind(shm_socket, key);
/**
 * 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key
@@ -139,7 +60,8 @@
inline int ShmModSocket::_recvfrom_(void **buf, int *size, int *key,  struct timespec *timeout, int flags) {
   if(mod == BUS) {
      err_exit(0, "Can not use method recvfrom in a Bus");
      logger->error("Can not use method recvfrom in a Bus");
      exit(1);
   }
// printf("dgram_mod_recvfrom  before\n");
   int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flags);
@@ -195,20 +117,6 @@
/**
 * 启动bus
 *
 * @return 0 成功, 其他值 失败的错误码
*/
int  ShmModSocket::start_bus(){
   mod = BUS;
   topic_sub_map =   mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
   run_pubsub_proxy();
   // pthread_t tid;
   // pthread_create(&tid, NULL, run_accept_sub_request, _socket);
   return 0;
}
/**
 * 订阅指定主题
@@ -279,258 +187,168 @@
/**
 * @key 总线端口
 */
int  ShmModSocket::_sub_(char *topic, int size, int key,
int  ShmModSocket::_sub_(char *topic, int topic_size, int key,
   struct timespec *timeout, int flags) {
   char buf[8192];
   int rv;
   snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
   rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
   if(rv == 0) {
      bus_set->insert(key);
   // char buf[8192];
   // int rv;
   // snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
   // rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
   // if(rv == 0) {
   //    bus_set->insert(key);
   // }
   // return rv;
   int ret;
   bus_head_t head = {};
   memcpy(head.action, "sub", sizeof(head.action));
   head.topic_size = topic_size = strlen(topic) + 1;
   head.content_size = 0;
   void *buf;
   int size = get_bus_sendbuf(head, topic, topic_size, NULL,  0, &buf);
   if(size > 0) {
      ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
      free(buf);
      if(ret == 0) {
         bus_set->insert(key);
      }
      return ret;
   } else {
      return -1;
   }
   return rv;
}
/**
 * @key 总线端口
 */
int  ShmModSocket::_desub_(char *topic, int size, int key,
int  ShmModSocket::_desub_(char *topic, int topic_size, int key,
   struct timespec *timeout, int flags) {
   char buf[8192];
   // char buf[8192];
   int ret;
   if(topic == NULL) {
      topic = "";
   }
   snprintf(buf,  8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER,  topic, TOPIC_RIDENTIFIER);
   return shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
   // snprintf(buf,  8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER,  topic, TOPIC_RIDENTIFIER);
   // return shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
   bus_head_t head = {};
   memcpy(head.action, "desub", sizeof(head.action));
   head.topic_size = topic_size = strlen(topic) + 1;
   head.content_size = 0;
   void *buf;
   int size = get_bus_sendbuf(head, topic,  topic_size, NULL,  0, &buf);
   if(size > 0) {
      ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
      free(buf);
      return ret;
   } else {
      return -1;
   }
}
/**
 * @key 总线端口
 * @str "<**pub**>{经济}"
 */
int  ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key,  
   struct timespec *timeout, int flags) {
   int head_len;
   char buf[8192+content_size];
   snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
   head_len = strlen(buf);
   memcpy(buf+head_len, content, content_size);
   return shm_sendto(shm_socket, buf, head_len+content_size, key, timeout, flags);
   // int head_len;
   // char buf[8192+content_size];
   // snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
   // head_len = strlen(buf);
   // memcpy(buf+head_len, content, content_size);
}
/*
 * 处理订阅
*/
void ShmModSocket::_proxy_sub( char *topic, int key) {
   SHMKeySet *subscripter_set;
   int ret;
   bus_head_t head = {};
   memcpy(head.action, "pub", sizeof(head.action));
   head.topic_size = topic_size = strlen(topic) + 1;
   head.content_size = content_size;
   SHMTopicSubMap::iterator map_iter;
   SHMKeySet::iterator set_iter;
//printf("_proxy_sub topic = %s\n", topic);
   if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
      subscripter_set = map_iter->second;
   } else {
      void *set_ptr = mm_malloc(sizeof(SHMKeySet));
      subscripter_set = new(set_ptr) SHMKeySet;
      topic_sub_map->insert({topic, subscripter_set});
   }
   subscripter_set->insert(key);
}
/*
 * 处理取消订阅
*/
void ShmModSocket::_proxy_desub( char *topic, int key) {
   SHMKeySet *subscripter_set;
   SHMTopicSubMap::iterator map_iter;
   // SHMKeySet::iterator set_iter;
   if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
      subscripter_set = map_iter->second;
      subscripter_set->erase(key);
printf("============ desub %d\n", key);
   }
}
/*
 * 处理取消所有订阅
*/
void ShmModSocket::_proxy_desub_all(int key) {
   SHMKeySet *subscripter_set;
   SHMTopicSubMap::iterator map_iter;
   // SHMKeySet::iterator set_iter;
   for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
         subscripter_set = map_iter->second;
         subscripter_set->erase(key);
printf("============ desub %d\n", key);
   }
}
/*
 * 处理发布,代理转发
*/
void ShmModSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key) {
   SHMKeySet *subscripter_set;
   SHMTopicSubMap::iterator map_iter;
   SHMKeySet::iterator set_iter;
   std::vector<int> subscripter_to_del;
   std::vector<int>::iterator vector_iter;
   int send_key;
   struct timespec timeout = {1,0};
   if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
      subscripter_set = map_iter->second;
      for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
         send_key = *set_iter;
 // printf("_proxy_pub send before %d \n", send_key);
         if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) {
            //对方已关闭的连接放到待删除队列里。如果直接删除会让iter指针出现错乱
            subscripter_to_del.push_back(send_key);
         } else {
// printf("_proxy_pub send after: %d \n", send_key);
         }
      }
      // 删除已关闭的端
      for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) {
         if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) {
            subscripter_set->erase(set_iter);
            printf("remove closed subscripter %d \n", send_key);
         }
      }
      subscripter_to_del.clear();
   }
}
void * ShmModSocket::run_pubsub_proxy() {
   // pthread_detach(pthread_self());
   int size;
   int key;
   char * action, *topic, *topics, *buf;
   size_t head_len;
   const char *topic_delim = ",";
// printf("run_pubsub_proxy server receive before\n");
   while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) {
//printf("run_pubsub_proxy server recv after: %s \n", buf);
      if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
// printf("run_pubsub_proxy  %s %s \n", action, topics);
         if(strcmp(action, "sub") == 0) {
            // 订阅支持多主题订阅
            topic = strtok(topics, topic_delim);
//printf("run_pubsub_proxy topic = %s\n", topic);
           while(topic) {
          _proxy_sub(trim(topic, 0), key);
           topic =  strtok(NULL, topic_delim);
           }
         } else if(strcmp(action, "desub") == 0) {
// printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
            if(strcmp(trim(topics, 0), "") == 0) {
               // 取消所有订阅
      printf("====取消所有订阅\n");
               _proxy_desub_all(key);
            } else {
               topic = strtok(topics, topic_delim);
              while(topic) {
             _proxy_desub(trim(topic, 0), key);
              topic =  strtok(NULL, topic_delim);
              }
            }
         } else if(strcmp(action, "pub") == 0) {
            _proxy_pub(topics, head_len, buf, size, key);
         }
         free(action);
         free(topics);
      } else {
         err_msg(0, "incorrect format msg");
      }
   void *buf;
   int size = get_bus_sendbuf(head, topic,  topic_size, content,  content_size, &buf);
   if(size > 0) {
      ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
      free(buf);
      return ret;
   } else {
      return -1;
   }
   return NULL;
}
int ShmModSocket::get_bus_sendbuf(bus_head_t &request_head,
  void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf) {
  int buf_size;
  char *buf;
  int  max_buf_size;
  if((buf = (char *)malloc(MAXBUF)) == NULL) {
    LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc");
    exit(1);
  } else {
    max_buf_size = MAXBUF;
  }
  buf_size = BUS_HEAD_SIZE + content_size + topic_size  ;
  if(max_buf_size < buf_size) {
    if((buf = (char *)realloc(buf, buf_size)) == NULL) {
      LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf  realloc buf");
      exit(1);
    } else {
      max_buf_size = buf_size;
    }
  }
  memcpy(buf, ShmModSocket::encode_bus_head(request_head), BUS_HEAD_SIZE);
  if(topic_size != 0 )
    memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size);
  if(content_size != 0)
     memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size);
  *retbuf = buf;
  return buf_size;
}
/**
 * @str "<**sub**>{经济}"
 */
   char action[];
  uint32_t topic_size;
   uint32_t content_size;
*/
int ShmModSocket::parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ) {
 char *ptr = str;
 char *str_end_ptr = str + size;
 char *action_start_ptr;
 char *action_end_ptr;
 size_t action_len = 0;
void * ShmModSocket::encode_bus_head(bus_head_t & head) {
  void * headbs = malloc(BUS_HEAD_SIZE);
  char *tmp_ptr = (char *)headbs;
 char *topic_start_ptr;
 char *topic_end_ptr;
 size_t topic_len = 0;
  memcpy(tmp_ptr, head.action, sizeof(head.action));
 // if (strlen(identifier) > strlen(str)) {
 //  return 0;
 // }
  tmp_ptr += sizeof(head.action);
  PUT(tmp_ptr, htonl(head.topic_size));
 if (strncmp(ptr, ACTION_LIDENTIFIER, strlen(ACTION_LIDENTIFIER)) == 0) {
  ptr += strlen(ACTION_LIDENTIFIER);
  action_start_ptr = ptr;
  while(strncmp(++ptr, ACTION_RIDENTIFIER, strlen(ACTION_RIDENTIFIER)) != 0) {
    if(ptr >= str_end_ptr) {
      return 0;
    }
  }
// printf("%s\n", ptr);
  action_end_ptr = ptr;
  action_len = action_end_ptr - action_start_ptr;
  ptr += strlen(ACTION_RIDENTIFIER);
// printf("%s\n", ptr);
// printf("%s\n", str_end_ptr-1);
  if(strncmp(ptr, TOPIC_LIDENTIFIER, strlen(TOPIC_LIDENTIFIER)) == 0 ) {
    topic_start_ptr = ptr+strlen(TOPIC_LIDENTIFIER);
  tmp_ptr += 4;
  PUT(tmp_ptr, htonl(head.content_size));
  return headbs;
}
    while(strncmp(++ptr, TOPIC_RIDENTIFIER, strlen(TOPIC_RIDENTIFIER)) != 0) {
      if(ptr >= str_end_ptr) {
        return 0;
      }
    }
    topic_end_ptr = ptr;
    topic_len = topic_end_ptr - topic_start_ptr;
    ptr += strlen(TOPIC_RIDENTIFIER);
  } else {
    return 0;
  }
 } else {
  return 0;
 }
bus_head_t  ShmModSocket::decode_bus_head(void *headbs) {
  char *tmp_ptr = (char *)headbs;
  bus_head_t head;
 char *topic = (char *)malloc(topic_len+1);
 strncpy(topic, topic_start_ptr, topic_len);
 *(topic+topic_len) = '\0';
 *_topic = topic;
  memcpy(head.action, tmp_ptr, sizeof(head.action));
 char *action = (char *)malloc(action_len+1);
 strncpy(action, action_start_ptr, action_len);
 *(action+action_len) = '\0';
 *_action = action;
 *head_len = ptr-str;
  tmp_ptr += sizeof(head.action);
  head.topic_size = ntohl(GET(tmp_ptr));
 return 1;
  tmp_ptr += 4;
  head.content_size = ntohl(GET(tmp_ptr));
  return head;
}