From 777333ff834744ac5665fa9abe5ec6373d25cda8 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期三, 01 三月 2023 09:22:30 +0800
Subject: [PATCH] bug fixed ps_sub thread join

---
 cbhomeclient.cpp |  210 ++++++++++++++++++++++++++++++++++------------------
 1 files changed, 136 insertions(+), 74 deletions(-)

diff --git a/cbhomeclient.cpp b/cbhomeclient.cpp
index 4de08e7..0963c4b 100644
--- a/cbhomeclient.cpp
+++ b/cbhomeclient.cpp
@@ -9,7 +9,8 @@
 #include "cbhomeclient.h"
 #include "fixed_q.h"
 
-#include "3dparty/bus_nng/interface_bus_api.h"
+#include "3rdparty/bus_nng/interface_bus_api.h"
+#include "3rdparty/bus_nng/bn_api.h"
 
 #include "bhome_msg.pb.h"
 
@@ -43,7 +44,7 @@
     void*                       bus{nullptr};
     cproc*                      pinfo{nullptr};
     ~client(){
-        free_proc_info(pinfo);
+        internal_cproc_free(pinfo);
 
         thrd_quit.store(true, memory_order_acq_rel);
         if (thrd_sub) thrd_sub->join();
@@ -52,7 +53,7 @@
         if (sub_q) sub_q->clear(freeMsg);
         if (readreq_q) readreq_q->clear(freeMsg);
 
-        if (bus) bus_cleanup(bus);
+        bus_cleanup(bus);
     }
 };
 
@@ -74,7 +75,7 @@
 }
 template <class T> Msg msg(T&& t){
     Msg m;
-    msg_helper(make_index_sequence<tuple_size<T>::value>{}, std::forward<T>(t), m);
+    msg_helper(make_index_sequence<tuple_size<typename decay<T>::type>::value>{}, std::forward<T>(t), m);
     return m;
 }
 
@@ -82,7 +83,7 @@
 Msg to_bus(client* cli, F&& f, Args&&... args){
     Msg mesg;
     if (std::forward<F>(f)(cli->bus, std::forward<Args>(args)...))
-        mesg = std::move(msg(crop<Is...>(tuple<Args...>(std::forward<Args>(args)...))));
+        mesg = msg(crop<Is...>(tuple<Args...>(std::forward<Args>(args)...)));
     return mesg;
 }
 
@@ -96,7 +97,7 @@
         tie(d, s) = tp;
         if (m.ParseFromArray(d, s)) {
             bus_free(d, s);
-            msg = std::move(make_tuple(true, std::move(m)));
+            msg = make_tuple(true, std::move(m));
             break;
         }
         m.Clear();
@@ -110,24 +111,22 @@
 template <size_t... Is, class F, class... Args>
 MsgCR to_center(client* cli, F&& f, Args&&... args){
     MsgCR msg(dummy());
-    auto vmsg = std::move(to_bus<Is...>(cli, std::forward<F>(f), std::forward<Args>(args)...));
-    if (!vmsg.empty()) msg = std::move(parse(cli, vmsg.at(0)));
+    auto vmsg = to_bus<Is...>(cli, std::forward<F>(f), std::forward<Args>(args)...);
+    if (!vmsg.empty()) msg = parse(cli, vmsg.at(0));
     return msg;
 }
 
 template <class F>
-MsgCR to_topic(client* cli, F&& f, const struct cstrarr topic){
+MsgCR to_topic(client* cli, F&& f, char** topic, const size_t count){
     MsgCR msg(dummy());
-    if (topic.arr && topic.count){
-        MsgTopicList tlist;
-        for(size_t i = 0; i < topic.count; i++)
-            tlist.add_topic_list(topic.arr[i].str, topic.arr[i].size);
-        const auto& tpc = tlist.SerializeAsString();
-        void* replymsg = NULL;
-        int replysize = 0;
-        msg = std::move(to_center<2,3>(cli, std::forward<F>(f),
-            tpc.data(), tpc.size(), &replymsg, &replysize, sndto));
-    }
+    MsgTopicList tlist;
+    for(size_t i = 0; i < count; i++)
+        tlist.add_topic_list(topic[i]);
+    const auto& tpc = tlist.SerializeAsString();
+    void* replymsg = NULL;
+    int replysize = 0;
+    msg = to_center<2,3>(cli, std::forward<F>(f),
+        tpc.data(), tpc.size(), &replymsg, &replysize, sndto);
     return msg;
 }
 
@@ -165,45 +164,51 @@
     }
 }
 
