From f600bb176c1d2f0eeb5180637bdc09605b3d21bd Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期三, 14 十二月 2022 14:58:28 +0800 Subject: [PATCH] restruct for easy use v1.1 --- cbhomeclient.cpp | 84 ++++++++++++++++++++++------------------- 1 files changed, 45 insertions(+), 39 deletions(-) diff --git a/cbhomeclient.cpp b/cbhomeclient.cpp index e4c80aa..60140b2 100644 --- a/cbhomeclient.cpp +++ b/cbhomeclient.cpp @@ -43,7 +43,7 @@ void* bus{nullptr}; cproc* pinfo{nullptr}; ~client(){ - free_proc_info(pinfo); + internal_cproc_free(pinfo); thrd_quit.store(true, memory_order_acq_rel); if (thrd_sub) thrd_sub->join(); @@ -52,7 +52,7 @@ if (sub_q) sub_q->clear(freeMsg); if (readreq_q) readreq_q->clear(freeMsg); - if (bus) bus_cleanup(bus); + bus_cleanup(bus); } }; @@ -116,18 +116,16 @@ } template <class F> -MsgCR to_topic(client* cli, F&& f, const struct cstrarr& topic){ +MsgCR to_topic(client* cli, F&& f, char** topic, const size_t count){ MsgCR msg(dummy()); - if (topic.arr && topic.count){ - MsgTopicList tlist; - for(size_t i = 0; i < topic.count; i++) - tlist.add_topic_list(topic.arr[i].str, topic.arr[i].size); - const auto& tpc = tlist.SerializeAsString(); - void* replymsg = NULL; - int replysize = 0; - msg = std::move(to_center<2,3>(cli, std::forward<F>(f), - tpc.data(), tpc.size(), &replymsg, &replysize, sndto)); - } + MsgTopicList tlist; + for(size_t i = 0; i < count; i++) + tlist.add_topic_list(topic[i]); + const auto& tpc = tlist.SerializeAsString(); + void* replymsg = NULL; + int replysize = 0; + msg = std::move(to_center<2,3>(cli, std::forward<F>(f), + tpc.data(), tpc.size(), &replymsg, &replysize, sndto)); return msg; } @@ -168,14 +166,14 @@ static void registered(client* cli, const creg* rinfo){ ProcInfo pinfo; - auto tmp = rinfo->pinfo; - pinfo.set_name(tmp->name.str, tmp->name.size); - pinfo.set_proc_id(tmp->id.str, tmp->id.size); - const auto& reg = pinfo.SerializeAsString(); + auto proc = creg_proc(rinfo); + pinfo.set_name(cproc_name(proc)); + pinfo.set_proc_id(cproc_id(proc)); + const auto& pbproc = pinfo.SerializeAsString(); while (!cli->thrd_quit.load(memory_order_acquire)) { void* replymsg = NULL; int replysize = 0; - cli->bus = bus_register(reg.data(), reg.size(), &replymsg, &replysize, sndto); + cli->bus = bus_register(pbproc.data(), pbproc.size(), &replymsg, &replysize, sndto); bus_free(replymsg, replysize); if (cli->bus) break; } @@ -184,24 +182,32 @@ // request/reply鍜宲ub topic涓�璧峰鐞� // 鍙渶瑕佸皢瀛楃涓叉寚閽堟嫹璐濆氨琛岋紝涓嶉渶鍒涘缓瀛楃涓插唴瀛� - auto shallowMerge = [](const cstrarr& arr1, const cstrarr& arr2){ - auto tmp = cstr_arr_new(arr1.count + arr2.count); - auto dst = tmp.arr; - auto cp2dst = [&dst](const cstr* src, const size_t cnt){ - memcpy(dst, src, cnt * sizeof(cstr)); + auto shallowMerge = [](char** a1, const size_t c1, char** a2, const size_t c2){ + auto tmp = (char**)malloc((c1 + c2) * sizeof(char*)); + auto dst = tmp; + auto cp2dst = [&dst](char** src, const size_t cnt){ + memcpy(dst, src, cnt * sizeof(char*)); dst += cnt; }; - cp2dst(arr1.arr, arr1.count); - cp2dst(arr2.arr, arr2.count); + cp2dst(a1, c1); + cp2dst(a2, c2); return tmp; }; - auto tmparr = shallowMerge(rinfo->channel, rinfo->topic_pub); - auto tpcmsg = to_topic(cli, bus_register_topics, tmparr); - free(tmparr.arr); + size_t tpcc = 0, pubc = 0; + char** tpc = creg_rep_topic(rinfo, &tpcc); + char** pub = creg_pub_topic(rinfo, &pubc); + auto tmparr = shallowMerge(tpc, tpcc, pub, pubc); + auto tpcmsg = to_topic(cli, bus_register_topics, tmparr, tpcc + pubc); + for(size_t i = 0; i < tpcc+pubc; i++){ + printf("======>> proc [%s] topic %lu -> %s\n", cproc_name(proc), i, tmparr[i]); + } + free(tmparr); // if topic pub/sub[net] exist, register topics - auto submsg = to_topic(cli, bus_subscribe_topics, rinfo->topic_sub); - auto subnetmsg = to_topic(cli, bus_subscribe_topics_net, rinfo->topic_sub_net); + tpc = creg_sub_topic(rinfo, &tpcc); + auto submsg = to_topic(cli, bus_subscribe_topics, tpc, tpcc); + tpc = creg_subnet_topic(rinfo, &tpcc); + auto subnetmsg = to_topic(cli, bus_subscribe_topics_net, tpc, tpcc); if (get<0>(submsg) && !cli->thrd_sub) cli->thrd_sub.reset(new thread([cli]{ thread_sub(cli); })); @@ -210,21 +216,21 @@ static void unregistered(client* cli){ ProcInfo pinfo; - auto tmp = cli->pinfo; - pinfo.set_name(tmp->name.str, tmp->name.size); - pinfo.set_proc_id(tmp->id.str, tmp->id.size); - const auto& reg = pinfo.SerializeAsString(); + auto proc = cli->pinfo; + pinfo.set_name(cproc_name(proc)); + pinfo.set_proc_id(cproc_id(proc)); + const auto& pbproc = pinfo.SerializeAsString(); void* rep; int repl; - to_center<2,3>(cli, bus_unregister, reg.data(), reg.size(), &rep, &repl, sndto); + to_center<2,3>(cli, bus_unregister, pbproc.data(), pbproc.size(), &rep, &repl, sndto); } static inline client* ptr(void* handle){ return static_cast<client*>(handle); } void* bus_client_init(const char* srvid, const size_t sidsize, const creg* rinfo){ client* cli = new client; - cli->pinfo = clone_proc_info(rinfo->pinfo); + cli->pinfo = internal_clone_cproc(creg_proc(rinfo)); auto pred = [cli]{ return cli->thrd_quit.load(memory_order_relaxed); }; const size_t qsize = 5; @@ -284,7 +290,7 @@ void* procid = NULL, *reply = NULL; int pids = 0, replys = 0; auto vmsg = std::move(to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0, - msg->msg.str, msg->msg.size, &procid, &pids, + msg->msg, msg->msgl, &procid, &pids, &reply, &replys, sndto)); if (!vmsg.empty()){ void* procid = NULL, *data = NULL; @@ -310,9 +316,9 @@ MsgRequestTopicReply msgR; auto err = msgR.mutable_errmsg(); err->set_errcode((ErrorCode)msg->errcode); - err->set_errstring(msg->errmsg.str, msg->errmsg.size); + err->set_errstring(msg->errmsg, msg->errmsgl); - msgR.set_data(msg->data.str, msg->data.size); + msgR.set_data(msg->data, msg->datal); auto pbstr = msgR.SerializeAsString(); auto cli = ptr(handle); -- Gitblit v1.8.0