From ecf23f882ca1b8aaf0863980fc4781c515da1695 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期一, 12 十二月 2022 16:49:03 +0800 Subject: [PATCH] add req rep --- cbhomeclient.cpp | 64 ++++++++++++++++++++++++++++---- 1 files changed, 56 insertions(+), 8 deletions(-) diff --git a/cbhomeclient.cpp b/cbhomeclient.cpp index 1d213bb..9c8701e 100644 --- a/cbhomeclient.cpp +++ b/cbhomeclient.cpp @@ -121,9 +121,9 @@ template <class F> MsgCR to_topic(client* cli, F&& f, const struct cstrarr topic){ MsgCR msg(dummy()); - if (topic.arr && topic.size){ + if (topic.arr && topic.count){ MsgTopicList tlist; - for(size_t i = 0; i < topic.size; i++) + for(size_t i = 0; i < topic.count; i++) tlist.add_topic_list(topic.arr[i].str, topic.arr[i].size); const auto& tpc = tlist.SerializeAsString(); void* replymsg = NULL; @@ -189,8 +189,22 @@ cli->thrd_readreq.reset(new thread([cli]{ thread_readreq(cli); })); } + // request/reply鍜宲ub topic涓�璧峰鐞� + auto tmparr = cstr_arr_new(rinfo->channel.count + rinfo->topic_pub.count); + auto addarr = [&tmparr](size_t& start, const struct cstrarr* arr){ + for(size_t i = 0; i < arr->count; i++){ + cstr_arr_add(&tmparr, arr->arr[i].str, arr->arr[i].size, start+i); + } + start += arr->count; + }; + size_t s = 0; + addarr(s, &rinfo->channel); + addarr(s, &rinfo->topic_pub); + auto tpcmsg = to_topic(cli, bus_register_topics, tmparr); + cstr_arr_free(tmparr); + // auto channelmsg = to_topic(cli, bus_register_topics, rinfo->channel); + // auto pubmsg = to_topic(cli, bus_register_topics, rinfo->topic_pub); // if topic pub/sub[net] exist, register topics - auto pubmsg = to_topic(cli, bus_register_topics, rinfo->topic_pub); auto submsg = to_topic(cli, bus_subscribe_topics, rinfo->topic_sub); auto subnetmsg = to_topic(cli, bus_subscribe_topics_net, rinfo->topic_sub_net); @@ -251,29 +265,63 @@ return pmsg; } -struct creqmsg* bus_client_get_reqmsg(void* handle){ +struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){ client* cli = ptr(handle); Msg msg = std::move(cli->readreq_q->pop()); if (msg.empty()) return NULL; - void* procid = NULL, *data = NULL, *src = NULL; + void* procid = NULL, *data = NULL; int pids = 0, size = 0; tie(procid, pids) = msg.at(0); tie(data, size) = msg.at(1); - tie(src, ignore) = msg.at(2); + tie(*src, ignore) = msg.at(2); - auto pmsg = to_reqmsg((const char*)procid, pids, (const char *)data, size, src); + auto pmsg = to_reqmsg((const char*)procid, pids, (const char *)data, size); bus_free(procid, pids); bus_free(data, size); return pmsg; } -int bus_client_reply_reqmsg(void* handle, struct crepmsg* msg){ +int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg){ + void* procid = NULL, *reply = NULL; + int pids = 0, replys = 0; + auto vmsg = std::move(to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0, + msg->msg.str, msg->msg.size, &procid, &pids, + &reply, &replys, sndto)); + if (!vmsg.empty()){ + void* procid = NULL, *data = NULL; + int pids = 0, size = 0; + tie(procid, pids) = vmsg.at(0); + bus_free(procid, pids); + tie(data, size) = vmsg.at(1); + MsgRequestTopicReply msgRT; + auto pb = msgRT.ParseFromArray(reply, replys); + bus_free(reply, replys); + if (!pb) return false; + + *repmsg = make_reply_msg(msgRT.errmsg().errcode(), + msgRT.errmsg().errstring().data(), msgRT.errmsg().errstring().size(), + msgRT.data().data(), msgRT.data().size()); + return true; + } + return false; +} + +int bus_client_reply_msg(void* handle, void* src, const struct crepmsg* msg){ MsgRequestTopicReply msgR; + auto err = msgR.mutable_errmsg(); + err->set_errcode((ErrorCode)msg->errcode); + err->set_errstring(msg->errmsg.str, msg->errmsg.size); + + msgR.set_data(msg->data.str, msg->data.size); + auto pbstr = msgR.SerializeAsString(); + + auto cli = ptr(handle); + return bus_send_reply(cli->bus, src, pbstr.data(), pbstr.size()); } -- Gitblit v1.8.0