From 16935f4aebffdd1b6580b844391a0aa0f4f3012b Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 22 四月 2024 10:29:12 +0800
Subject: [PATCH] bug fixed
---
src/nng_wrap.cpp | 48 ++++++++++++++++++------
src/common.h | 30 +++++++-------
src/interface_bus_api.cpp | 18 +++++---
src/bn_api.cpp | 16 +++++--
4 files changed, 73 insertions(+), 39 deletions(-)
diff --git a/src/bn_api.cpp b/src/bn_api.cpp
index 50aec06..0b39f61 100644
--- a/src/bn_api.cpp
+++ b/src/bn_api.cpp
@@ -97,7 +97,7 @@
if (!topics || topics_len <= 0) return false;
const auto& url = get_url(URLRegTopic);
- if (url.empty()) {
+ if (!url) {
set_last_error("BHRegisterTopics url empty");
return false;
}
@@ -136,7 +136,7 @@
if (!topic || topic_len <= 0) return false;
const auto& url = get_url(URLQueryTopic);
- if (url.empty()) {
+ if (!url) {
set_last_error("BHQueryTopicAddress url empty");
return false;
}
@@ -165,7 +165,7 @@
// if (!query || query_len <= 0) return false;
const auto& url = get_url(URLQueryProcs);
- if (url.empty()) {
+ if (!url) {
set_last_error("BHQueryProcs url empty");
return false;
}
@@ -247,7 +247,7 @@
int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
const auto& url = get_url(URLSubLocal);
- if (url.empty()) {
+ if (!url) {
set_last_error("BHSubscribeTopics url empty");
return false;
}
@@ -257,7 +257,7 @@
int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
const auto& url = get_url(URLSubNet);
- if (url.empty()) {
+ if (!url) {
set_last_error("BHSubscribeNetTopics url empty");
return false;
}
@@ -272,6 +272,9 @@
int *msgpub_len,
const int timeout_ms)
{
+ if (!proc_id && !proc_id_len && !msgpub && !msgpub_len)
+ return subscribe_read(NULL, NULL, timeout_ms) == 0;
+
string topic, msg;
auto ret = subscribe_read(&topic, &msg, timeout_ms);
// printf("BHReadSub msg topic %s length %lu\n", topic.c_str(), msg.length());
@@ -391,6 +394,9 @@
void **src,
const int timeout_ms)
{
+ if (!proc_id && !proc_id_len && !request && !request_len && !src)
+ return read_request(NULL, NULL, timeout_ms) == 0;
+
string msg;
auto ret = read_request(src, &msg, timeout_ms);
if (ret < 0) return false;
diff --git a/src/common.h b/src/common.h
index cea8c28..78f757d 100644
--- a/src/common.h
+++ b/src/common.h
@@ -46,18 +46,18 @@
URLHeartBeat,
};
-static char* IPC_REGISTER = (char*)"ipc:///tmp/bhnng-center-reg.ipc"; //杩涚▼娉ㄥ唽
-static char* IPC_UNREGISTER = (char*)"ipc:///tmp/bhnng-center-unregister.ipc"; //娉ㄩ攢
-static char* IPC_REGTOPIC = (char*)"ipc:///tmp/bhnng-center-regtopic.ipc"; //娉ㄥ唽涓婚
-static char* IPC_QUERYTOPIC = (char*)"ipc:///tmp/bhnng-center-querytopic.ipc"; //鏌ヨ鎸囧畾鐨勪富棰�
-static char* IPC_QUERYPROC = (char*)"ipc:///tmp/bhnng-center-queryproc.ipc"; //鏌ヨ鎵�鏈夋敞鍐岀殑杩涚▼
-static char* IPC_SUBLOCALTOPIC = (char*)"ipc:///tmp/bhnng-center-sublocaltopic.ipc"; //璁㈤槄鏈湴涓婚
-static char* IPC_SUBNETTOPIC = (char*)"ipc:///tmp/bhnng-center-subnettopic.ipc"; //璁㈤槄缃戠粶涓婚
-static char* IPC_HEARTBEAT = (char*)"ipc:///tmp/bhnng-center-hb.ipc";
-static char* IPC_PUB_PROXY = (char*)"ipc:///tmp/bhnng-center-pub-proxy.ipc"; //杩欎釜鏄唬鐞嗕腑蹇冿紝鐢ㄤ簬鎺ユ敹寰呭彂甯冪殑娑堟伅
-static char* IPC_SUB_QUEUE = (char*)"ipc:///tmp/bhnng-center-sub-queue.ipc"; //杩欎釜鏄鎴风浠巆enter璁㈤槄鐨勯�氶亾
+static const char* IPC_REGISTER = (char*)"ipc:///tmp/bhnng-center-reg.ipc"; //杩涚▼娉ㄥ唽
+static const char* IPC_UNREGISTER = (char*)"ipc:///tmp/bhnng-center-unregister.ipc"; //娉ㄩ攢
+static const char* IPC_REGTOPIC = (char*)"ipc:///tmp/bhnng-center-regtopic.ipc"; //娉ㄥ唽涓婚
+static const char* IPC_QUERYTOPIC = (char*)"ipc:///tmp/bhnng-center-querytopic.ipc"; //鏌ヨ鎸囧畾鐨勪富棰�
+static const char* IPC_QUERYPROC = (char*)"ipc:///tmp/bhnng-center-queryproc.ipc"; //鏌ヨ鎵�鏈夋敞鍐岀殑杩涚▼
+static const char* IPC_SUBLOCALTOPIC = (char*)"ipc:///tmp/bhnng-center-sublocaltopic.ipc"; //璁㈤槄鏈湴涓婚
+static const char* IPC_SUBNETTOPIC = (char*)"ipc:///tmp/bhnng-center-subnettopic.ipc"; //璁㈤槄缃戠粶涓婚
+static const char* IPC_HEARTBEAT = (char*)"ipc:///tmp/bhnng-center-hb.ipc";
+static const char* IPC_PUB_PROXY = (char*)"ipc:///tmp/bhnng-center-pub-proxy.ipc"; //杩欎釜鏄唬鐞嗕腑蹇冿紝鐢ㄤ簬鎺ユ敹寰呭彂甯冪殑娑堟伅
+static const char* IPC_SUB_QUEUE = (char*)"ipc:///tmp/bhnng-center-sub-queue.ipc"; //杩欎釜鏄鎴风浠巆enter璁㈤槄鐨勯�氶亾
-static const std::unordered_map<int, std::string> map_url{
+static const std::unordered_map<int, const char*> map_url{
{URLReg, IPC_REGISTER},
{URLDeReg, IPC_UNREGISTER},
{URLRegTopic, IPC_REGTOPIC},
@@ -69,12 +69,12 @@
{URLSubQueue, IPC_SUB_QUEUE},
{URLHeartBeat, IPC_HEARTBEAT},
};
-inline std::string get_url(const int type){
+inline const char* get_url(const int type){
auto iter = map_url.find(type);
if (iter != map_url.end()){
return iter->second;
}
- return {};
+ return NULL;
}
template <class... T> struct make_void{typedef void type;};
@@ -133,9 +133,9 @@
struct psmsg{
DISABLE_COPY_AND_ASSIGN(psmsg);
psmsg(const std::string& t, std::string&& m)
- :topic_(t),data_(std::move(m)){}
+ :topic_(t),data_(std::move(m)){}
psmsg(std::string&& t, std::string&& m)
- :topic_(std::move(t)),data_(std::move(m)){}
+ :topic_(std::move(t)),data_(std::move(m)){}
std::string topic_{};
std::string data_{};
};
diff --git a/src/interface_bus_api.cpp b/src/interface_bus_api.cpp
index 07f6db9..13bc08e 100644
--- a/src/interface_bus_api.cpp
+++ b/src/interface_bus_api.cpp
@@ -90,6 +90,11 @@
PRNTVITAG("topics is null");
return false;
}
+ const auto& url = get_url(URLRegTopic);
+ if (!url) {
+ set_last_error("bus_register_topics url empty");
+ return false;
+ }
if (get<8>(*b).empty()) {
PRNTVITAG("proc_id is null");
@@ -112,11 +117,6 @@
string msg(mtl2.SerializeAsString());
std::move(mtl2);
- const auto& url = get_url(URLRegTopic);
- if (url.empty()) {
- set_last_error("bus_register_topics url empty");
- return false;
- }
return simple_request(url, msg.data(), msg.size(), reply, reply_len, timeout_ms);
}
@@ -140,7 +140,7 @@
}
const auto& url = get_url(URLQueryTopic);
- if (url.empty()) {
+ if (!url) {
set_last_error("bus_query_topic_address url empty");
return false;
}
@@ -178,7 +178,7 @@
// }
const auto& url = get_url(URLQueryProcs);
- if (url.empty()) {
+ if (!url) {
set_last_error("bus_query_procs url empty");
return false;
}
@@ -307,6 +307,8 @@
PRNTVITAG("handle is null");
return false;
}
+ if (!proc_id && !proc_id_len && !msgpub && !msgpub_len)
+ return subscribe_read(NULL, NULL, timeout_ms, &get<2>(*b)) == 0;
string topic, msg;
auto ret = subscribe_read(&topic, &msg, timeout_ms, &get<2>(*b));
@@ -387,6 +389,8 @@
PRNTVITAG("handle is null");
return false;
}
+ if (!proc_id && !proc_id_len && !request && !request_len && !src)
+ return read_request(NULL, NULL, timeout_ms, &get<6>(*b)) == 0;
string msg;
auto ret = read_request(src, &msg, timeout_ms, &get<6>(*b));
diff --git a/src/nng_wrap.cpp b/src/nng_wrap.cpp
index 1a2bc3e..3138e2f 100644
--- a/src/nng_wrap.cpp
+++ b/src/nng_wrap.cpp
@@ -314,17 +314,29 @@
_ps_sub* sub = (_ps_sub*)arg;
if (!sub) sub = singleton<_ps_sub>();
+ if (!topic && !msg) {
+ lock_guard<mutex> l{sub->mtx_msg_};
+ for (int i = 0; i < 2; i++) {
+ if (!sub->msg_.empty())
+ return 0;
+ this_thread::sleep_for(chrono::milliseconds(to_ms));
+ }
+ return -1;
+ }
+
TAG;
int tm = to_ms > 0 ? to_ms : 30;
unique_lock<mutex> l(sub->mtx_msg_);
- auto status = sub->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [sub]{
- return !sub->msg_.empty();
- });
- if (!status){
- PRNTVITAG("subscribe_read timeout");
- return -1;
+ if (sub->msg_.empty()) {
+ auto status = sub->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [sub]{
+ return !sub->msg_.empty();
+ });
+ if (!status){
+ PRNTVITAG("subscribe_read timeout");
+ return -1;
+ }
}
auto& tmp = sub->msg_.front();
*topic = std::move(tmp.topic_);
@@ -580,18 +592,30 @@
if (start_reply(rep->url_, get<1>(get<1>(rep->socks_))) != 0)
return -1;
+ if (!src && !msg) {
+ lock_guard<mutex> l{rep->mtx_msg_};
+ for (int i = 0; i < 2; i++) {
+ if (!rep->msg_.empty())
+ return 0;
+ this_thread::sleep_for(chrono::milliseconds(to_ms));
+ }
+ return -1;
+ }
+
int tm = to_ms > 0 ? to_ms : 30;
uint64_t key{};
work* w{};
{
unique_lock<mutex> l(rep->mtx_msg_);
- auto status = rep->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [rep]{
- return !rep->msg_.empty();
- });
- if (!status){
- PRNTVITAG("read_request timeout");
- return -1;
+ if (rep->msg_.empty()) {
+ auto status = rep->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [rep]{
+ return !rep->msg_.empty();
+ });
+ if (!status){
+ PRNTVITAG("read_request timeout");
+ return -1;
+ }
}
auto iter = rep->msg_.begin();
key = iter->first;
--
Gitblit v1.8.0