| | |
| | | #include "shm_mod_socket.h" |
| | | |
| | | static Logger *logger = LoggerFactory::getLogger(); |
| | | |
| | | |
| | | void ShmModSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) { |
| | | SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY); |
| | |
| | | // printf("ShmModSocket destory 4\n"); |
| | | } |
| | | |
| | | int ShmModSocket::bind(int port) { |
| | | return shm_socket_bind(shm_socket, port); |
| | | int ShmModSocket::bind(int key) { |
| | | return shm_socket_bind(shm_socket, key); |
| | | } |
| | | |
| | | |
| | |
| | | * 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int ShmModSocket::force_bind(int port) { |
| | | return shm_socket_force_bind(shm_socket, port); |
| | | int ShmModSocket::force_bind(int key) { |
| | | return shm_socket_force_bind(shm_socket, key); |
| | | } |
| | | /** |
| | | * 发送信息 |
| | | * @port 发送给谁 |
| | | * @key 发送给谁 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int ShmModSocket::sendto(const void *buf, const int size, const int port) { |
| | | return shm_sendto(shm_socket, buf, size, port, NULL, 0); |
| | | int ShmModSocket::sendto(const void *buf, const int size, const int key) { |
| | | return shm_sendto(shm_socket, buf, size, key, NULL, 0); |
| | | } |
| | | // 发送信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::sendto_timeout(const void *buf, const int size, const int port, const struct timespec *timeout) { |
| | | return shm_sendto(shm_socket, buf, size, port, timeout, 0); |
| | | int ShmModSocket::sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout) { |
| | | return shm_sendto(shm_socket, buf, size, key, timeout, 0); |
| | | } |
| | | // 发送信息立刻返回。 |
| | | int ShmModSocket::sendto_nowait( const void *buf, const int size, const int port){ |
| | | return shm_sendto(shm_socket, buf, size, port, NULL, (int)SHM_MSG_NOWAIT); |
| | | int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){ |
| | | return shm_sendto(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | |
| | | inline int ShmModSocket::_recvfrom_(void **buf, int *size, int *port, struct timespec *timeout, int flags) { |
| | | inline int ShmModSocket::_recvfrom_(void **buf, int *size, int *key, struct timespec *timeout, int flags) { |
| | | |
| | | if(mod == BUS) { |
| | | err_exit(0, "Can not use method recvfrom in a Bus"); |
| | | logger->error("Can not use method recvfrom in a Bus"); |
| | | exit(1); |
| | | } |
| | | // printf("dgram_mod_recvfrom before\n"); |
| | | int rv = shm_recvfrom(shm_socket, buf, size, port, timeout, flags); |
| | | int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flags); |
| | | // printf("dgram_mod_recvfrom after\n"); |
| | | return rv; |
| | | } |
| | | /** |
| | | * 接收信息 |
| | | * @port 从谁哪里收到的信息 |
| | | * @key 从谁哪里收到的信息 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int ShmModSocket::recvfrom(void **buf, int *size, int *port) { |
| | | int ShmModSocket::recvfrom(void **buf, int *size, int *key) { |
| | | |
| | | return _recvfrom_( buf, size, port, NULL, 0); |
| | | return _recvfrom_( buf, size, key, NULL, 0); |
| | | } |
| | | |
| | | |
| | | // 接受信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *port, struct timespec *timeout) { |
| | | return _recvfrom_(buf, size, port, timeout, 0); |
| | | int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, struct timespec *timeout) { |
| | | return _recvfrom_(buf, size, key, timeout, 0); |
| | | } |
| | | |
| | | int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *port){ |
| | | return _recvfrom_(buf, size, port, NULL, (int)SHM_MSG_NOWAIT); |
| | | int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){ |
| | | return _recvfrom_(buf, size, key, NULL, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | /** |
| | | * 发送请求信息并等待接收应答 |
| | | * @port 发送给谁 |
| | | * @key 发送给谁 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int ShmModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, NULL, 0); |
| | | int ShmModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0); |
| | | } |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, struct timespec *timeout){ |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, timeout, 0); |
| | | int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){ |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0); |
| | | } |
| | | int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); |
| | | int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0); |
| | | } |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){ |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0); |
| | | } |
| | | int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | |
| | | * 订阅指定主题 |
| | | * @topic 主题 |
| | | * @size 主题长度 |
| | | * @port 总线端口 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::sub(char *topic, int size, int port){ |
| | | return _sub_( topic, size, port, NULL, 0); |
| | | int ShmModSocket::sub(char *topic, int size, int key){ |
| | | return _sub_( topic, size, key, NULL, 0); |
| | | } |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::sub_timeout(char *topic, int size, int port, struct timespec *timeout){ |
| | | return _sub_(topic, size, port, timeout, 0); |
| | | int ShmModSocket::sub_timeout(char *topic, int size, int key, struct timespec *timeout){ |
| | | return _sub_(topic, size, key, timeout, 0); |
| | | } |
| | | int ShmModSocket::sub_nowait(char *topic, int size, int port) { |
| | | return _sub_(topic, size, port, NULL, (int)SHM_MSG_NOWAIT); |
| | | int ShmModSocket::sub_nowait(char *topic, int size, int key) { |
| | | return _sub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | |
| | |
| | | * 取消订阅指定主题 |
| | | * @topic 主题 |
| | | * @size 主题长度 |
| | | * @port 总线端口 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::desub(char *topic, int size, int port){ |
| | | return _desub_( topic, size, port, NULL, 0); |
| | | int ShmModSocket::desub(char *topic, int size, int key){ |
| | | return _desub_( topic, size, key, NULL, 0); |
| | | } |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::desub_timeout(char *topic, int size, int port, struct timespec *timeout){ |
| | | return _desub_(topic, size, port, timeout, 0); |
| | | int ShmModSocket::desub_timeout(char *topic, int size, int key, struct timespec *timeout){ |
| | | return _desub_(topic, size, key, timeout, 0); |
| | | } |
| | | int ShmModSocket::desub_nowait(char *topic, int size, int port) { |
| | | return _desub_(topic, size, port, NULL, (int)SHM_MSG_NOWAIT); |
| | | int ShmModSocket::desub_nowait(char *topic, int size, int key) { |
| | | return _desub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | |
| | |
| | | * 发布主题 |
| | | * @topic 主题 |
| | | * @content 主题内容 |
| | | * @port 总线端口 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int port){ |
| | | return _pub_(topic, topic_size, content, content_size, port, NULL, 0); |
| | | int ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key){ |
| | | return _pub_(topic, topic_size, content, content_size, key, NULL, 0); |
| | | } |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int port, struct timespec * timeout){ |
| | | return _pub_( topic, topic_size, content, content_size, port, timeout, 0); |
| | | int ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, struct timespec * timeout){ |
| | | return _pub_( topic, topic_size, content, content_size, key, timeout, 0); |
| | | } |
| | | int ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int port){ |
| | | return _pub_(topic, topic_size, content, content_size, port, NULL, (int)SHM_MSG_NOWAIT); |
| | | int ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){ |
| | | return _pub_(topic, topic_size, content, content_size, key, NULL, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 获取soket端口号 |
| | | * 获取soket key |
| | | */ |
| | | int ShmModSocket::get_port(){ |
| | | return shm_socket->port; |
| | | int ShmModSocket::get_key(){ |
| | | return shm_socket->key; |
| | | } |
| | | |
| | | |
| | | |
| | | // ============================================================================= |
| | | /** |
| | | * @port 总线端口 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::_sub_(char *topic, int size, int port, |
| | | int ShmModSocket::_sub_(char *topic, int size, int key, |
| | | struct timespec *timeout, int flags) { |
| | | char buf[8192]; |
| | | int rv; |
| | | snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); |
| | | rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags); |
| | | rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags); |
| | | if(rv == 0) { |
| | | bus_set->insert(port); |
| | | bus_set->insert(key); |
| | | } |
| | | return rv; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * @port 总线端口 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::_desub_(char *topic, int size, int port, |
| | | int ShmModSocket::_desub_(char *topic, int size, int key, |
| | | struct timespec *timeout, int flags) { |
| | | char buf[8192]; |
| | | if(topic == NULL) { |
| | | topic = ""; |
| | | } |
| | | snprintf(buf, 8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); |
| | | return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags); |
| | | return shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags); |
| | | } |
| | | |
| | | /** |
| | | * @port 总线端口 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int port, |
| | | int ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key, |
| | | struct timespec *timeout, int flags) { |
| | | int head_len; |
| | | char buf[8192+content_size]; |
| | | snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); |
| | | head_len = strlen(buf); |
| | | memcpy(buf+head_len, content, content_size); |
| | | return shm_sendto(shm_socket, buf, head_len+content_size, port, timeout, flags); |
| | | return shm_sendto(shm_socket, buf, head_len+content_size, key, timeout, flags); |
| | | |
| | | } |
| | | /* |
| | | * 处理订阅 |
| | | */ |
| | | void ShmModSocket::_proxy_sub( char *topic, int port) { |
| | | void ShmModSocket::_proxy_sub( char *topic, int key) { |
| | | SHMKeySet *subscripter_set; |
| | | |
| | | SHMTopicSubMap::iterator map_iter; |
| | |
| | | subscripter_set = new(set_ptr) SHMKeySet; |
| | | topic_sub_map->insert({topic, subscripter_set}); |
| | | } |
| | | subscripter_set->insert(port); |
| | | subscripter_set->insert(key); |
| | | } |
| | | |
| | | /* |
| | | * 处理取消订阅 |
| | | */ |
| | | void ShmModSocket::_proxy_desub( char *topic, int port) { |
| | | void ShmModSocket::_proxy_desub( char *topic, int key) { |
| | | SHMKeySet *subscripter_set; |
| | | |
| | | SHMTopicSubMap::iterator map_iter; |
| | |
| | | if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { |
| | | subscripter_set = map_iter->second; |
| | | |
| | | subscripter_set->erase(port); |
| | | printf("============ desub %d\n", port); |
| | | subscripter_set->erase(key); |
| | | } |
| | | } |
| | | |
| | | /* |
| | | * 处理取消所有订阅 |
| | | */ |
| | | void ShmModSocket::_proxy_desub_all(int port) { |
| | | void ShmModSocket::_proxy_desub_all(int key) { |
| | | SHMKeySet *subscripter_set; |
| | | |
| | | SHMTopicSubMap::iterator map_iter; |
| | | // SHMKeySet::iterator set_iter; |
| | | for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { |
| | | subscripter_set = map_iter->second; |
| | | subscripter_set->erase(port); |
| | | printf("============ desub %d\n", port); |
| | | subscripter_set->erase(key); |
| | | } |
| | | } |
| | | |
| | | /* |
| | | * 处理发布,代理转发 |
| | | */ |
| | | void ShmModSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int port) { |
| | | void ShmModSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key) { |
| | | SHMKeySet *subscripter_set; |
| | | |
| | | SHMTopicSubMap::iterator map_iter; |
| | |
| | | std::vector<int> subscripter_to_del; |
| | | std::vector<int>::iterator vector_iter; |
| | | |
| | | int send_port; |
| | | int send_key; |
| | | struct timespec timeout = {1,0}; |
| | | |
| | | if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { |
| | | subscripter_set = map_iter->second; |
| | | for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { |
| | | send_port = *set_iter; |
| | | // printf("_proxy_pub send before %d \n", send_port); |
| | | if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_port, &timeout) == SHM_SOCKET_ECONNFAILED ) { |
| | | send_key = *set_iter; |
| | | // printf("_proxy_pub send before %d \n", send_key); |
| | | if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) { |
| | | //对方已关闭的连接放到待删除队列里。如果直接删除会让iter指针出现错乱 |
| | | subscripter_to_del.push_back(send_port); |
| | | subscripter_to_del.push_back(send_key); |
| | | } else { |
| | | // printf("_proxy_pub send after: %d \n", send_port); |
| | | // printf("_proxy_pub send after: %d \n", send_key); |
| | | } |
| | | |
| | | |
| | |
| | | for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) { |
| | | if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) { |
| | | subscripter_set->erase(set_iter); |
| | | printf("remove closed subscripter %d \n", send_port); |
| | | logger->debug("remove closed subscripter %d \n", send_key); |
| | | } |
| | | } |
| | | subscripter_to_del.clear(); |
| | |
| | | void * ShmModSocket::run_pubsub_proxy() { |
| | | // pthread_detach(pthread_self()); |
| | | int size; |
| | | int port; |
| | | int key; |
| | | char * action, *topic, *topics, *buf; |
| | | size_t head_len; |
| | | |
| | | const char *topic_delim = ","; |
| | | // printf("run_pubsub_proxy server receive before\n"); |
| | | while(shm_recvfrom(shm_socket, (void **)&buf, &size, &port) == 0) { |
| | | while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) { |
| | | //printf("run_pubsub_proxy server recv after: %s \n", buf); |
| | | if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) { |
| | | // printf("run_pubsub_proxy %s %s \n", action, topics); |
| | |
| | | topic = strtok(topics, topic_delim); |
| | | //printf("run_pubsub_proxy topic = %s\n", topic); |
| | | while(topic) { |
| | | _proxy_sub(trim(topic, 0), port); |
| | | _proxy_sub(trim(topic, 0), key); |
| | | topic = strtok(NULL, topic_delim); |
| | | } |
| | | |
| | |
| | | // printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), "")); |
| | | if(strcmp(trim(topics, 0), "") == 0) { |
| | | // 取消所有订阅 |
| | | printf("====取消所有订阅\n"); |
| | | _proxy_desub_all(port); |
| | | _proxy_desub_all(key); |
| | | } else { |
| | | |
| | | topic = strtok(topics, topic_delim); |
| | | while(topic) { |
| | | _proxy_desub(trim(topic, 0), port); |
| | | _proxy_desub(trim(topic, 0), key); |
| | | topic = strtok(NULL, topic_delim); |
| | | } |
| | | } |
| | |
| | | |
| | | |
| | | } else if(strcmp(action, "pub") == 0) { |
| | | _proxy_pub(topics, head_len, buf, size, port); |
| | | _proxy_pub(topics, head_len, buf, size, key); |
| | | } |
| | | |
| | | free(action); |
| | | free(topics); |
| | | } else { |
| | | err_msg(0, "incorrect format msg"); |
| | | logger->error( "ShmModSocket::run_pubsub_proxy : incorrect format msg"); |
| | | } |
| | | free(buf); |
| | | } |