From 0f53806de20f07b0bc15877c7a9b293758acbfa7 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期三, 01 三月 2023 09:20:38 +0800
Subject: [PATCH] bug fixed ps_sub thread join
---
src/bn_api.cpp | 89 +++++++++++++++++++++++++++++++-------------
1 files changed, 63 insertions(+), 26 deletions(-)
diff --git a/src/bn_api.cpp b/src/bn_api.cpp
index 90e74ca..3ecf961 100644
--- a/src/bn_api.cpp
+++ b/src/bn_api.cpp
@@ -97,6 +97,10 @@
if (!topics || topics_len <= 0) return false;
const auto& url = get_url(URLRegTopic);
+ if (url.empty()) {
+ set_last_error("BHRegisterTopics url empty");
+ return false;
+ }
bhome_msg::MsgTopicList mtl;
if (!mtl.ParseFromArray(topics, topics_len)){
@@ -112,8 +116,10 @@
// printf("======>> BHRegisterTopics topic %s\n", mtl.topic_list(i).c_str());
mtl2.add_topic_list(mtl.topic_list(i));
}
+ std::move(mtl);
string msg(mtl2.SerializeAsString());
+ std::move(mtl2);
auto ret = simple_request(url, msg.data(), msg.size(), reply, reply_len, timeout_ms);
// printf("======>> BHRegisterTopics return value %d msg size %lu\n", ret, msg.size());
@@ -129,8 +135,11 @@
{
if (!topic || topic_len <= 0) return false;
- auto url(get_url(URLQueryTopic));
-
+ const auto& url = get_url(URLQueryTopic);
+ if (url.empty()) {
+ set_last_error("BHQueryTopicAddress url empty");
+ return false;
+ }
// if (remote && remote_len > 0){
// BHAddress addr;
// if (addr.ParseFromArray(remote, remote_len)){
@@ -155,7 +164,11 @@
{
// if (!query || query_len <= 0) return false;
- auto url(get_url(URLQueryProcs));
+ const auto& url = get_url(URLQueryProcs);
+ if (url.empty()) {
+ set_last_error("BHQueryProcs url empty");
+ return false;
+ }
// if (remote && remote_len > 0){
// BHAddress addr;
@@ -193,14 +206,18 @@
newPub.set_data(string{(const char*)msgpub, (const size_t)msgpub_len});
string msg(newPub.SerializeAsString());
+ std::move(newPub);
+
+ if (pub.topic().empty()) {
+ set_last_error("BHPublish topic empty");
+ return false;
+ }
auto ret = publish(pub.topic(), msg.data(), msg.size());
if (ret == 0) return false;
return true;
}
-// 璁㈤槄
-int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
-{
+static int sub(const string& url, const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms){
if (!topics || topics_len <= 0) return false;
bhome_msg::MsgTopicList mtl;
@@ -218,17 +235,34 @@
// printf("BHSubscribeTopics %s\n", t.c_str());
subscribe_topic(t);
}
- return true;
- auto ret = simple_request(get_url(URLSubLocal), topics, topics_len, reply, reply_len, timeout_ms);
+ std::move(mtl);
+
+ auto ret = simple_request(url, topics, topics_len, reply, reply_len, timeout_ms);
if (!ret){
printf("BHSubscribeTopics simple_request failed\n");
}
return ret;
}
+// 璁㈤槄
+int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+{
+ const auto& url = get_url(URLSubLocal);
+ if (url.empty()) {
+ set_last_error("BHSubscribeTopics url empty");
+ return false;
+ }
+ return sub(url, topics, topics_len, reply, reply_len, timeout_ms);
+}
// 璁㈤槄缃戠粶,涓嶅疄鐜�
int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
- return BHSubscribeTopics(topics, topics_len, reply, reply_len, timeout_ms);
+ const auto& url = get_url(URLSubNet);
+ if (url.empty()) {
+ set_last_error("BHSubscribeNetTopics url empty");
+ return false;
+ }
+
+ return sub(url, topics, topics_len, reply, reply_len, timeout_ms);
}
// 璇诲彇璁㈤槄娑堟伅,proc_id鏆傛椂娌$敤锛岃繑鍥瀎ake msg
@@ -278,8 +312,10 @@
bhome_msg::MsgQueryTopic msg;
msg.set_topic(req.topic());
+ std::move(req);
string s(msg.SerializeAsString());
+ std::move(msg);
void* reply2;
int reply_len2;
@@ -287,7 +323,6 @@
int ret = BHQueryTopicAddress(remote.c_str(), remote.length(), s.data(), s.size(),
&reply2, &reply_len2, timeout_ms);
if (!ret) return false;
-
bhome_msg::MsgQueryTopicReply mr;
if (!mr.ParseFromArray(reply2, reply_len2)){
@@ -300,9 +335,8 @@
}
auto& node_addr = mr.node_address(0);
- auto& procid = node_addr.proc_id();
+ *proc_id = node_addr.proc_id();
- *proc_id = procid;
return true;
}
@@ -319,26 +353,28 @@
{
if (!request || request_len <= 0) return false;
+ string url{};
// BHQueryTopicAddress鑾峰彇proc_id
string procid{};
- if (!get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){
- return false;
- }
-
- auto url("ipc:///tmp/" + procid);
if (remote && remote_len > 0){
BHAddress addr;
if (addr.ParseFromArray(remote, remote_len)){
if (!addr.ip().empty() && addr.port() > 0){
- // url = "tcp://" + addr.ip() + ":" + to_string(addr.port());
- printf("======>> BHRequest use remote address %s\n", url.c_str());
+ url = "tcp://" + addr.ip() + ":" + to_string(addr.port());
+ // printf("======>> BHRequest use remote address %s\n", url.c_str());
}
}
+ } else if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){
+ // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
+ // return false;
+ url = "ipc:///tmp/" + procid;
}
- // 浣跨敤procid浣滀负ipc閫氫俊
- // printf("BHRequest procid %s\n", procid.c_str());
+ if (url.empty()) {
+ set_last_error("BHRequest url empty");
+ return false;
+ }
int rc = request2(url, request, request_len, reply, reply_len, timeout_ms);
if (rc < 0) return false;
@@ -357,14 +393,15 @@
{
string msg;
auto ret = read_request(src, &msg, timeout_ms);
- if (ret != 0) return false;
+ if (ret < 0) return false;
- string procid{};
- if (!get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){
- return false;
+ if (ret == REPLY_IPC){
+ string procid{};
+ if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){
+ copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
+ }
}
- copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
copy_memory(request, request_len, msg.data(), msg.size());
return true;
--
Gitblit v1.8.0