From aa9df8bbedaa240e9225c965ea89766fb86888fb Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 13 十二月 2022 16:11:00 +0800
Subject: [PATCH] robust
---
cbhomeclient.cpp | 88 ++++++++++++++++++++++++++++++++++---------
1 files changed, 69 insertions(+), 19 deletions(-)
diff --git a/cbhomeclient.cpp b/cbhomeclient.cpp
index 1d213bb..4de08e7 100644
--- a/cbhomeclient.cpp
+++ b/cbhomeclient.cpp
@@ -57,8 +57,7 @@
};
///////////////////////////////////////////////////////////
-
-template <size_t... I, class T>
+template <size_t... I, class T, typename enable_if<(sizeof...(I) > 0)>::type* = nullptr>
auto crop(T&& t) -> decltype(make_tuple(get<I>(std::forward<T>(t))...)){
return make_tuple(get<I>(std::forward<T>(t))...);
}
@@ -79,7 +78,7 @@
return m;
}
-template <size_t... Is, class F, class... Args>
+template <size_t... Is, class F, class... Args, typename enable_if<(sizeof...(Args) > 0)>::type* = nullptr>
Msg to_bus(client* cli, F&& f, Args&&... args){
Msg mesg;
if (std::forward<F>(f)(cli->bus, std::forward<Args>(args)...))
@@ -87,8 +86,7 @@
return mesg;
}
-MsgCR parse(client*, const tuple<>&){ return dummy();}
-template <class... Args>
+template <class... Args, typename enable_if<sizeof...(Args) == 2>::type* = nullptr>
MsgCR parse(client* cli, const tuple<Args...>& tp){
MsgCR msg(dummy());
MsgCommonReply m;
@@ -113,17 +111,16 @@
MsgCR to_center(client* cli, F&& f, Args&&... args){
MsgCR msg(dummy());
auto vmsg = std::move(to_bus<Is...>(cli, std::forward<F>(f), std::forward<Args>(args)...));
- if (!vmsg.empty())
- msg = std::move(parse(cli, vmsg.at(0)));
+ if (!vmsg.empty()) msg = std::move(parse(cli, vmsg.at(0)));
return msg;
}
template <class F>
MsgCR to_topic(client* cli, F&& f, const struct cstrarr topic){
MsgCR msg(dummy());
- if (topic.arr && topic.size){
+ if (topic.arr && topic.count){
MsgTopicList tlist;
- for(size_t i = 0; i < topic.size; i++)
+ for(size_t i = 0; i < topic.count; i++)
tlist.add_topic_list(topic.arr[i].str, topic.arr[i].size);
const auto& tpc = tlist.SerializeAsString();
void* replymsg = NULL;
@@ -189,8 +186,22 @@
cli->thrd_readreq.reset(new thread([cli]{ thread_readreq(cli); }));
}
+ // request/reply鍜宲ub topic涓�璧峰鐞�
+ auto tmparr = cstr_arr_new(rinfo->channel.count + rinfo->topic_pub.count);
+ auto addarr = [&tmparr](size_t& start, const struct cstrarr* arr){
+ for(size_t i = 0; i < arr->count; i++){
+ cstr_arr_add(&tmparr, arr->arr[i].str, arr->arr[i].size, start+i);
+ }
+ start += arr->count;
+ };
+ size_t s = 0;
+ addarr(s, &rinfo->channel);
+ addarr(s, &rinfo->topic_pub);
+ auto tpcmsg = to_topic(cli, bus_register_topics, tmparr);
+ cstr_arr_free(tmparr);
+ // auto channelmsg = to_topic(cli, bus_register_topics, rinfo->channel);
+ // auto pubmsg = to_topic(cli, bus_register_topics, rinfo->topic_pub);
// if topic pub/sub[net] exist, register topics
- auto pubmsg = to_topic(cli, bus_register_topics, rinfo->topic_pub);
auto submsg = to_topic(cli, bus_subscribe_topics, rinfo->topic_sub);
auto subnetmsg = to_topic(cli, bus_subscribe_topics_net, rinfo->topic_sub_net);
@@ -251,37 +262,76 @@
return pmsg;
}
-struct creqmsg* bus_client_get_reqmsg(void* handle){
+struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){
client* cli = ptr(handle);
Msg msg = std::move(cli->readreq_q->pop());
if (msg.empty()) return NULL;
- void* procid = NULL, *data = NULL, *src = NULL;
+ void* procid = NULL, *data = NULL;
int pids = 0, size = 0;
tie(procid, pids) = msg.at(0);
tie(data, size) = msg.at(1);
- tie(src, ignore) = msg.at(2);
+ tie(*src, ignore) = msg.at(2);
- auto pmsg = to_reqmsg((const char*)procid, pids, (const char *)data, size, src);
+ auto pmsg = to_reqmsg((const char*)procid, pids, (const char *)data, size);
bus_free(procid, pids);
bus_free(data, size);
return pmsg;
}
-int bus_client_reply_reqmsg(void* handle, struct crepmsg* msg){
+int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg){
+ void* procid = NULL, *reply = NULL;
+ int pids = 0, replys = 0;
+ auto vmsg = std::move(to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0,
+ msg->msg.str, msg->msg.size, &procid, &pids,
+ &reply, &replys, sndto));
+ if (!vmsg.empty()){
+ void* procid = NULL, *data = NULL;
+ int pids = 0, size = 0;
+ tie(procid, pids) = vmsg.at(0);
+ bus_free(procid, pids);
+ tie(data, size) = vmsg.at(1);
+ MsgRequestTopicReply msgRT;
+ auto pb = msgRT.ParseFromArray(reply, replys);
+ bus_free(reply, replys);
+ if (!pb) return false;
- MsgRequestTopicReply msgR;
+ *repmsg = make_reply_msg(msgRT.errmsg().errcode(),
+ msgRT.errmsg().errstring().data(), msgRT.errmsg().errstring().size(),
+ msgRT.data().data(), msgRT.data().size());
+ return true;
+ }
+ return false;
}
+int bus_client_reply_msg(void* handle, void* src, const struct crepmsg* msg){
+ MsgRequestTopicReply msgR;
+ auto err = msgR.mutable_errmsg();
+ err->set_errcode((ErrorCode)msg->errcode);
+ err->set_errstring(msg->errmsg.str, msg->errmsg.size);
+
+ msgR.set_data(msg->data.str, msg->data.size);
+ auto pbstr = msgR.SerializeAsString();
+
+ auto cli = ptr(handle);
+ return bus_send_reply(cli->bus, src, pbstr.data(), pbstr.size());
+}
////////////////////////////////////////////////////
+int bus_client_publish(void* handle, const char* topic, const size_t topicl, const char* data, const size_t size){
+ MsgPublish pbmsg;
+ pbmsg.set_topic(topic, topicl);
+ pbmsg.set_data(data, size);
+ auto pbstr = pbmsg.SerializeAsString();
+ return bus_client_pubmsg(handle, pbstr.data(), pbstr.size());
+}
+
// test
-int bus_client_pubmsg(void* handle, void* data, const size_t size){
+int bus_client_pubmsg(void* handle, const char* data, const size_t size){
client* cli = ptr(handle);
- bus_publish(cli->bus, data, size, 100);
- return 0;
+ return bus_publish(cli->bus, data, size, 100);
}
--
Gitblit v1.8.0