From 777333ff834744ac5665fa9abe5ec6373d25cda8 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期三, 01 三月 2023 09:22:30 +0800 Subject: [PATCH] bug fixed ps_sub thread join --- cbhomeclient.cpp | 210 ++++++++++++++++++++++++++++++++++------------------ 1 files changed, 136 insertions(+), 74 deletions(-) diff --git a/cbhomeclient.cpp b/cbhomeclient.cpp index 4de08e7..0963c4b 100644 --- a/cbhomeclient.cpp +++ b/cbhomeclient.cpp @@ -9,7 +9,8 @@ #include "cbhomeclient.h" #include "fixed_q.h" -#include "3dparty/bus_nng/interface_bus_api.h" +#include "3rdparty/bus_nng/interface_bus_api.h" +#include "3rdparty/bus_nng/bn_api.h" #include "bhome_msg.pb.h" @@ -43,7 +44,7 @@ void* bus{nullptr}; cproc* pinfo{nullptr}; ~client(){ - free_proc_info(pinfo); + internal_cproc_free(pinfo); thrd_quit.store(true, memory_order_acq_rel); if (thrd_sub) thrd_sub->join(); @@ -52,7 +53,7 @@ if (sub_q) sub_q->clear(freeMsg); if (readreq_q) readreq_q->clear(freeMsg); - if (bus) bus_cleanup(bus); + bus_cleanup(bus); } }; @@ -74,7 +75,7 @@ } template <class T> Msg msg(T&& t){ Msg m; - msg_helper(make_index_sequence<tuple_size<T>::value>{}, std::forward<T>(t), m); + msg_helper(make_index_sequence<tuple_size<typename decay<T>::type>::value>{}, std::forward<T>(t), m); return m; } @@ -82,7 +83,7 @@ Msg to_bus(client* cli, F&& f, Args&&... args){ Msg mesg; if (std::forward<F>(f)(cli->bus, std::forward<Args>(args)...)) - mesg = std::move(msg(crop<Is...>(tuple<Args...>(std::forward<Args>(args)...)))); + mesg = msg(crop<Is...>(tuple<Args...>(std::forward<Args>(args)...))); return mesg; } @@ -96,7 +97,7 @@ tie(d, s) = tp; if (m.ParseFromArray(d, s)) { bus_free(d, s); - msg = std::move(make_tuple(true, std::move(m))); + msg = make_tuple(true, std::move(m)); break; } m.Clear(); @@ -110,24 +111,22 @@ template <size_t... Is, class F, class... Args> MsgCR to_center(client* cli, F&& f, Args&&... args){ MsgCR msg(dummy()); - auto vmsg = std::move(to_bus<Is...>(cli, std::forward<F>(f), std::forward<Args>(args)...)); - if (!vmsg.empty()) msg = std::move(parse(cli, vmsg.at(0))); + auto vmsg = to_bus<Is...>(cli, std::forward<F>(f), std::forward<Args>(args)...); + if (!vmsg.empty()) msg = parse(cli, vmsg.at(0)); return msg; } template <class F> -MsgCR to_topic(client* cli, F&& f, const struct cstrarr topic){ +MsgCR to_topic(client* cli, F&& f, char** topic, const size_t count){ MsgCR msg(dummy()); - if (topic.arr && topic.count){ - MsgTopicList tlist; - for(size_t i = 0; i < topic.count; i++) - tlist.add_topic_list(topic.arr[i].str, topic.arr[i].size); - const auto& tpc = tlist.SerializeAsString(); - void* replymsg = NULL; - int replysize = 0; - msg = std::move(to_center<2,3>(cli, std::forward<F>(f), - tpc.data(), tpc.size(), &replymsg, &replysize, sndto)); - } + MsgTopicList tlist; + for(size_t i = 0; i < count; i++) + tlist.add_topic_list(topic[i]); + const auto& tpc = tlist.SerializeAsString(); + void* replymsg = NULL; + int replysize = 0; + msg = to_center<2,3>(cli, std::forward<F>(f), + tpc.data(), tpc.size(), &replymsg, &replysize, sndto); return msg; } @@ -165,45 +164,51 @@ } } -static void registered(client* cli, const creg* rinfo, const bool must_reg=true){ +static void registered(client* cli, const creg* rinfo){ - if (must_reg){ - ProcInfo pinfo; - auto tmp = rinfo->pinfo; - pinfo.set_name(tmp->name.str, tmp->name.size); - pinfo.set_proc_id(tmp->id.str, tmp->id.size); - const auto& reg = pinfo.SerializeAsString(); - - while (!cli->thrd_quit.load(memory_order_acquire)) { - void* replymsg = NULL; - int replysize = 0; - cli->bus = bus_register(reg.data(), reg.size(), &replymsg, &replysize, sndto); - bus_free(replymsg, replysize); - if (cli->bus) break; - } - - // register success start read request thread - cli->thrd_readreq.reset(new thread([cli]{ thread_readreq(cli); })); + ProcInfo pinfo; + auto proc = creg_proc(rinfo); + pinfo.set_name(cproc_name(proc)); + pinfo.set_proc_id(cproc_id(proc)); + const auto& pbproc = pinfo.SerializeAsString(); + while (!cli->thrd_quit.load(memory_order_acquire)) { + void* replymsg = NULL; + int replysize = 0; + cli->bus = bus_register(pbproc.data(), pbproc.size(), &replymsg, &replysize, sndto); + bus_free(replymsg, replysize); + if (cli->bus) break; } + // register success start read request thread + cli->thrd_readreq.reset(new thread([cli]{ thread_readreq(cli); })); // request/reply鍜宲ub topic涓�璧峰鐞� - auto tmparr = cstr_arr_new(rinfo->channel.count + rinfo->topic_pub.count); - auto addarr = [&tmparr](size_t& start, const struct cstrarr* arr){ - for(size_t i = 0; i < arr->count; i++){ - cstr_arr_add(&tmparr, arr->arr[i].str, arr->arr[i].size, start+i); - } - start += arr->count; + // 鍙渶瑕佸皢瀛楃涓叉寚閽堟嫹璐濆氨琛岋紝涓嶉渶鍒涘缓瀛楃涓插唴瀛� + auto shallowMerge = [](char** a1, const size_t c1, char** a2, const size_t c2){ + auto tmp = (char**)malloc((c1 + c2) * sizeof(char*)); + auto dst = tmp; + auto cp2dst = [&dst](char** src, const size_t cnt){ + memcpy(dst, src, cnt * sizeof(char*)); + dst += cnt; + }; + cp2dst(a1, c1); + cp2dst(a2, c2); + return tmp; }; - size_t s = 0; - addarr(s, &rinfo->channel); - addarr(s, &rinfo->topic_pub); - auto tpcmsg = to_topic(cli, bus_register_topics, tmparr); - cstr_arr_free(tmparr); - // auto channelmsg = to_topic(cli, bus_register_topics, rinfo->channel); - // auto pubmsg = to_topic(cli, bus_register_topics, rinfo->topic_pub); + size_t tpcc = 0, pubc = 0; + char** tpc = creg_reply_topic(rinfo, &tpcc); + char** pub = creg_pub_topic(rinfo, &pubc); + auto tmparr = shallowMerge(tpc, tpcc, pub, pubc); + auto tpcmsg = to_topic(cli, bus_register_topics, tmparr, tpcc + pubc); + for(size_t i = 0; i < tpcc+pubc; i++){ + printf("======>> proc [%s] topic %lu -> %s\n", cproc_name(proc), i, tmparr[i]); + } + free(tmparr); + // if topic pub/sub[net] exist, register topics - auto submsg = to_topic(cli, bus_subscribe_topics, rinfo->topic_sub); - auto subnetmsg = to_topic(cli, bus_subscribe_topics_net, rinfo->topic_sub_net); + tpc = creg_sub_topic(rinfo, &tpcc); + auto submsg = to_topic(cli, bus_subscribe_topics, tpc, tpcc); + tpc = creg_subnet_topic(rinfo, &tpcc); + auto subnetmsg = to_topic(cli, bus_subscribe_topics_net, tpc, tpcc); if (get<0>(submsg) && !cli->thrd_sub) cli->thrd_sub.reset(new thread([cli]{ thread_sub(cli); })); @@ -212,21 +217,21 @@ static void unregistered(client* cli){ ProcInfo pinfo; - auto tmp = cli->pinfo; - pinfo.set_name(tmp->name.str, tmp->name.size); - pinfo.set_proc_id(tmp->id.str, tmp->id.size); - const auto& reg = pinfo.SerializeAsString(); + auto proc = cli->pinfo; + pinfo.set_name(cproc_name(proc)); + pinfo.set_proc_id(cproc_id(proc)); + const auto& pbproc = pinfo.SerializeAsString(); void* rep; int repl; - to_center<2,3>(cli, bus_unregister, reg.data(), reg.size(), &rep, &repl, sndto); + to_center<2,3>(cli, bus_unregister, pbproc.data(), pbproc.size(), &rep, &repl, sndto); } static inline client* ptr(void* handle){ return static_cast<client*>(handle); } void* bus_client_init(const char* srvid, const size_t sidsize, const creg* rinfo){ client* cli = new client; - cli->pinfo = clone_proc_info(rinfo->pinfo); + cli->pinfo = internal_clone_cproc(creg_proc(rinfo)); auto pred = [cli]{ return cli->thrd_quit.load(memory_order_relaxed); }; const size_t qsize = 5; @@ -244,11 +249,7 @@ delete cli; } -struct csubmsg* bus_client_get_submsg(void* handle){ - client* cli = ptr(handle); - Msg msg = std::move(cli->sub_q->pop()); - if (msg.empty()) return NULL; - +static struct csubmsg* parse_submsg(const Msg& msg){ void* procid = NULL, *data = NULL; int pids = 0, size = 0; @@ -261,12 +262,23 @@ return pmsg; } - -struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){ +struct csubmsg* bus_client_get_submsg_intime(void* handle, const size_t ms){ client* cli = ptr(handle); - Msg msg = std::move(cli->readreq_q->pop()); + Msg msg = cli->sub_q->pop(ms); if (msg.empty()) return NULL; + return parse_submsg(msg); +} + +struct csubmsg* bus_client_get_submsg(void* handle){ + client* cli = ptr(handle); + Msg msg = cli->sub_q->pop(); + if (msg.empty()) return NULL; + + return parse_submsg(msg); +} + +static struct creqmsg* parse_reqmsg(const Msg& msg, void** src){ void* procid = NULL, *data = NULL; int pids = 0, size = 0; @@ -280,14 +292,28 @@ return pmsg; } +struct creqmsg* bus_client_get_reqmsg_intime(void* handle, void** src, const size_t ms){ + client* cli = ptr(handle); + Msg msg = cli->readreq_q->pop(ms); + if (msg.empty()) return NULL; + + return parse_reqmsg(msg, src); +} +struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){ + client* cli = ptr(handle); + Msg msg = cli->readreq_q->pop(); + if (msg.empty()) return NULL; + + return parse_reqmsg(msg, src); +} int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg){ void* procid = NULL, *reply = NULL; int pids = 0, replys = 0; - auto vmsg = std::move(to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0, - msg->msg.str, msg->msg.size, &procid, &pids, - &reply, &replys, sndto)); + auto vmsg = to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0, + msg->msg, msg->msgl, &procid, &pids, + &reply, &replys, sndto); if (!vmsg.empty()){ void* procid = NULL, *data = NULL; int pids = 0, size = 0; @@ -307,18 +333,54 @@ return false; } -int bus_client_reply_msg(void* handle, void* src, const struct crepmsg* msg){ +int bus_client_reply(void* handle, void* src, const struct crepmsg* msg){ MsgRequestTopicReply msgR; auto err = msgR.mutable_errmsg(); err->set_errcode((ErrorCode)msg->errcode); - err->set_errstring(msg->errmsg.str, msg->errmsg.size); + err->set_errstring(msg->errmsg, msg->errmsgl); - msgR.set_data(msg->data.str, msg->data.size); + msgR.set_data(msg->data, msg->datal); auto pbstr = msgR.SerializeAsString(); auto cli = ptr(handle); return bus_send_reply(cli->bus, src, pbstr.data(), pbstr.size()); +} + +struct cqueryprocs* bus_client_query_procs(void* handle, size_t* count){ + BHAddress addr; + const auto& pbaddr = addr.SerializeAsString(); + MsgQueryProc topic; + const auto& pbtopic = topic.SerializeAsString(); + + void* rep = NULL; + int repl = 0; + auto msg = to_bus<4,5>(ptr(handle), bus_query_procs, pbaddr.data(), pbaddr.size(), + pbtopic.data(), pbtopic.size(), &rep, &repl, sndto); + + if (msg.empty()) return NULL; + + // bug 宸蹭慨澶嶏紝 鍦� bus_nng 涓� + // bus_query_procs 鑾峰彇鏁版嵁澶辫触, 鏆傛椂鏀圭敤BHQueryProcs + // if (!BHQueryProcs(pbaddr.data(), pbaddr.size(), pbtopic.data(), pbtopic.size(), &rep, &repl, sndto)) { + // return NULL; + // } + + MsgQueryProcReply msgR; + msgR.ParseFromArray(rep, repl); + bus_free(rep, repl); + + *count = msgR.proc_list_size(); + auto procs = (struct cqueryprocs*)calloc(*count, sizeof(struct cqueryprocs)); + for(size_t i = 0; i < *count; i++){ + const auto& p = msgR.proc_list(i); + size_t idl = p.proc().proc_id().size(); + char* id = (char*)calloc(idl+1, 1); + memcpy(id, p.proc().proc_id().data(), idl); + procs[i] = cqueryprocs{ .id = id, .idl = idl, .online = p.online() }; + } + + return procs; } //////////////////////////////////////////////////// @@ -327,11 +389,11 @@ pbmsg.set_topic(topic, topicl); pbmsg.set_data(data, size); auto pbstr = pbmsg.SerializeAsString(); - return bus_client_pubmsg(handle, pbstr.data(), pbstr.size()); + return bus_client_publish_msg(handle, pbstr.data(), pbstr.size()); } // test -int bus_client_pubmsg(void* handle, const char* data, const size_t size){ +int bus_client_publish_msg(void* handle, const char* data, const size_t size){ client* cli = ptr(handle); return bus_publish(cli->bus, data, size, 100); } -- Gitblit v1.8.0