-static void registered(client* cli, const creg* rinfo, const bool must_reg=true){
+static void registered(client* cli, const creg* rinfo){
 
-    if (must_reg){
-        ProcInfo pinfo;
-        auto tmp = rinfo->pinfo;
-        pinfo.set_name(tmp->name.str, tmp->name.size);
-        pinfo.set_proc_id(tmp->id.str, tmp->id.size);
-        const auto& reg = pinfo.SerializeAsString();
-
-        while (!cli->thrd_quit.load(memory_order_acquire)) {
-            void* replymsg = NULL;
-            int replysize = 0;
-            cli->bus = bus_register(reg.data(), reg.size(), &replymsg, &replysize, sndto);
-            bus_free(replymsg, replysize);
-            if (cli->bus) break;
-        }
-
-        // register success start read request thread
-        cli->thrd_readreq.reset(new thread([cli]{ thread_readreq(cli); }));
+    ProcInfo pinfo;
+    auto proc = creg_proc(rinfo);
+    pinfo.set_name(cproc_name(proc));
+    pinfo.set_proc_id(cproc_id(proc));
+    const auto& pbproc = pinfo.SerializeAsString();
+    while (!cli->thrd_quit.load(memory_order_acquire)) {
+        void* replymsg = NULL;
+        int replysize = 0;
+        cli->bus = bus_register(pbproc.data(), pbproc.size(), &replymsg, &replysize, sndto);
+        bus_free(replymsg, replysize);
+        if (cli->bus) break;
     }
+    // register success start read request thread
+    cli->thrd_readreq.reset(new thread([cli]{ thread_readreq(cli); }));
 
     // request/reply鍜宲ub topic涓�璧峰鐞�
-    auto tmparr = cstr_arr_new(rinfo->channel.count + rinfo->topic_pub.count);
-    auto addarr = [&tmparr](size_t& start, const struct cstrarr* arr){
-        for(size_t i = 0; i < arr->count; i++){
-            cstr_arr_add(&tmparr, arr->arr[i].str, arr->arr[i].size, start+i);
-        }
-        start += arr->count;
+    // 鍙渶瑕佸皢瀛楃涓叉寚閽堟嫹璐濆氨琛岋紝涓嶉渶鍒涘缓瀛楃涓插唴瀛�
+    auto shallowMerge = [](char** a1, const size_t c1, char** a2, const size_t c2){
+        auto tmp = (char**)malloc((c1 + c2) * sizeof(char*));
+        auto dst = tmp;
+        auto cp2dst = [&dst](char** src, const size_t cnt){
+            memcpy(dst, src, cnt * sizeof(char*));
+            dst += cnt;
+        };
+        cp2dst(a1, c1);
+        cp2dst(a2, c2);
+        return tmp;
     };
-    size_t s = 0;
-    addarr(s, &rinfo->channel);
-    addarr(s, &rinfo->topic_pub);
-    auto tpcmsg = to_topic(cli, bus_register_topics, tmparr);
-    cstr_arr_free(tmparr);
-    // auto channelmsg = to_topic(cli, bus_register_topics, rinfo->channel);
-    // auto pubmsg = to_topic(cli, bus_register_topics, rinfo->topic_pub);
+    size_t tpcc = 0, pubc = 0;
+    char** tpc = creg_reply_topic(rinfo, &tpcc);
+    char** pub = creg_pub_topic(rinfo, &pubc);
+    auto tmparr = shallowMerge(tpc, tpcc, pub, pubc);
+    auto tpcmsg = to_topic(cli, bus_register_topics, tmparr, tpcc + pubc);
+    for(size_t i = 0; i < tpcc+pubc; i++){
+        printf("======>> proc [%s] topic %lu -> %s\n", cproc_name(proc), i, tmparr[i]);
+    }
+    free(tmparr);
+
     // if topic pub/sub[net] exist, register topics
-    auto submsg = to_topic(cli, bus_subscribe_topics, rinfo->topic_sub);
-    auto subnetmsg = to_topic(cli, bus_subscribe_topics_net, rinfo->topic_sub_net);
+    tpc = creg_sub_topic(rinfo, &tpcc);
+    auto submsg = to_topic(cli, bus_subscribe_topics, tpc, tpcc);
+    tpc = creg_subnet_topic(rinfo, &tpcc);
+    auto subnetmsg = to_topic(cli, bus_subscribe_topics_net, tpc, tpcc);
 
     if (get<0>(submsg) && !cli->thrd_sub)
         cli->thrd_sub.reset(new thread([cli]{ thread_sub(cli); }));
@@ -212,21 +217,21 @@
 
 static void unregistered(client* cli){
     ProcInfo pinfo;
-    auto tmp = cli->pinfo;
-    pinfo.set_name(tmp->name.str, tmp->name.size);
-    pinfo.set_proc_id(tmp->id.str, tmp->id.size);
-    const auto& reg = pinfo.SerializeAsString();
+    auto proc = cli->pinfo;
+    pinfo.set_name(cproc_name(proc));
+    pinfo.set_proc_id(cproc_id(proc));
+    const auto& pbproc = pinfo.SerializeAsString();
 
     void* rep;
     int repl;
-    to_center<2,3>(cli, bus_unregister, reg.data(), reg.size(), &rep, &repl, sndto);
+    to_center<2,3>(cli, bus_unregister, pbproc.data(), pbproc.size(), &rep, &repl, sndto);
 }
 
 static inline client* ptr(void* handle){ return static_cast<client*>(handle); }
 
 void* bus_client_init(const char* srvid, const size_t sidsize, const creg* rinfo){
     client* cli = new client;
-    cli->pinfo = clone_proc_info(rinfo->pinfo);
+    cli->pinfo = internal_clone_cproc(creg_proc(rinfo));
 
     auto pred = [cli]{ return cli->thrd_quit.load(memory_order_relaxed); };
     const size_t qsize = 5;
@@ -244,11 +249,7 @@
     delete cli;
 }
 
-struct csubmsg* bus_client_get_submsg(void* handle){
-    client* cli = ptr(handle);
-    Msg msg = std::move(cli->sub_q->pop());
-    if (msg.empty()) return NULL;
-
+static struct csubmsg* parse_submsg(const Msg& msg){
     void* procid = NULL, *data = NULL;
     int pids = 0, size = 0;
 
@@ -261,12 +262,23 @@
 
     return pmsg;
 }
-
-struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){
+struct csubmsg* bus_client_get_submsg_intime(void* handle, const size_t ms){
     client* cli = ptr(handle);
-    Msg msg = std::move(cli->readreq_q->pop());
+    Msg msg = cli->sub_q->pop(ms);
     if (msg.empty()) return NULL;
 
+    return parse_submsg(msg);
+}
+
+struct csubmsg* bus_client_get_submsg(void* handle){
+    client* cli = ptr(handle);
+    Msg msg = cli->sub_q->pop();
+    if (msg.empty()) return NULL;
+
+    return parse_submsg(msg);
+}
+
+static struct creqmsg* parse_reqmsg(const Msg& msg, void** src){
     void* procid = NULL, *data = NULL;
     int pids = 0, size = 0;
 
@@ -280,14 +292,28 @@
 
     return pmsg;
 }
+struct creqmsg* bus_client_get_reqmsg_intime(void* handle, void** src, const size_t ms){
+    client* cli = ptr(handle);
+    Msg msg = cli->readreq_q->pop(ms);
+    if (msg.empty()) return NULL;
+
+    return parse_reqmsg(msg, src);
+}
+struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){
+    client* cli = ptr(handle);
+    Msg msg = cli->readreq_q->pop();
+    if (msg.empty()) return NULL;
+
+    return parse_reqmsg(msg, src);
+}
 
 int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg){
 
     void* procid = NULL, *reply = NULL;
     int pids = 0, replys = 0;
-    auto vmsg = std::move(to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0,
-                    msg->msg.str, msg->msg.size, &procid, &pids,
-                    &reply, &replys, sndto));
+    auto vmsg = to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0,
+                    msg->msg, msg->msgl, &procid, &pids,
+                    &reply, &replys, sndto);
     if (!vmsg.empty()){
         void* procid = NULL, *data = NULL;
         int pids = 0, size = 0;
@@ -307,18 +333,54 @@
     return false;
 }
 
-int bus_client_reply_msg(void* handle, void* src, const struct crepmsg* msg){
+int bus_client_reply(void* handle, void* src, const struct crepmsg* msg){
 
     MsgRequestTopicReply msgR;
     auto err = msgR.mutable_errmsg();
     err->set_errcode((ErrorCode)msg->errcode);
-    err->set_errstring(msg->errmsg.str, msg->errmsg.size);
+    err->set_errstring(msg->errmsg, msg->errmsgl);
 
-    msgR.set_data(msg->data.str, msg->data.size);
+    msgR.set_data(msg->data, msg->datal);
     auto pbstr = msgR.SerializeAsString();
 
     auto cli = ptr(handle);
     return bus_send_reply(cli->bus, src, pbstr.data(), pbstr.size());
+}
+
+struct cqueryprocs* bus_client_query_procs(void* handle, size_t* count){
+    BHAddress addr;
+    const auto& pbaddr = addr.SerializeAsString();
+    MsgQueryProc topic;
+    const auto& pbtopic = topic.SerializeAsString();
+
+    void* rep = NULL;
+    int repl = 0;
+    auto msg = to_bus<4,5>(ptr(handle), bus_query_procs, pbaddr.data(), pbaddr.size(),
+       pbtopic.data(), pbtopic.size(), &rep, &repl, sndto);
+
+    if (msg.empty()) return NULL;
+
+    // bug 宸蹭慨澶嶏紝 鍦� bus_nng 涓�
+    // bus_query_procs 鑾峰彇鏁版嵁澶辫触, 鏆傛椂鏀圭敤BHQueryProcs
+    // if (!BHQueryProcs(pbaddr.data(), pbaddr.size(), pbtopic.data(), pbtopic.size(), &rep, &repl, sndto)) {
+    //     return NULL;
+    // }
+
+    MsgQueryProcReply msgR;
+    msgR.ParseFromArray(rep, repl);
+    bus_free(rep, repl);
+
+    *count = msgR.proc_list_size();
+    auto procs = (struct cqueryprocs*)calloc(*count, sizeof(struct cqueryprocs));
+    for(size_t i = 0; i < *count; i++){
+        const auto& p = msgR.proc_list(i);
+        size_t idl = p.proc().proc_id().size();
+        char* id = (char*)calloc(idl+1, 1);
+        memcpy(id, p.proc().proc_id().data(), idl);
+        procs[i] = cqueryprocs{ .id = id, .idl = idl, .online = p.online() };
+    }
+
+    return procs;
 }
 
 ////////////////////////////////////////////////////
@@ -327,11 +389,11 @@
     pbmsg.set_topic(topic, topicl);
     pbmsg.set_data(data, size);
     auto pbstr = pbmsg.SerializeAsString();
-    return bus_client_pubmsg(handle, pbstr.data(), pbstr.size());
+    return bus_client_publish_msg(handle, pbstr.data(), pbstr.size());
 }
 
 // test
-int bus_client_pubmsg(void* handle, const char* data, const size_t size){
+int bus_client_publish_msg(void* handle, const char* data, const size_t size){
     client* cli = ptr(handle);
     return bus_publish(cli->bus, data, size, 100);
 }

--
Gitblit v1.8.0