From 16935f4aebffdd1b6580b844391a0aa0f4f3012b Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期一, 22 四月 2024 10:29:12 +0800 Subject: [PATCH] bug fixed --- src/nng_wrap.cpp | 48 ++++++++++++++++++------ src/common.h | 30 +++++++------- src/interface_bus_api.cpp | 18 +++++--- src/bn_api.cpp | 16 +++++-- 4 files changed, 73 insertions(+), 39 deletions(-) diff --git a/src/bn_api.cpp b/src/bn_api.cpp index 50aec06..0b39f61 100644 --- a/src/bn_api.cpp +++ b/src/bn_api.cpp @@ -97,7 +97,7 @@ if (!topics || topics_len <= 0) return false; const auto& url = get_url(URLRegTopic); - if (url.empty()) { + if (!url) { set_last_error("BHRegisterTopics url empty"); return false; } @@ -136,7 +136,7 @@ if (!topic || topic_len <= 0) return false; const auto& url = get_url(URLQueryTopic); - if (url.empty()) { + if (!url) { set_last_error("BHQueryTopicAddress url empty"); return false; } @@ -165,7 +165,7 @@ // if (!query || query_len <= 0) return false; const auto& url = get_url(URLQueryProcs); - if (url.empty()) { + if (!url) { set_last_error("BHQueryProcs url empty"); return false; } @@ -247,7 +247,7 @@ int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) { const auto& url = get_url(URLSubLocal); - if (url.empty()) { + if (!url) { set_last_error("BHSubscribeTopics url empty"); return false; } @@ -257,7 +257,7 @@ int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) { const auto& url = get_url(URLSubNet); - if (url.empty()) { + if (!url) { set_last_error("BHSubscribeNetTopics url empty"); return false; } @@ -272,6 +272,9 @@ int *msgpub_len, const int timeout_ms) { + if (!proc_id && !proc_id_len && !msgpub && !msgpub_len) + return subscribe_read(NULL, NULL, timeout_ms) == 0; + string topic, msg; auto ret = subscribe_read(&topic, &msg, timeout_ms); // printf("BHReadSub msg topic %s length %lu\n", topic.c_str(), msg.length()); @@ -391,6 +394,9 @@ void **src, const int timeout_ms) { + if (!proc_id && !proc_id_len && !request && !request_len && !src) + return read_request(NULL, NULL, timeout_ms) == 0; + string msg; auto ret = read_request(src, &msg, timeout_ms); if (ret < 0) return false; diff --git a/src/common.h b/src/common.h index cea8c28..78f757d 100644 --- a/src/common.h +++ b/src/common.h @@ -46,18 +46,18 @@ URLHeartBeat, }; -static char* IPC_REGISTER = (char*)"ipc:///tmp/bhnng-center-reg.ipc"; //杩涚▼娉ㄥ唽 -static char* IPC_UNREGISTER = (char*)"ipc:///tmp/bhnng-center-unregister.ipc"; //娉ㄩ攢 -static char* IPC_REGTOPIC = (char*)"ipc:///tmp/bhnng-center-regtopic.ipc"; //娉ㄥ唽涓婚 -static char* IPC_QUERYTOPIC = (char*)"ipc:///tmp/bhnng-center-querytopic.ipc"; //鏌ヨ鎸囧畾鐨勪富棰� -static char* IPC_QUERYPROC = (char*)"ipc:///tmp/bhnng-center-queryproc.ipc"; //鏌ヨ鎵�鏈夋敞鍐岀殑杩涚▼ -static char* IPC_SUBLOCALTOPIC = (char*)"ipc:///tmp/bhnng-center-sublocaltopic.ipc"; //璁㈤槄鏈湴涓婚 -static char* IPC_SUBNETTOPIC = (char*)"ipc:///tmp/bhnng-center-subnettopic.ipc"; //璁㈤槄缃戠粶涓婚 -static char* IPC_HEARTBEAT = (char*)"ipc:///tmp/bhnng-center-hb.ipc"; -static char* IPC_PUB_PROXY = (char*)"ipc:///tmp/bhnng-center-pub-proxy.ipc"; //杩欎釜鏄唬鐞嗕腑蹇冿紝鐢ㄤ簬鎺ユ敹寰呭彂甯冪殑娑堟伅 -static char* IPC_SUB_QUEUE = (char*)"ipc:///tmp/bhnng-center-sub-queue.ipc"; //杩欎釜鏄鎴风浠巆enter璁㈤槄鐨勯�氶亾 +static const char* IPC_REGISTER = (char*)"ipc:///tmp/bhnng-center-reg.ipc"; //杩涚▼娉ㄥ唽 +static const char* IPC_UNREGISTER = (char*)"ipc:///tmp/bhnng-center-unregister.ipc"; //娉ㄩ攢 +static const char* IPC_REGTOPIC = (char*)"ipc:///tmp/bhnng-center-regtopic.ipc"; //娉ㄥ唽涓婚 +static const char* IPC_QUERYTOPIC = (char*)"ipc:///tmp/bhnng-center-querytopic.ipc"; //鏌ヨ鎸囧畾鐨勪富棰� +static const char* IPC_QUERYPROC = (char*)"ipc:///tmp/bhnng-center-queryproc.ipc"; //鏌ヨ鎵�鏈夋敞鍐岀殑杩涚▼ +static const char* IPC_SUBLOCALTOPIC = (char*)"ipc:///tmp/bhnng-center-sublocaltopic.ipc"; //璁㈤槄鏈湴涓婚 +static const char* IPC_SUBNETTOPIC = (char*)"ipc:///tmp/bhnng-center-subnettopic.ipc"; //璁㈤槄缃戠粶涓婚 +static const char* IPC_HEARTBEAT = (char*)"ipc:///tmp/bhnng-center-hb.ipc"; +static const char* IPC_PUB_PROXY = (char*)"ipc:///tmp/bhnng-center-pub-proxy.ipc"; //杩欎釜鏄唬鐞嗕腑蹇冿紝鐢ㄤ簬鎺ユ敹寰呭彂甯冪殑娑堟伅 +static const char* IPC_SUB_QUEUE = (char*)"ipc:///tmp/bhnng-center-sub-queue.ipc"; //杩欎釜鏄鎴风浠巆enter璁㈤槄鐨勯�氶亾 -static const std::unordered_map<int, std::string> map_url{ +static const std::unordered_map<int, const char*> map_url{ {URLReg, IPC_REGISTER}, {URLDeReg, IPC_UNREGISTER}, {URLRegTopic, IPC_REGTOPIC}, @@ -69,12 +69,12 @@ {URLSubQueue, IPC_SUB_QUEUE}, {URLHeartBeat, IPC_HEARTBEAT}, }; -inline std::string get_url(const int type){ +inline const char* get_url(const int type){ auto iter = map_url.find(type); if (iter != map_url.end()){ return iter->second; } - return {}; + return NULL; } template <class... T> struct make_void{typedef void type;}; @@ -133,9 +133,9 @@ struct psmsg{ DISABLE_COPY_AND_ASSIGN(psmsg); psmsg(const std::string& t, std::string&& m) - :topic_(t),data_(std::move(m)){} + :topic_(t),data_(std::move(m)){} psmsg(std::string&& t, std::string&& m) - :topic_(std::move(t)),data_(std::move(m)){} + :topic_(std::move(t)),data_(std::move(m)){} std::string topic_{}; std::string data_{}; }; diff --git a/src/interface_bus_api.cpp b/src/interface_bus_api.cpp index 07f6db9..13bc08e 100644 --- a/src/interface_bus_api.cpp +++ b/src/interface_bus_api.cpp @@ -90,6 +90,11 @@ PRNTVITAG("topics is null"); return false; } + const auto& url = get_url(URLRegTopic); + if (!url) { + set_last_error("bus_register_topics url empty"); + return false; + } if (get<8>(*b).empty()) { PRNTVITAG("proc_id is null"); @@ -112,11 +117,6 @@ string msg(mtl2.SerializeAsString()); std::move(mtl2); - const auto& url = get_url(URLRegTopic); - if (url.empty()) { - set_last_error("bus_register_topics url empty"); - return false; - } return simple_request(url, msg.data(), msg.size(), reply, reply_len, timeout_ms); } @@ -140,7 +140,7 @@ } const auto& url = get_url(URLQueryTopic); - if (url.empty()) { + if (!url) { set_last_error("bus_query_topic_address url empty"); return false; } @@ -178,7 +178,7 @@ // } const auto& url = get_url(URLQueryProcs); - if (url.empty()) { + if (!url) { set_last_error("bus_query_procs url empty"); return false; } @@ -307,6 +307,8 @@ PRNTVITAG("handle is null"); return false; } + if (!proc_id && !proc_id_len && !msgpub && !msgpub_len) + return subscribe_read(NULL, NULL, timeout_ms, &get<2>(*b)) == 0; string topic, msg; auto ret = subscribe_read(&topic, &msg, timeout_ms, &get<2>(*b)); @@ -387,6 +389,8 @@ PRNTVITAG("handle is null"); return false; } + if (!proc_id && !proc_id_len && !request && !request_len && !src) + return read_request(NULL, NULL, timeout_ms, &get<6>(*b)) == 0; string msg; auto ret = read_request(src, &msg, timeout_ms, &get<6>(*b)); diff --git a/src/nng_wrap.cpp b/src/nng_wrap.cpp index 1a2bc3e..3138e2f 100644 --- a/src/nng_wrap.cpp +++ b/src/nng_wrap.cpp @@ -314,17 +314,29 @@ _ps_sub* sub = (_ps_sub*)arg; if (!sub) sub = singleton<_ps_sub>(); + if (!topic && !msg) { + lock_guard<mutex> l{sub->mtx_msg_}; + for (int i = 0; i < 2; i++) { + if (!sub->msg_.empty()) + return 0; + this_thread::sleep_for(chrono::milliseconds(to_ms)); + } + return -1; + } + TAG; int tm = to_ms > 0 ? to_ms : 30; unique_lock<mutex> l(sub->mtx_msg_); - auto status = sub->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [sub]{ - return !sub->msg_.empty(); - }); - if (!status){ - PRNTVITAG("subscribe_read timeout"); - return -1; + if (sub->msg_.empty()) { + auto status = sub->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [sub]{ + return !sub->msg_.empty(); + }); + if (!status){ + PRNTVITAG("subscribe_read timeout"); + return -1; + } } auto& tmp = sub->msg_.front(); *topic = std::move(tmp.topic_); @@ -580,18 +592,30 @@ if (start_reply(rep->url_, get<1>(get<1>(rep->socks_))) != 0) return -1; + if (!src && !msg) { + lock_guard<mutex> l{rep->mtx_msg_}; + for (int i = 0; i < 2; i++) { + if (!rep->msg_.empty()) + return 0; + this_thread::sleep_for(chrono::milliseconds(to_ms)); + } + return -1; + } + int tm = to_ms > 0 ? to_ms : 30; uint64_t key{}; work* w{}; { unique_lock<mutex> l(rep->mtx_msg_); - auto status = rep->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [rep]{ - return !rep->msg_.empty(); - }); - if (!status){ - PRNTVITAG("read_request timeout"); - return -1; + if (rep->msg_.empty()) { + auto status = rep->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [rep]{ + return !rep->msg_.empty(); + }); + if (!status){ + PRNTVITAG("read_request timeout"); + return -1; + } } auto iter = rep->msg_.begin(); key = iter->first; -- Gitblit v1.8.0