From 0f53806de20f07b0bc15877c7a9b293758acbfa7 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期三, 01 三月 2023 09:20:38 +0800
Subject: [PATCH] bug fixed ps_sub thread join
---
src/bn_api.cpp | 74 ++++++++++++++++++++++++++++---------
1 files changed, 56 insertions(+), 18 deletions(-)
diff --git a/src/bn_api.cpp b/src/bn_api.cpp
index 2ea2429..3ecf961 100644
--- a/src/bn_api.cpp
+++ b/src/bn_api.cpp
@@ -42,7 +42,7 @@
unsigned short port = *(unsigned short*)(out);
copy_memory(reply, reply_len, (char*)out + port_size, out_len - port_size);
BHFree(out, out_len);
- // printf("======>> recv port %d\n", port);
+ printf("======>> recv port %d\n", port);
/////////////////////////////////////////////////////////////////////////
const auto& url_pub_proxy = get_url(URLPubProxy);
@@ -97,6 +97,10 @@
if (!topics || topics_len <= 0) return false;
const auto& url = get_url(URLRegTopic);
+ if (url.empty()) {
+ set_last_error("BHRegisterTopics url empty");
+ return false;
+ }
bhome_msg::MsgTopicList mtl;
if (!mtl.ParseFromArray(topics, topics_len)){
@@ -112,8 +116,10 @@
// printf("======>> BHRegisterTopics topic %s\n", mtl.topic_list(i).c_str());
mtl2.add_topic_list(mtl.topic_list(i));
}
+ std::move(mtl);
string msg(mtl2.SerializeAsString());
+ std::move(mtl2);
auto ret = simple_request(url, msg.data(), msg.size(), reply, reply_len, timeout_ms);
// printf("======>> BHRegisterTopics return value %d msg size %lu\n", ret, msg.size());
@@ -129,8 +135,11 @@
{
if (!topic || topic_len <= 0) return false;
- auto url(get_url(URLQueryTopic));
-
+ const auto& url = get_url(URLQueryTopic);
+ if (url.empty()) {
+ set_last_error("BHQueryTopicAddress url empty");
+ return false;
+ }
// if (remote && remote_len > 0){
// BHAddress addr;
// if (addr.ParseFromArray(remote, remote_len)){
@@ -155,7 +164,11 @@
{
// if (!query || query_len <= 0) return false;
- auto url(get_url(URLQueryProcs));
+ const auto& url = get_url(URLQueryProcs);
+ if (url.empty()) {
+ set_last_error("BHQueryProcs url empty");
+ return false;
+ }
// if (remote && remote_len > 0){
// BHAddress addr;
@@ -193,6 +206,12 @@
newPub.set_data(string{(const char*)msgpub, (const size_t)msgpub_len});
string msg(newPub.SerializeAsString());
+ std::move(newPub);
+
+ if (pub.topic().empty()) {
+ set_last_error("BHPublish topic empty");
+ return false;
+ }
auto ret = publish(pub.topic(), msg.data(), msg.size());
if (ret == 0) return false;
return true;
@@ -216,6 +235,7 @@
// printf("BHSubscribeTopics %s\n", t.c_str());
subscribe_topic(t);
}
+ std::move(mtl);
auto ret = simple_request(url, topics, topics_len, reply, reply_len, timeout_ms);
if (!ret){
@@ -226,12 +246,23 @@
// 璁㈤槄
int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
- return sub(get_url(URLSubLocal), topics, topics_len, reply, reply_len, timeout_ms);
+ const auto& url = get_url(URLSubLocal);
+ if (url.empty()) {
+ set_last_error("BHSubscribeTopics url empty");
+ return false;
+ }
+ return sub(url, topics, topics_len, reply, reply_len, timeout_ms);
}
// 璁㈤槄缃戠粶,涓嶅疄鐜�
int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
- return sub(get_url(URLSubNet), topics, topics_len, reply, reply_len, timeout_ms);
+ const auto& url = get_url(URLSubNet);
+ if (url.empty()) {
+ set_last_error("BHSubscribeNetTopics url empty");
+ return false;
+ }
+
+ return sub(url, topics, topics_len, reply, reply_len, timeout_ms);
}
// 璇诲彇璁㈤槄娑堟伅,proc_id鏆傛椂娌$敤锛岃繑鍥瀎ake msg
@@ -281,8 +312,10 @@
bhome_msg::MsgQueryTopic msg;
msg.set_topic(req.topic());
+ std::move(req);
string s(msg.SerializeAsString());
+ std::move(msg);
void* reply2;
int reply_len2;
@@ -290,7 +323,6 @@
int ret = BHQueryTopicAddress(remote.c_str(), remote.length(), s.data(), s.size(),
&reply2, &reply_len2, timeout_ms);
if (!ret) return false;
-
bhome_msg::MsgQueryTopicReply mr;
if (!mr.ParseFromArray(reply2, reply_len2)){
@@ -303,9 +335,8 @@
}
auto& node_addr = mr.node_address(0);
- auto& procid = node_addr.proc_id();
+ *proc_id = node_addr.proc_id();
- *proc_id = procid;
return true;
}
@@ -325,11 +356,7 @@
string url{};
// BHQueryTopicAddress鑾峰彇proc_id
string procid{};
- if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){
- // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
- // return false;
- url = "ipc:///tmp/" + procid;
- }
+
if (remote && remote_len > 0){
BHAddress addr;
if (addr.ParseFromArray(remote, remote_len)){
@@ -338,6 +365,15 @@
// printf("======>> BHRequest use remote address %s\n", url.c_str());
}
}
+ } else if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){
+ // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
+ // return false;
+ url = "ipc:///tmp/" + procid;
+ }
+
+ if (url.empty()) {
+ set_last_error("BHRequest url empty");
+ return false;
}
int rc = request2(url, request, request_len, reply, reply_len, timeout_ms);
@@ -357,11 +393,13 @@
{
string msg;
auto ret = read_request(src, &msg, timeout_ms);
- if (ret != 0) return false;
+ if (ret < 0) return false;
- string procid{};
- if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){
- copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
+ if (ret == REPLY_IPC){
+ string procid{};
+ if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){
+ copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
+ }
}
copy_memory(request, request_len, msg.data(), msg.size());
--
Gitblit v1.8.0