| | |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int net_mod_socket_force_bind(void * _socket, int port); |
| | | |
| | | /** |
| | | * 发送信息 |
| | | * @port 发送给谁 |
| | |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) ; |
| | | |
| | | |
| | | /** |
| | | * Start the bus. |
| | | * |
| | | * @param _socket socket handle, passed as an untyped pointer |
| | | * @return 0 on success; any other value is the error code of the failure |
| | | */ |
| | | int net_mod_socket_start_bus(void * _socket); |
| | | |
| | | |
| | | /** |
| | | * 向node_arr 中的所有网络节点发布消息 |
| | |
| | | int net_mod_socket_pub(void *_sockt, net_node_t *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size); |
| | | |
| | | |
| | | /** |
| | | * Start the bus. |
| | | * |
| | | * @param _socket socket handle, passed as an untyped pointer |
| | | * @return 0 on success; any other value is the error code of the failure |
| | | */ |
| | | int net_mod_socket_start_bus(void * _socket); |
| | | |
| | | /** |
| | | * 订阅指定主题 |
| | |
| | | int net_mod_socket_desub_nowait(void * _socket, void *topic, int size, int port); |
| | | |
| | | |
| | | |
| | | /** |
| | | * Get the socket's port number. |
| | | * |
| | | * NOTE(review): the function name says "key" while this comment says port number; |
| | | * presumably both refer to the same value - confirm against the implementation. |
| | | * |
| | | * @param _socket socket handle, passed as an untyped pointer |
| | | */ |
| | | int net_mod_socket_get_key(void * _socket) ; |
| | | |
| | | |
| | | |
| | | /** |
| | |
| | | #include "shm_mod_socket.h" |
| | | |
| | | static Logger *logger = LoggerFactory::getLogger(); |
| | | |
| | | |
| | | void ShmModSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) { |
| | | SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY); |
| | |
| | | inline int ShmModSocket::_recvfrom_(void **buf, int *size, int *key, struct timespec *timeout, int flags) { |
| | | |
| | | if(mod == BUS) { |
| | | err_exit(0, "Can not use method recvfrom in a Bus"); |
| | | logger->error("Can not use method recvfrom in a Bus"); |
| | | exit(1); |
| | | } |
| | | // printf("dgram_mod_recvfrom before\n"); |
| | | int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flags); |
| | |
| | | subscripter_set = map_iter->second; |
| | | |
| | | subscripter_set->erase(key); |
| | | printf("============ desub %d\n", key); |
| | | } |
| | | } |
| | | |
| | |
| | | for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { |
| | | subscripter_set = map_iter->second; |
| | | subscripter_set->erase(key); |
| | | printf("============ desub %d\n", key); |
| | | } |
| | | } |
| | | |
| | |
| | | for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) { |
| | | if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) { |
| | | subscripter_set->erase(set_iter); |
| | | printf("remove closed subscripter %d \n", send_key); |
| | | logger->debug("remove closed subscripter %d \n", send_key); |
| | | } |
| | | } |
| | | subscripter_to_del.clear(); |
| | |
| | | // printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), "")); |
| | | if(strcmp(trim(topics, 0), "") == 0) { |
| | | // 取消所有订阅 |
| | | printf("====取消所有订阅\n"); |
| | | _proxy_desub_all(key); |
| | | } else { |
| | | |
| | |
| | | free(action); |
| | | free(topics); |
| | | } else { |
| | | err_msg(0, "incorrect format msg"); |
| | | logger->error( "ShmModSocket::run_pubsub_proxy : incorrect format msg"); |
| | | } |
| | | free(buf); |
| | | } |
| | |
| | | #define TOPIC_LIDENTIFIER "{" |
| | | #define TOPIC_RIDENTIFIER "}" |
| | | |
| | | static Logger *logger = LoggerFactory::getLogger(); |
| | | #define BUS_MAP_KEY 1 |
| | | //typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString; |
| | | typedef std::set<int, std::less<int>, SHM_STL_Allocator<int> > SHMKeySet; |