From 777333ff834744ac5665fa9abe5ec6373d25cda8 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期三, 01 三月 2023 09:22:30 +0800 Subject: [PATCH] bug fixed ps_sub thread join --- cbhomeclient.cpp | 100 +++++++++++++++++++++++++++++++++++++++---------- 1 files changed, 79 insertions(+), 21 deletions(-) diff --git a/cbhomeclient.cpp b/cbhomeclient.cpp index 1dac84e..0963c4b 100644 --- a/cbhomeclient.cpp +++ b/cbhomeclient.cpp @@ -9,7 +9,8 @@ #include "cbhomeclient.h" #include "fixed_q.h" -#include "3dparty/bus_nng/interface_bus_api.h" +#include "3rdparty/bus_nng/interface_bus_api.h" +#include "3rdparty/bus_nng/bn_api.h" #include "bhome_msg.pb.h" @@ -74,7 +75,7 @@ } template <class T> Msg msg(T&& t){ Msg m; - msg_helper(make_index_sequence<tuple_size<T>::value>{}, std::forward<T>(t), m); + msg_helper(make_index_sequence<tuple_size<typename decay<T>::type>::value>{}, std::forward<T>(t), m); return m; } @@ -82,7 +83,7 @@ Msg to_bus(client* cli, F&& f, Args&&... args){ Msg mesg; if (std::forward<F>(f)(cli->bus, std::forward<Args>(args)...)) - mesg = std::move(msg(crop<Is...>(tuple<Args...>(std::forward<Args>(args)...)))); + mesg = msg(crop<Is...>(tuple<Args...>(std::forward<Args>(args)...))); return mesg; } @@ -96,7 +97,7 @@ tie(d, s) = tp; if (m.ParseFromArray(d, s)) { bus_free(d, s); - msg = std::move(make_tuple(true, std::move(m))); + msg = make_tuple(true, std::move(m)); break; } m.Clear(); @@ -110,8 +111,8 @@ template <size_t... Is, class F, class... Args> MsgCR to_center(client* cli, F&& f, Args&&... args){ MsgCR msg(dummy()); - auto vmsg = std::move(to_bus<Is...>(cli, std::forward<F>(f), std::forward<Args>(args)...)); - if (!vmsg.empty()) msg = std::move(parse(cli, vmsg.at(0))); + auto vmsg = to_bus<Is...>(cli, std::forward<F>(f), std::forward<Args>(args)...); + if (!vmsg.empty()) msg = parse(cli, vmsg.at(0)); return msg; } @@ -124,8 +125,8 @@ const auto& tpc = tlist.SerializeAsString(); void* replymsg = NULL; int replysize = 0; - msg = std::move(to_center<2,3>(cli, std::forward<F>(f), - tpc.data(), tpc.size(), &replymsg, &replysize, sndto)); + msg = to_center<2,3>(cli, std::forward<F>(f), + tpc.data(), tpc.size(), &replymsg, &replysize, sndto); return msg; } @@ -248,11 +249,7 @@ delete cli; } -struct csubmsg* bus_client_get_submsg(void* handle){ - client* cli = ptr(handle); - Msg msg = std::move(cli->sub_q->pop()); - if (msg.empty()) return NULL; - +static struct csubmsg* parse_submsg(const Msg& msg){ void* procid = NULL, *data = NULL; int pids = 0, size = 0; @@ -265,12 +262,23 @@ return pmsg; } - -struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){ +struct csubmsg* bus_client_get_submsg_intime(void* handle, const size_t ms){ client* cli = ptr(handle); - Msg msg = std::move(cli->readreq_q->pop()); + Msg msg = cli->sub_q->pop(ms); if (msg.empty()) return NULL; + return parse_submsg(msg); +} + +struct csubmsg* bus_client_get_submsg(void* handle){ + client* cli = ptr(handle); + Msg msg = cli->sub_q->pop(); + if (msg.empty()) return NULL; + + return parse_submsg(msg); +} + +static struct creqmsg* parse_reqmsg(const Msg& msg, void** src){ void* procid = NULL, *data = NULL; int pids = 0, size = 0; @@ -284,14 +292,28 @@ return pmsg; } +struct creqmsg* bus_client_get_reqmsg_intime(void* handle, void** src, const size_t ms){ + client* cli = ptr(handle); + Msg msg = cli->readreq_q->pop(ms); + if (msg.empty()) return NULL; + + return parse_reqmsg(msg, src); +} +struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){ + client* cli = ptr(handle); + Msg msg = cli->readreq_q->pop(); + if (msg.empty()) return NULL; + + return parse_reqmsg(msg, src); +} int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg){ void* procid = NULL, *reply = NULL; int pids = 0, replys = 0; - auto vmsg = std::move(to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0, + auto vmsg = to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0, msg->msg, msg->msgl, &procid, &pids, - &reply, &replys, sndto)); + &reply, &replys, sndto); if (!vmsg.empty()){ void* procid = NULL, *data = NULL; int pids = 0, size = 0; @@ -311,7 +333,7 @@ return false; } -int bus_client_reply_msg(void* handle, void* src, const struct crepmsg* msg){ +int bus_client_reply(void* handle, void* src, const struct crepmsg* msg){ MsgRequestTopicReply msgR; auto err = msgR.mutable_errmsg(); @@ -325,17 +347,53 @@ return bus_send_reply(cli->bus, src, pbstr.data(), pbstr.size()); } +struct cqueryprocs* bus_client_query_procs(void* handle, size_t* count){ + BHAddress addr; + const auto& pbaddr = addr.SerializeAsString(); + MsgQueryProc topic; + const auto& pbtopic = topic.SerializeAsString(); + + void* rep = NULL; + int repl = 0; + auto msg = to_bus<4,5>(ptr(handle), bus_query_procs, pbaddr.data(), pbaddr.size(), + pbtopic.data(), pbtopic.size(), &rep, &repl, sndto); + + if (msg.empty()) return NULL; + + // bug 宸蹭慨澶嶏紝 鍦� bus_nng 涓� + // bus_query_procs 鑾峰彇鏁版嵁澶辫触, 鏆傛椂鏀圭敤BHQueryProcs + // if (!BHQueryProcs(pbaddr.data(), pbaddr.size(), pbtopic.data(), pbtopic.size(), &rep, &repl, sndto)) { + // return NULL; + // } + + MsgQueryProcReply msgR; + msgR.ParseFromArray(rep, repl); + bus_free(rep, repl); + + *count = msgR.proc_list_size(); + auto procs = (struct cqueryprocs*)calloc(*count, sizeof(struct cqueryprocs)); + for(size_t i = 0; i < *count; i++){ + const auto& p = msgR.proc_list(i); + size_t idl = p.proc().proc_id().size(); + char* id = (char*)calloc(idl+1, 1); + memcpy(id, p.proc().proc_id().data(), idl); + procs[i] = cqueryprocs{ .id = id, .idl = idl, .online = p.online() }; + } + + return procs; +} + //////////////////////////////////////////////////// int bus_client_publish(void* handle, const char* topic, const size_t topicl, const char* data, const size_t size){ MsgPublish pbmsg; pbmsg.set_topic(topic, topicl); pbmsg.set_data(data, size); auto pbstr = pbmsg.SerializeAsString(); - return bus_client_pubmsg(handle, pbstr.data(), pbstr.size()); + return bus_client_publish_msg(handle, pbstr.data(), pbstr.size()); } // test -int bus_client_pubmsg(void* handle, const char* data, const size_t size){ +int bus_client_publish_msg(void* handle, const char* data, const size_t size){ client* cli = ptr(handle); return bus_publish(cli->bus, data, size, 100); } -- Gitblit v1.8.0