wangzhengquan
2020-12-21 ab9d762e22875cec0cecf7783b9d76995562bebb
update
4个文件已修改
264 ■■■■ 已修改文件
src/socket/bus_server_socket.c 85 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.c 158 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.h 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.c
@@ -1,5 +1,6 @@
#include "bus_server_socket.h"
#include "shm_mod_socket.h"
static Logger *logger = LoggerFactory::getLogger();
@@ -183,7 +184,7 @@
/*
 * 处理发布,代理转发
*/
void BusServerSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key) {
void BusServerSocket::_proxy_pub( char *topic, void *buf, size_t size, int key) {
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
@@ -200,7 +201,7 @@
        for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
            send_key = *set_iter;
 // printf("_proxy_pub send before %d \n", send_key);
            if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) {
            if (shm_sendto(shm_socket, buf, size, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) {
                //对方已关闭的连接放到待删除队列里。如果直接删除会让iter指针出现错乱
                subscripter_to_del.push_back(send_key);
            } else {
@@ -226,57 +227,63 @@
    // pthread_detach(pthread_self());
    int size;
    int key;
    char * action, *topic, *topics, *buf;
    char * action, *topic, *topics, *buf, *content;
    size_t head_len;
    char resp_buf[128];
    bus_head_t head;
    const char *topic_delim = ",";
// printf("run_pubsub_proxy server receive before\n");
    while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) {
//printf("run_pubsub_proxy server recv after: %s \n", buf);
        if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
 // printf("run_pubsub_proxy  %s %s \n", action, topics);
            if(strcmp(action, "sub") == 0) {
                // 订阅支持多主题订阅
                topic = strtok(topics, topic_delim);
        head = ShmModSocket::decode_bus_head(buf);
        topics = buf + BUS_HEAD_SIZE;
        action = head.action;
        // if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
  printf("run_pubsub_proxy : %s, %s \n", action, topics);
        if(strcmp(action, "sub") == 0) {
            // 订阅支持多主题订阅
            topic = strtok(topics, topic_delim);
//printf("run_pubsub_proxy topic = %s\n", topic);
          while(topic) {
       _proxy_sub(trim(topic, 0), key);
        topic =  strtok(NULL, topic_delim);
          }
        } else if(strcmp(action, "desub") == 0) {
// printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
            if(strcmp(trim(topics, 0), "") == 0) {
                // 取消所有订阅
                _proxy_desub_all(key);
            } else {
                topic = strtok(topics, topic_delim);
              while(topic) {
           _proxy_sub(trim(topic, 0), key);
           _proxy_desub(trim(topic, 0), key);
            topic =  strtok(NULL, topic_delim);
              }
            } else if(strcmp(action, "desub") == 0) {
// printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
                if(strcmp(trim(topics, 0), "") == 0) {
                    // 取消所有订阅
                    _proxy_desub_all(key);
                } else {
                    topic = strtok(topics, topic_delim);
                  while(topic) {
               _proxy_desub(trim(topic, 0), key);
                topic =  strtok(NULL, topic_delim);
                  }
                }
            } else if(strcmp(action, "pub") == 0) {
                _proxy_pub(topics, head_len, buf, size, key);
            }  else if(strcmp(action, "stop") == 0) {
                 logger->info( "Stopping Bus...");
                 // snprintf(buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
                 // shm_sendto(shm_socket, const void *buf, const int size, key);
                 free(action);
                 free(topics);
                 free(buf);
                 break;
            } else {
                logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action");
            }
            
            free(action);
            free(topics);
        } else if(strcmp(action, "pub") == 0) {
             content = topics + head.topic_size;
            _proxy_pub(topics, content, head.content_size, key);
        }  else if(strcmp(action, "stop") == 0) {
             logger->info( "Stopping Bus...");
             // snprintf(resp_buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
             // shm_sendto(shm_socket, resp_buf, strlen(resp_buf), key);
             // free(action);
             // free(topics);
             free(buf);
             break;
        } else {
            logger->error( "BusServerSocket::run_pubsub_proxy : incorrect format msg");
            logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action");
        }
        // free(action);
        // free(topics);
        // } else {
        //     logger->error( "BusServerSocket::run_pubsub_proxy : incorrect format msg");
        // }
        free(buf);
    }
    return NULL;
src/socket/bus_server_socket.h
@@ -28,7 +28,7 @@
private:
    void _proxy_sub( char *topic, int key);
    void _proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key);
    void _proxy_pub( char *topic, void *buf, size_t size, int key);
    void *run_pubsub_proxy();
    int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
 
src/socket/shm_mod_socket.c
@@ -187,30 +187,66 @@
/**
 * @key 总线端口
 */
int  ShmModSocket::_sub_(char *topic, int size, int key,
int  ShmModSocket::_sub_(char *topic, int topic_size, int key,
    struct timespec *timeout, int flags) {
    char buf[8192];
    int rv;
    snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
    rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
    if(rv == 0) {
        bus_set->insert(key);
    // char buf[8192];
    // int rv;
    // snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
    // rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
    // if(rv == 0) {
    //     bus_set->insert(key);
    // }
    // return rv;
    int ret;
    bus_head_t head = {};
    memcpy(head.action, "sub", sizeof(head.action));
    head.topic_size = topic_size = strlen(topic) + 1;
    head.content_size = 0;
    void *buf;
    int size = get_bus_sendbuf(head, topic, topic_size, NULL,  0, &buf);
    if(size > 0) {
        ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
        free(buf);
        if(ret == 0) {
            bus_set->insert(key);
        }
        return ret;
    } else {
        return -1;
    }
    return rv;
}
/**
 * @key 总线端口
 */
int  ShmModSocket::_desub_(char *topic, int size, int key,
int  ShmModSocket::_desub_(char *topic, int topic_size, int key,
    struct timespec *timeout, int flags) {
    char buf[8192];
    // char buf[8192];
    int ret;
    if(topic == NULL) {
        topic = "";
    }
    snprintf(buf,  8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER,  topic, TOPIC_RIDENTIFIER);
    return shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
    // snprintf(buf,  8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER,  topic, TOPIC_RIDENTIFIER);
    // return shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
    bus_head_t head = {};
    memcpy(head.action, "desub", sizeof(head.action));
    head.topic_size = topic_size = strlen(topic) + 1;
    head.content_size = 0;
    void *buf;
    int size = get_bus_sendbuf(head, topic,  topic_size, NULL,  0, &buf);
    if(size > 0) {
        ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
        free(buf);
        return ret;
    } else {
        return -1;
    }
}
/**
@@ -220,15 +256,101 @@
 
int  ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key,  
    struct timespec *timeout, int flags) {
    int head_len;
    char buf[8192+content_size];
    snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
    head_len = strlen(buf);
    memcpy(buf+head_len, content, content_size);
    return shm_sendto(shm_socket, buf, head_len+content_size, key, timeout, flags);
    // int head_len;
    // char buf[8192+content_size];
    // snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
    // head_len = strlen(buf);
    // memcpy(buf+head_len, content, content_size);
    int ret;
    bus_head_t head = {};
    memcpy(head.action, "pub", sizeof(head.action));
    head.topic_size = topic_size = strlen(topic) + 1;
    head.content_size = content_size;
    void *buf;
    int size = get_bus_sendbuf(head, topic,  topic_size, content,  content_size, &buf);
    if(size > 0) {
        ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
        free(buf);
        return ret;
    } else {
        return -1;
    }
}
int ShmModSocket::get_bus_sendbuf(bus_head_t &request_head,
  void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf) {
  int buf_size;
  char *buf;
  int  max_buf_size;
  if((buf = (char *)malloc(MAXBUF)) == NULL) {
    LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc");
    exit(1);
  } else {
    max_buf_size = MAXBUF;
  }
  buf_size = BUS_HEAD_SIZE + content_size + topic_size  ;
  if(max_buf_size < buf_size) {
    if((buf = (char *)realloc(buf, buf_size)) == NULL) {
      LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf  realloc buf");
      exit(1);
    } else {
      max_buf_size = buf_size;
    }
  }
  memcpy(buf, ShmModSocket::encode_bus_head(request_head), BUS_HEAD_SIZE);
  if(topic_size != 0 )
    memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size);
  if(content_size != 0)
      memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size);
  *retbuf = buf;
  return buf_size;
}
/**
    char action[];
  uint32_t topic_size;
    uint32_t content_size;
*/
void * ShmModSocket::encode_bus_head(bus_head_t & head) {
  void * headbs = malloc(BUS_HEAD_SIZE);
  char *tmp_ptr = (char *)headbs;
  memcpy(tmp_ptr, head.action, sizeof(head.action));
  tmp_ptr += 4;
  PUT(tmp_ptr, htonl(head.topic_size));
  tmp_ptr += 4;
  PUT(tmp_ptr, htonl(head.content_size));
  return headbs;
}
bus_head_t  ShmModSocket::decode_bus_head(void *headbs) {
  char *tmp_ptr = (char *)headbs;
  bus_head_t head;
  memcpy(head.action, tmp_ptr, sizeof(head.action));
  tmp_ptr += 4;
  head.topic_size = ntohl(GET(tmp_ptr));
  tmp_ptr += 4;
  head.content_size = ntohl(GET(tmp_ptr));
  return head;
}
 
src/socket/shm_mod_socket.h
@@ -11,6 +11,16 @@
#include <set>
#include "socket_def.h"
#define BUS_HEAD_SIZE (64 + 2 * sizeof(uint32_t))
struct bus_head_t
{
    char action[64];
    uint32_t topic_size;
    uint32_t content_size;
};
class ShmModSocket {
private:
    shm_socket_t *shm_socket;
@@ -26,8 +36,17 @@
    int  _desub_( char *topic, int size, int key, struct timespec *timeout, int flags);
    static int get_bus_sendbuf(bus_head_t &request_head, void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf);
public:
    static size_t remove_keys(int keys[], size_t length);
  // bus header 编码为网络传输的字节
  static void * encode_bus_head(bus_head_t & bushead);
  // 解码 bus  header
  static bus_head_t  decode_bus_head(void *headbs);
public:
    ShmModSocket();
    ~ShmModSocket();