From 16935f4aebffdd1b6580b844391a0aa0f4f3012b Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 22 四月 2024 10:29:12 +0800
Subject: [PATCH] bug fixed

---
 src/nng_wrap.cpp          |   48 ++++++++++++++++++------
 src/common.h              |   30 +++++++-------
 src/interface_bus_api.cpp |   18 +++++---
 src/bn_api.cpp            |   16 +++++--
 4 files changed, 73 insertions(+), 39 deletions(-)

diff --git a/src/bn_api.cpp b/src/bn_api.cpp
index 50aec06..0b39f61 100644
--- a/src/bn_api.cpp
+++ b/src/bn_api.cpp
@@ -97,7 +97,7 @@
     if (!topics || topics_len <= 0) return false;
 
     const auto& url = get_url(URLRegTopic);
-    if (url.empty()) {
+    if (!url) {
         set_last_error("BHRegisterTopics url empty");
         return false;
     }
@@ -136,7 +136,7 @@
     if (!topic || topic_len <= 0) return false;
 
     const auto& url = get_url(URLQueryTopic);
-    if (url.empty()) {
+    if (!url) {
         set_last_error("BHQueryTopicAddress url empty");
         return false;
     }
@@ -165,7 +165,7 @@
     // if (!query || query_len <= 0) return false;
 
     const auto& url = get_url(URLQueryProcs);
-    if (url.empty()) {
+    if (!url) {
         set_last_error("BHQueryProcs url empty");
         return false;
     }
@@ -247,7 +247,7 @@
 int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
 {
     const auto& url = get_url(URLSubLocal);
-    if (url.empty()) {
+    if (!url) {
         set_last_error("BHSubscribeTopics url empty");
         return false;
     }
@@ -257,7 +257,7 @@
 int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
 {
     const auto& url = get_url(URLSubNet);
-    if (url.empty()) {
+    if (!url) {
         set_last_error("BHSubscribeNetTopics url empty");
         return false;
     }
@@ -272,6 +272,9 @@
               int *msgpub_len,
               const int timeout_ms)
 {
+    if (!proc_id && !proc_id_len && !msgpub && !msgpub_len)
+        return subscribe_read(NULL, NULL, timeout_ms) == 0;
+
     string topic, msg;
     auto ret = subscribe_read(&topic, &msg, timeout_ms);
     // printf("BHReadSub msg topic %s length %lu\n", topic.c_str(), msg.length());
@@ -391,6 +394,9 @@
                   void **src,
                   const int timeout_ms)
 {
+    if (!proc_id && !proc_id_len && !request && !request_len && !src)
+        return read_request(NULL, NULL, timeout_ms) == 0;
+
     string msg;
     auto ret = read_request(src, &msg, timeout_ms);
     if (ret < 0) return false;
diff --git a/src/common.h b/src/common.h
index cea8c28..78f757d 100644
--- a/src/common.h
+++ b/src/common.h
@@ -46,18 +46,18 @@
     URLHeartBeat,
 };
 
-static char* IPC_REGISTER = (char*)"ipc:///tmp/bhnng-center-reg.ipc"; //杩涚▼娉ㄥ唽
-static char* IPC_UNREGISTER = (char*)"ipc:///tmp/bhnng-center-unregister.ipc"; //娉ㄩ攢
-static char* IPC_REGTOPIC = (char*)"ipc:///tmp/bhnng-center-regtopic.ipc"; //娉ㄥ唽涓婚
-static char* IPC_QUERYTOPIC = (char*)"ipc:///tmp/bhnng-center-querytopic.ipc"; //鏌ヨ鎸囧畾鐨勪富棰�
-static char* IPC_QUERYPROC = (char*)"ipc:///tmp/bhnng-center-queryproc.ipc"; //鏌ヨ鎵�鏈夋敞鍐岀殑杩涚▼
-static char* IPC_SUBLOCALTOPIC = (char*)"ipc:///tmp/bhnng-center-sublocaltopic.ipc"; //璁㈤槄鏈湴涓婚
-static char* IPC_SUBNETTOPIC = (char*)"ipc:///tmp/bhnng-center-subnettopic.ipc"; //璁㈤槄缃戠粶涓婚
-static char* IPC_HEARTBEAT = (char*)"ipc:///tmp/bhnng-center-hb.ipc";
-static char* IPC_PUB_PROXY = (char*)"ipc:///tmp/bhnng-center-pub-proxy.ipc";   //杩欎釜鏄唬鐞嗕腑蹇冿紝鐢ㄤ簬鎺ユ敹寰呭彂甯冪殑娑堟伅
-static char* IPC_SUB_QUEUE = (char*)"ipc:///tmp/bhnng-center-sub-queue.ipc";  //杩欎釜鏄鎴风浠巆enter璁㈤槄鐨勯�氶亾
+static const char* IPC_REGISTER = (char*)"ipc:///tmp/bhnng-center-reg.ipc"; //杩涚▼娉ㄥ唽
+static const char* IPC_UNREGISTER = (char*)"ipc:///tmp/bhnng-center-unregister.ipc"; //娉ㄩ攢
+static const char* IPC_REGTOPIC = (char*)"ipc:///tmp/bhnng-center-regtopic.ipc"; //娉ㄥ唽涓婚
+static const char* IPC_QUERYTOPIC = (char*)"ipc:///tmp/bhnng-center-querytopic.ipc"; //鏌ヨ鎸囧畾鐨勪富棰�
+static const char* IPC_QUERYPROC = (char*)"ipc:///tmp/bhnng-center-queryproc.ipc"; //鏌ヨ鎵�鏈夋敞鍐岀殑杩涚▼
+static const char* IPC_SUBLOCALTOPIC = (char*)"ipc:///tmp/bhnng-center-sublocaltopic.ipc"; //璁㈤槄鏈湴涓婚
+static const char* IPC_SUBNETTOPIC = (char*)"ipc:///tmp/bhnng-center-subnettopic.ipc"; //璁㈤槄缃戠粶涓婚
+static const char* IPC_HEARTBEAT = (char*)"ipc:///tmp/bhnng-center-hb.ipc";
+static const char* IPC_PUB_PROXY = (char*)"ipc:///tmp/bhnng-center-pub-proxy.ipc";   //杩欎釜鏄唬鐞嗕腑蹇冿紝鐢ㄤ簬鎺ユ敹寰呭彂甯冪殑娑堟伅
+static const char* IPC_SUB_QUEUE = (char*)"ipc:///tmp/bhnng-center-sub-queue.ipc";  //杩欎釜鏄鎴风浠巆enter璁㈤槄鐨勯�氶亾
 
-static const std::unordered_map<int, std::string> map_url{
+static const std::unordered_map<int, const char*> map_url{
     {URLReg,                IPC_REGISTER},
     {URLDeReg,              IPC_UNREGISTER},
     {URLRegTopic,           IPC_REGTOPIC},
@@ -69,12 +69,12 @@
     {URLSubQueue,           IPC_SUB_QUEUE},
     {URLHeartBeat,          IPC_HEARTBEAT},
 };
-inline std::string get_url(const int type){
+inline const char* get_url(const int type){
     auto iter = map_url.find(type);
     if (iter != map_url.end()){
         return iter->second;
     }
-    return {};
+    return NULL;
 }
 
 template <class... T> struct make_void{typedef void type;};
@@ -133,9 +133,9 @@
     struct psmsg{
         DISABLE_COPY_AND_ASSIGN(psmsg);
         psmsg(const std::string& t, std::string&& m)
-        :topic_(t),data_(std::move(m)){}
+            :topic_(t),data_(std::move(m)){}
         psmsg(std::string&& t, std::string&& m)
-        :topic_(std::move(t)),data_(std::move(m)){}
+            :topic_(std::move(t)),data_(std::move(m)){}
         std::string topic_{};
         std::string data_{};
     };
diff --git a/src/interface_bus_api.cpp b/src/interface_bus_api.cpp
index 07f6db9..13bc08e 100644
--- a/src/interface_bus_api.cpp
+++ b/src/interface_bus_api.cpp
@@ -90,6 +90,11 @@
         PRNTVITAG("topics is null");
         return false;
     }
+    const auto& url = get_url(URLRegTopic);
+    if (!url) {
+        set_last_error("bus_register_topics url empty");
+        return false;
+    }
 
     if (get<8>(*b).empty()) {
         PRNTVITAG("proc_id is null");
@@ -112,11 +117,6 @@
     string msg(mtl2.SerializeAsString());
     std::move(mtl2);
 
-    const auto& url = get_url(URLRegTopic);
-    if (url.empty()) {
-        set_last_error("bus_register_topics url empty");
-        return false;
-    }
     return simple_request(url, msg.data(), msg.size(), reply, reply_len, timeout_ms);
 }
 
@@ -140,7 +140,7 @@
     }
 
     const auto& url = get_url(URLQueryTopic);
-    if (url.empty()) {
+    if (!url) {
         set_last_error("bus_query_topic_address url empty");
         return false;
     }
@@ -178,7 +178,7 @@
     // }
 
     const auto& url = get_url(URLQueryProcs);
-    if (url.empty()) {
+    if (!url) {
         set_last_error("bus_query_procs url empty");
         return false;
     }
@@ -307,6 +307,8 @@
         PRNTVITAG("handle is null");
         return false;
     }
+    if (!proc_id && !proc_id_len && !msgpub && !msgpub_len)
+        return subscribe_read(NULL, NULL, timeout_ms, &get<2>(*b)) == 0;
 
     string topic, msg;
     auto ret = subscribe_read(&topic, &msg, timeout_ms, &get<2>(*b));
@@ -387,6 +389,8 @@
         PRNTVITAG("handle is null");
         return false;
     }
+    if (!proc_id && !proc_id_len && !request && !request_len && !src)
+        return read_request(NULL, NULL, timeout_ms, &get<6>(*b)) == 0;
 
     string msg;
     auto ret = read_request(src, &msg, timeout_ms, &get<6>(*b));
diff --git a/src/nng_wrap.cpp b/src/nng_wrap.cpp
index 1a2bc3e..3138e2f 100644
--- a/src/nng_wrap.cpp
+++ b/src/nng_wrap.cpp
@@ -314,17 +314,29 @@
     _ps_sub* sub = (_ps_sub*)arg;
     if (!sub) sub = singleton<_ps_sub>();
 
+    if (!topic && !msg) {
+        lock_guard<mutex> l{sub->mtx_msg_};
+        for (int i = 0; i < 2; i++) {
+            if (!sub->msg_.empty())
+                return 0;
+            this_thread::sleep_for(chrono::milliseconds(to_ms));
+        }
+        return -1;
+    }
+
     TAG;
 
     int tm = to_ms > 0 ? to_ms : 30;
 
     unique_lock<mutex> l(sub->mtx_msg_);
-    auto status = sub->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [sub]{
-        return !sub->msg_.empty();
-    });
-    if (!status){
-        PRNTVITAG("subscribe_read timeout");
-        return -1;
+    if (sub->msg_.empty()) {
+        auto status = sub->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [sub]{
+            return !sub->msg_.empty();
+        });
+        if (!status){
+            PRNTVITAG("subscribe_read timeout");
+            return -1;
+        }
     }
     auto& tmp = sub->msg_.front();
     *topic = std::move(tmp.topic_);
@@ -580,18 +592,30 @@
         if (start_reply(rep->url_, get<1>(get<1>(rep->socks_))) != 0)
             return -1;
 
+    if (!src && !msg) {
+        lock_guard<mutex> l{rep->mtx_msg_};
+        for (int i = 0; i < 2; i++) {
+            if (!rep->msg_.empty())
+                return 0;
+            this_thread::sleep_for(chrono::milliseconds(to_ms));
+        }
+        return -1;
+    }
+
     int tm = to_ms > 0 ? to_ms : 30;
 
     uint64_t key{};
     work* w{};
     {
         unique_lock<mutex> l(rep->mtx_msg_);
-        auto status = rep->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [rep]{
-            return !rep->msg_.empty();
-        });
-        if (!status){
-            PRNTVITAG("read_request timeout");
-            return -1;
+        if (rep->msg_.empty()) {
+            auto status = rep->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [rep]{
+                return !rep->msg_.empty();
+            });
+            if (!status){
+                PRNTVITAG("read_request timeout");
+                return -1;
+            }
         }
         auto iter = rep->msg_.begin();
         key = iter->first;

--
Gitblit v1.8.0