| | |
| | | |
| | | #include "bus_server_socket.h" |
| | | #include "shm_mod_socket.h" |
| | | |
| | | static Logger *logger = LoggerFactory::getLogger(); |
| | | |
| | |
| | | /* |
| | | * Handle a publish request: proxy-forward the message to the topic's subscribers. |
| | | */ |
| | | void BusServerSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key) { |
| | | void BusServerSocket::_proxy_pub( char *topic, void *buf, size_t size, int key) { |
| | | SHMKeySet *subscripter_set; |
| | | |
| | | SHMTopicSubMap::iterator map_iter; |
| | |
| | | for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { |
| | | send_key = *set_iter; |
| | | // printf("_proxy_pub send before %d \n", send_key); |
| | | if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) { |
| | | if (shm_sendto(shm_socket, buf, size, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) { |
| | | //对方已关闭的连接放到待删除队列里。如果直接删除会让iter指针出现错乱 |
| | | subscripter_to_del.push_back(send_key); |
| | | } else { |
| | |
| | | // pthread_detach(pthread_self()); |
| | | int size; |
| | | int key; |
| | | char * action, *topic, *topics, *buf; |
| | | char * action, *topic, *topics, *buf, *content; |
| | | size_t head_len; |
| | | char resp_buf[128]; |
| | | bus_head_t head; |
| | | |
| | | const char *topic_delim = ","; |
| | | // printf("run_pubsub_proxy server receive before\n"); |
| | | while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) { |
| | | //printf("run_pubsub_proxy server recv after: %s \n", buf); |
| | | if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) { |
| | | // printf("run_pubsub_proxy %s %s \n", action, topics); |
| | | if(strcmp(action, "sub") == 0) { |
| | | // 订阅支持多主题订阅 |
| | | topic = strtok(topics, topic_delim); |
| | | head = ShmModSocket::decode_bus_head(buf); |
| | | topics = buf + BUS_HEAD_SIZE; |
| | | action = head.action; |
| | | // if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) { |
| | | printf("run_pubsub_proxy : %s, %s \n", action, topics); |
| | | if(strcmp(action, "sub") == 0) { |
| | | // 订阅支持多主题订阅 |
| | | topic = strtok(topics, topic_delim); |
| | | //printf("run_pubsub_proxy topic = %s\n", topic); |
| | | while(topic) { |
| | | _proxy_sub(trim(topic, 0), key); |
| | | topic = strtok(NULL, topic_delim); |
| | | } |
| | | |
| | | } else if(strcmp(action, "desub") == 0) { |
| | | // printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), "")); |
| | | if(strcmp(trim(topics, 0), "") == 0) { |
| | | // 取消所有订阅 |
| | | _proxy_desub_all(key); |
| | | } else { |
| | | |
| | | topic = strtok(topics, topic_delim); |
| | | while(topic) { |
| | | _proxy_sub(trim(topic, 0), key); |
| | | _proxy_desub(trim(topic, 0), key); |
| | | topic = strtok(NULL, topic_delim); |
| | | } |
| | | |
| | | } else if(strcmp(action, "desub") == 0) { |
| | | // printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), "")); |
| | | if(strcmp(trim(topics, 0), "") == 0) { |
| | | // 取消所有订阅 |
| | | _proxy_desub_all(key); |
| | | } else { |
| | | |
| | | topic = strtok(topics, topic_delim); |
| | | while(topic) { |
| | | _proxy_desub(trim(topic, 0), key); |
| | | topic = strtok(NULL, topic_delim); |
| | | } |
| | | } |
| | | |
| | | } else if(strcmp(action, "pub") == 0) { |
| | | _proxy_pub(topics, head_len, buf, size, key); |
| | | } else if(strcmp(action, "stop") == 0) { |
| | | logger->info( "Stopping Bus..."); |
| | | // snprintf(buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER); |
| | | // shm_sendto(shm_socket, const void *buf, const int size, key); |
| | | free(action); |
| | | free(topics); |
| | | free(buf); |
| | | break; |
| | | } else { |
| | | logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action"); |
| | | } |
| | | |
| | | free(action); |
| | | free(topics); |
| | | } else if(strcmp(action, "pub") == 0) { |
| | | content = topics + head.topic_size; |
| | | _proxy_pub(topics, content, head.content_size, key); |
| | | } else if(strcmp(action, "stop") == 0) { |
| | | logger->info( "Stopping Bus..."); |
| | | // snprintf(resp_buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER); |
| | | // shm_sendto(shm_socket, resp_buf, strlen(resp_buf), key); |
| | | // free(action); |
| | | // free(topics); |
| | | free(buf); |
| | | break; |
| | | } else { |
| | | logger->error( "BusServerSocket::run_pubsub_proxy : incorrect format msg"); |
| | | logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action"); |
| | | } |
| | | |
| | | // free(action); |
| | | // free(topics); |
| | | // } else { |
| | | // logger->error( "BusServerSocket::run_pubsub_proxy : incorrect format msg"); |
| | | // } |
| | | free(buf); |
| | | } |
| | | return NULL; |
| | |
| | | |
| | | private: |
| | | void _proxy_sub( char *topic, int key); |
| | | void _proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key); |
| | | void _proxy_pub( char *topic, void *buf, size_t size, int key); |
| | | void *run_pubsub_proxy(); |
| | | int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ); |
| | | |
| | |
| | | /** |
| | | * Send a "sub" (subscribe) request for a topic to the bus. |
| | | * |
| | | * The request is a bus head (action "sub", topic_size, content_size = 0) |
| | | * followed by the NUL-terminated topic, built by get_bus_sendbuf(). |
| | | * On a successful send the bus key is recorded in bus_set. |
| | | * |
| | | * @param topic NUL-terminated topic string; must not be NULL |
| | | * @param topic_size ignored on input; recomputed as strlen(topic) + 1 |
| | | * @param key bus port the request is sent to |
| | | * @param timeout forwarded unchanged to shm_sendto |
| | | * @param flags forwarded unchanged to shm_sendto |
| | | * @return the shm_sendto result, or -1 if topic is NULL or no send buffer could be built |
| | | */ |
| | | int ShmModSocket::_sub_(char *topic, int topic_size, int key, |
| | |     struct timespec *timeout, int flags) { |
| | |   if(topic == NULL) { |
| | |     return -1; |
| | |   } |
| | | |
| | |   bus_head_t head = {}; |
| | |   // head is zero-filled above, so copy only the literal and its NUL. |
| | |   // Copying sizeof(head.action) bytes would read past the end of "sub". |
| | |   memcpy(head.action, "sub", sizeof("sub")); |
| | |   head.topic_size = topic_size = strlen(topic) + 1; |
| | |   head.content_size = 0; |
| | | |
| | |   void *buf = NULL; |
| | |   int size = get_bus_sendbuf(head, topic, topic_size, NULL, 0, &buf); |
| | |   if(size <= 0) { |
| | |     return -1; |
| | |   } |
| | | |
| | |   int ret = shm_sendto(shm_socket, buf, size, key, timeout, flags); |
| | |   free(buf); |
| | |   if(ret == 0) { |
| | |     // Remember which bus this socket has subscribed on. |
| | |     bus_set->insert(key); |
| | |   } |
| | |   return ret; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Send a "desub" (cancel subscription) request for a topic to the bus. |
| | | * |
| | | * The request is a bus head (action "desub", topic_size, content_size = 0) |
| | | * followed by the NUL-terminated topic, built by get_bus_sendbuf(). |
| | | * A NULL topic is sent as an empty topic; run_pubsub_proxy treats an empty |
| | | * topic as "cancel all subscriptions" for the sender. |
| | | * |
| | | * @param topic NUL-terminated topic string, or NULL for the empty topic |
| | | * @param topic_size ignored on input; recomputed as strlen(topic) + 1 |
| | | * @param key bus port the request is sent to |
| | | * @param timeout forwarded unchanged to shm_sendto |
| | | * @param flags forwarded unchanged to shm_sendto |
| | | * @return the shm_sendto result, or -1 if no send buffer could be built |
| | | */ |
| | | int ShmModSocket::_desub_(char *topic, int topic_size, int key, |
| | |     struct timespec *timeout, int flags) { |
| | |   // Writable empty string: a string literal must not be bound to char *. |
| | |   char empty_topic[] = ""; |
| | |   if(topic == NULL) { |
| | |     topic = empty_topic; |
| | |   } |
| | | |
| | |   bus_head_t head = {}; |
| | |   // head is zero-filled above, so copy only the literal and its NUL. |
| | |   // Copying sizeof(head.action) bytes would read past the end of "desub". |
| | |   memcpy(head.action, "desub", sizeof("desub")); |
| | |   head.topic_size = topic_size = strlen(topic) + 1; |
| | |   head.content_size = 0; |
| | | |
| | |   void *buf = NULL; |
| | |   int size = get_bus_sendbuf(head, topic, topic_size, NULL, 0, &buf); |
| | |   if(size <= 0) { |
| | |     return -1; |
| | |   } |
| | | |
| | |   int ret = shm_sendto(shm_socket, buf, size, key, timeout, flags); |
| | |   free(buf); |
| | |   return ret; |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | /** |
| | | * Send a "pub" (publish) request carrying content for a topic to the bus. |
| | | * |
| | | * The request is a bus head (action "pub", topic_size, content_size) followed |
| | | * by the NUL-terminated topic and then the content bytes, built by |
| | | * get_bus_sendbuf(). |
| | | * |
| | | * @param topic NUL-terminated topic string; must not be NULL |
| | | * @param topic_size ignored on input; recomputed as strlen(topic) + 1 |
| | | * @param content payload bytes; may be NULL only when content_size is 0 |
| | | * @param content_size number of payload bytes; must not be negative |
| | | * @param key bus port the request is sent to |
| | | * @param timeout forwarded unchanged to shm_sendto |
| | | * @param flags forwarded unchanged to shm_sendto |
| | | * @return the shm_sendto result, or -1 on invalid arguments or if no send buffer could be built |
| | | */ |
| | | int ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key, |
| | |     struct timespec *timeout, int flags) { |
| | |   if(topic == NULL || content_size < 0 || (content == NULL && content_size > 0)) { |
| | |     return -1; |
| | |   } |
| | | |
| | |   bus_head_t head = {}; |
| | |   // head is zero-filled above, so copy only the literal and its NUL. |
| | |   // Copying sizeof(head.action) bytes would read past the end of "pub". |
| | |   memcpy(head.action, "pub", sizeof("pub")); |
| | |   head.topic_size = topic_size = strlen(topic) + 1; |
| | |   head.content_size = content_size; |
| | | |
| | |   void *buf = NULL; |
| | |   int size = get_bus_sendbuf(head, topic, topic_size, content, content_size, &buf); |
| | |   if(size <= 0) { |
| | |     return -1; |
| | |   } |
| | | |
| | |   int ret = shm_sendto(shm_socket, buf, size, key, timeout, flags); |
| | |   free(buf); |
| | |   return ret; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Build one contiguous bus message: encoded head, then topic bytes, then content bytes. |
| | | * |
| | | * @param request_head head to encode at the start of the message |
| | | * @param topic_buf topic bytes; may be NULL only when topic_size is 0 |
| | | * @param topic_size number of topic bytes; must not be negative |
| | | * @param content_buf content bytes; may be NULL only when content_size is 0 |
| | | * @param content_size number of content bytes; must not be negative |
| | | * @param retbuf receives the malloc'ed message; the caller must free() it. |
| | | * Set to NULL when -1 is returned. |
| | | * @return total message size in bytes (BUS_HEAD_SIZE + topic_size + content_size), |
| | | * or -1 on invalid arguments. Allocation failure is logged and terminates |
| | | * the process with exit(1). |
| | | */ |
| | | int ShmModSocket::get_bus_sendbuf(bus_head_t &request_head, |
| | |     void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf) { |
| | | |
| | |   if(retbuf == NULL) { |
| | |     return -1; |
| | |   } |
| | |   *retbuf = NULL; |
| | | |
| | |   if(topic_size < 0 || content_size < 0 || |
| | |      (topic_buf == NULL && topic_size > 0) || |
| | |      (content_buf == NULL && content_size > 0)) { |
| | |     return -1; |
| | |   } |
| | | |
| | |   // Compute the size in size_t and make sure it still fits the int return type. |
| | |   size_t total_size = BUS_HEAD_SIZE + static_cast<size_t>(topic_size) + static_cast<size_t>(content_size); |
| | |   int buf_size = static_cast<int>(total_size); |
| | |   if(buf_size < 0 || static_cast<size_t>(buf_size) != total_size) { |
| | |     return -1; |
| | |   } |
| | | |
| | |   // Allocate exactly what the message needs in a single step. |
| | |   char *buf = static_cast<char *>(malloc(total_size)); |
| | |   if(buf == NULL) { |
| | |     LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc"); |
| | |     exit(1); |
| | |   } |
| | | |
| | |   // encode_bus_head() returns a malloc'ed block owned by the caller; it has to |
| | |   // be released here, otherwise every message built leaks BUS_HEAD_SIZE bytes. |
| | |   void *encoded_head = ShmModSocket::encode_bus_head(request_head); |
| | |   if(encoded_head == NULL) { |
| | |     LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc"); |
| | |     free(buf); |
| | |     exit(1); |
| | |   } |
| | |   memcpy(buf, encoded_head, BUS_HEAD_SIZE); |
| | |   free(encoded_head); |
| | | |
| | |   if(topic_size != 0) { |
| | |     memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size); |
| | |   } |
| | |   if(content_size != 0) { |
| | |     memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size); |
| | |   } |
| | | |
| | |   *retbuf = buf; |
| | |   return buf_size; |
| | | } |
| | | |
| | | /** |
| | | * Wire layout of an encoded bus head (BUS_HEAD_SIZE bytes in total): |
| | | * char action[sizeof(bus_head_t::action)]; |
| | | * uint32_t topic_size; // network byte order |
| | | * uint32_t content_size; // network byte order |
| | | * |
| | | * encode_bus_head() and decode_bus_head() must agree on this layout. The size |
| | | * fields sit AFTER the whole action field: placing them at byte offsets 4 and 8 |
| | | * would overwrite action bytes 4..11 and truncate any action longer than four |
| | | * characters (for example "desub"). |
| | | * NOTE(review): PUT/GET are assumed to store/load one 32-bit word - confirm |
| | | * against their definition. |
| | | */ |
| | | |
| | | /** |
| | | * Encode a bus head into a freshly malloc'ed block of BUS_HEAD_SIZE bytes. |
| | | * |
| | | * @param head head to encode |
| | | * @return the encoded block, which the caller must free(); NULL if malloc fails |
| | | */ |
| | | void * ShmModSocket::encode_bus_head(bus_head_t & head) { |
| | |   void * headbs = malloc(BUS_HEAD_SIZE); |
| | |   if(headbs == NULL) { |
| | |     return NULL; |
| | |   } |
| | |   char *tmp_ptr = (char *)headbs; |
| | | |
| | |   memcpy(tmp_ptr, head.action, sizeof(head.action)); |
| | | |
| | |   // Step over the complete action field before writing the sizes. |
| | |   tmp_ptr += sizeof(head.action); |
| | |   PUT(tmp_ptr, htonl(head.topic_size)); |
| | | |
| | |   tmp_ptr += sizeof(head.topic_size); |
| | |   PUT(tmp_ptr, htonl(head.content_size)); |
| | | |
| | |   return headbs; |
| | | } |
| | | |
| | | /** |
| | | * Decode a bus head from the first BUS_HEAD_SIZE bytes of headbs. |
| | | * |
| | | * @param headbs start of an encoded bus head; must hold at least BUS_HEAD_SIZE bytes |
| | | * @return the decoded head, with sizes converted to host byte order and the |
| | | * action forcibly NUL-terminated so it is safe to pass to strcmp |
| | | */ |
| | | bus_head_t ShmModSocket::decode_bus_head(void *headbs) { |
| | |   char *tmp_ptr = (char *)headbs; |
| | |   bus_head_t head; |
| | | |
| | |   memcpy(head.action, tmp_ptr, sizeof(head.action)); |
| | |   // The bytes come from another process; never trust them to be terminated. |
| | |   head.action[sizeof(head.action) - 1] = '\0'; |
| | | |
| | |   // Sizes follow the complete action field (same layout as encode_bus_head). |
| | |   tmp_ptr += sizeof(head.action); |
| | |   head.topic_size = ntohl(GET(tmp_ptr)); |
| | | |
| | |   tmp_ptr += sizeof(head.topic_size); |
| | |   head.content_size = ntohl(GET(tmp_ptr)); |
| | | |
| | |   return head; |
| | | } |
| | | |
| | | |
| | |
| | | #include <set> |
| | | #include "socket_def.h" |
| | | |
| | | #define BUS_HEAD_SIZE (64 + 2 * sizeof(uint32_t)) |
| | | |
| | | // Fixed-size head placed in front of every bus message. |
| | | // On the wire it occupies BUS_HEAD_SIZE bytes and is followed by the topic |
| | | // bytes and then the content bytes. |
| | | struct bus_head_t |
| | | { |
| | | // Action name as a C string; senders use "sub", "desub" and "pub", and the proxy also recognises "stop". |
| | | char action[64]; |
| | | // Number of topic bytes that follow the head; senders set it to strlen(topic) + 1. |
| | | uint32_t topic_size; |
| | | // Number of content bytes that follow the topic; 0 for "sub" and "desub" requests. |
| | | uint32_t content_size; |
| | | }; |
| | | |
| | | |
| | | class ShmModSocket { |
| | | private: |
| | | shm_socket_t *shm_socket; |
| | |
| | | |
| | | int _desub_( char *topic, int size, int key, struct timespec *timeout, int flags); |
| | | |
| | | |
| | | static int get_bus_sendbuf(bus_head_t &request_head, void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf); |
| | | |
| | | public: |
| | | static size_t remove_keys(int keys[], size_t length); |
| | | |
| | | // bus header 编码为网络传输的字节 |
| | | static void * encode_bus_head(bus_head_t & bushead); |
| | | // 解码 bus header |
| | | static bus_head_t decode_bus_head(void *headbs); |
| | | |
| | | public: |
| | | ShmModSocket(); |
| | | ~ShmModSocket(); |