| | |
| | | if (!topics || topics_len <= 0) return false; |
| | | |
| | | const auto& url = get_url(URLRegTopic); |
| | | if (url.empty()) { |
| | | if (!url) { |
| | | set_last_error("BHRegisterTopics url empty"); |
| | | return false; |
| | | } |
| | |
| | | if (!topic || topic_len <= 0) return false; |
| | | |
| | | const auto& url = get_url(URLQueryTopic); |
| | | if (url.empty()) { |
| | | if (!url) { |
| | | set_last_error("BHQueryTopicAddress url empty"); |
| | | return false; |
| | | } |
| | |
| | | // if (!query || query_len <= 0) return false; |
| | | |
| | | const auto& url = get_url(URLQueryProcs); |
| | | if (url.empty()) { |
| | | if (!url) { |
| | | set_last_error("BHQueryProcs url empty"); |
| | | return false; |
| | | } |
| | |
| | | int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | const auto& url = get_url(URLSubLocal); |
| | | if (url.empty()) { |
| | | if (!url) { |
| | | set_last_error("BHSubscribeTopics url empty"); |
| | | return false; |
| | | } |
| | |
| | | int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | const auto& url = get_url(URLSubNet); |
| | | if (url.empty()) { |
| | | if (!url) { |
| | | set_last_error("BHSubscribeNetTopics url empty"); |
| | | return false; |
| | | } |
| | |
| | | int *msgpub_len, |
| | | const int timeout_ms) |
| | | { |
| | | if (!proc_id && !proc_id_len && !msgpub && !msgpub_len) |
| | | return subscribe_read(NULL, NULL, timeout_ms) == 0; |
| | | |
| | | string topic, msg; |
| | | auto ret = subscribe_read(&topic, &msg, timeout_ms); |
| | | // printf("BHReadSub msg topic %s length %lu\n", topic.c_str(), msg.length()); |
| | |
| | | void **src, |
| | | const int timeout_ms) |
| | | { |
| | | if (!proc_id && !proc_id_len && !request && !request_len && !src) |
| | | return read_request(NULL, NULL, timeout_ms) == 0; |
| | | |
| | | string msg; |
| | | auto ret = read_request(src, &msg, timeout_ms); |
| | | if (ret < 0) return false; |
| | |
| | | URLHeartBeat, |
| | | }; |
| | | |
// IPC endpoints of the bus center, one per service.
// Each constant is defined exactly once and points at a string literal, so the
// pointee is const and no cast is needed.
static const char* IPC_REGISTER = "ipc:///tmp/bhnng-center-reg.ipc";                 // process registration
static const char* IPC_UNREGISTER = "ipc:///tmp/bhnng-center-unregister.ipc";        // process unregistration
static const char* IPC_REGTOPIC = "ipc:///tmp/bhnng-center-regtopic.ipc";            // topic registration
static const char* IPC_QUERYTOPIC = "ipc:///tmp/bhnng-center-querytopic.ipc";        // query a specific topic
static const char* IPC_QUERYPROC = "ipc:///tmp/bhnng-center-queryproc.ipc";          // query all registered processes
static const char* IPC_SUBLOCALTOPIC = "ipc:///tmp/bhnng-center-sublocaltopic.ipc";  // subscribe to local topics
static const char* IPC_SUBNETTOPIC = "ipc:///tmp/bhnng-center-subnettopic.ipc";      // subscribe to network topics
static const char* IPC_HEARTBEAT = "ipc:///tmp/bhnng-center-hb.ipc";
static const char* IPC_PUB_PROXY = "ipc:///tmp/bhnng-center-pub-proxy.ipc";          // proxy center: receives messages waiting to be published
static const char* IPC_SUB_QUEUE = "ipc:///tmp/bhnng-center-sub-queue.ipc";          // channel through which clients subscribe from the center
| | | |
| | | static const std::unordered_map<int, std::string> map_url{ |
| | | static const std::unordered_map<int, const char*> map_url{ |
| | | {URLReg, IPC_REGISTER}, |
| | | {URLDeReg, IPC_UNREGISTER}, |
| | | {URLRegTopic, IPC_REGTOPIC}, |
| | |
| | | {URLSubQueue, IPC_SUB_QUEUE}, |
| | | {URLHeartBeat, IPC_HEARTBEAT}, |
| | | }; |
| | | inline std::string get_url(const int type){ |
| | | inline const char* get_url(const int type){ |
| | | auto iter = map_url.find(type); |
| | | if (iter != map_url.end()){ |
| | | return iter->second; |
| | | } |
| | | return {}; |
| | | return NULL; |
| | | } |
| | | |
// Metafunction whose nested `type` is always void, whatever template
// arguments (including none) it is instantiated with.
template <typename... Ts>
struct make_void {
    using type = void;
};
| | |
| | | struct psmsg{ |
| | | DISABLE_COPY_AND_ASSIGN(psmsg); |
| | | psmsg(const std::string& t, std::string&& m) |
| | | :topic_(t),data_(std::move(m)){} |
| | | :topic_(t),data_(std::move(m)){} |
| | | psmsg(std::string&& t, std::string&& m) |
| | | :topic_(std::move(t)),data_(std::move(m)){} |
| | | :topic_(std::move(t)),data_(std::move(m)){} |
| | | std::string topic_{}; |
| | | std::string data_{}; |
| | | }; |
| | |
| | | PRNTVITAG("topics is null"); |
| | | return false; |
| | | } |
| | | const auto& url = get_url(URLRegTopic); |
| | | if (!url) { |
| | | set_last_error("bus_register_topics url empty"); |
| | | return false; |
| | | } |
| | | |
| | | if (get<8>(*b).empty()) { |
| | | PRNTVITAG("proc_id is null"); |
| | |
| | | string msg(mtl2.SerializeAsString()); |
| | | std::move(mtl2); |
| | | |
| | | const auto& url = get_url(URLRegTopic); |
| | | if (url.empty()) { |
| | | set_last_error("bus_register_topics url empty"); |
| | | return false; |
| | | } |
| | | return simple_request(url, msg.data(), msg.size(), reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | const auto& url = get_url(URLQueryTopic); |
| | | if (url.empty()) { |
| | | if (!url) { |
| | | set_last_error("bus_query_topic_address url empty"); |
| | | return false; |
| | | } |
| | |
| | | // } |
| | | |
| | | const auto& url = get_url(URLQueryProcs); |
| | | if (url.empty()) { |
| | | if (!url) { |
| | | set_last_error("bus_query_procs url empty"); |
| | | return false; |
| | | } |
| | |
| | | PRNTVITAG("handle is null"); |
| | | return false; |
| | | } |
| | | if (!proc_id && !proc_id_len && !msgpub && !msgpub_len) |
| | | return subscribe_read(NULL, NULL, timeout_ms, &get<2>(*b)) == 0; |
| | | |
| | | string topic, msg; |
| | | auto ret = subscribe_read(&topic, &msg, timeout_ms, &get<2>(*b)); |
| | |
| | | PRNTVITAG("handle is null"); |
| | | return false; |
| | | } |
| | | if (!proc_id && !proc_id_len && !request && !request_len && !src) |
| | | return read_request(NULL, NULL, timeout_ms, &get<6>(*b)) == 0; |
| | | |
| | | string msg; |
| | | auto ret = read_request(src, &msg, timeout_ms, &get<6>(*b)); |
| | |
| | | _ps_sub* sub = (_ps_sub*)arg; |
| | | if (!sub) sub = singleton<_ps_sub>(); |
| | | |
| | | if (!topic && !msg) { |
| | | lock_guard<mutex> l{sub->mtx_msg_}; |
| | | for (int i = 0; i < 2; i++) { |
| | | if (!sub->msg_.empty()) |
| | | return 0; |
| | | this_thread::sleep_for(chrono::milliseconds(to_ms)); |
| | | } |
| | | return -1; |
| | | } |
| | | |
| | | TAG; |
| | | |
| | | int tm = to_ms > 0 ? to_ms : 30; |
| | | |
| | | unique_lock<mutex> l(sub->mtx_msg_); |
| | | auto status = sub->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [sub]{ |
| | | return !sub->msg_.empty(); |
| | | }); |
| | | if (!status){ |
| | | PRNTVITAG("subscribe_read timeout"); |
| | | return -1; |
| | | if (sub->msg_.empty()) { |
| | | auto status = sub->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [sub]{ |
| | | return !sub->msg_.empty(); |
| | | }); |
| | | if (!status){ |
| | | PRNTVITAG("subscribe_read timeout"); |
| | | return -1; |
| | | } |
| | | } |
| | | auto& tmp = sub->msg_.front(); |
| | | *topic = std::move(tmp.topic_); |
| | |
| | | if (start_reply(rep->url_, get<1>(get<1>(rep->socks_))) != 0) |
| | | return -1; |
| | | |
| | | if (!src && !msg) { |
| | | lock_guard<mutex> l{rep->mtx_msg_}; |
| | | for (int i = 0; i < 2; i++) { |
| | | if (!rep->msg_.empty()) |
| | | return 0; |
| | | this_thread::sleep_for(chrono::milliseconds(to_ms)); |
| | | } |
| | | return -1; |
| | | } |
| | | |
| | | int tm = to_ms > 0 ? to_ms : 30; |
| | | |
| | | uint64_t key{}; |
| | | work* w{}; |
| | | { |
| | | unique_lock<mutex> l(rep->mtx_msg_); |
| | | auto status = rep->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [rep]{ |
| | | return !rep->msg_.empty(); |
| | | }); |
| | | if (!status){ |
| | | PRNTVITAG("read_request timeout"); |
| | | return -1; |
| | | if (rep->msg_.empty()) { |
| | | auto status = rep->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [rep]{ |
| | | return !rep->msg_.empty(); |
| | | }); |
| | | if (!status){ |
| | | PRNTVITAG("read_request timeout"); |
| | | return -1; |
| | | } |
| | | } |
| | | auto iter = rep->msg_.begin(); |
| | | key = iter->first; |