#include <memory>
|
#include <thread>
|
#include <atomic>
|
#include <string>
|
#include <cstring>
|
#include <vector>
|
#include <tuple>
|
|
#include "cbhomeclient.h"
|
#include "fixed_q.h"
|
|
#include "3dparty/bus_nng/interface_bus_api.h"
|
|
#include "bhome_msg.pb.h"
|
|
using namespace bhome_msg;
|
using namespace std;
|
|
typedef std::vector<std::tuple<void*, int>> Msg;
|
typedef tuple<bool, MsgCommonReply> MsgCR;
|
static MsgCR dummy(){return MsgCR(false, MsgCommonReply{});}
|
|
// ms
|
const int sndto = 100;
|
const int rcvto = 100;
|
|
static void freeMsg(Msg& msg){
|
for (auto& m : msg){
|
auto tmp = get<0>(m);
|
if (tmp) free(tmp);
|
}
|
}
|
|
struct client{
|
unique_ptr<thread> thrd_sub{nullptr};
|
unique_ptr<fixed_q<Msg>> sub_q{nullptr};
|
|
unique_ptr<thread> thrd_readreq{nullptr};
|
unique_ptr<fixed_q<Msg>> readreq_q{nullptr};
|
|
atomic_bool thrd_quit{false};
|
|
void* bus{nullptr};
|
cproc* pinfo{nullptr};
|
~client(){
|
free_proc_info(pinfo);
|
|
thrd_quit.store(true, memory_order_acq_rel);
|
if (thrd_sub) thrd_sub->join();
|
if (thrd_readreq) thrd_readreq->join();
|
|
if (sub_q) sub_q->clear(freeMsg);
|
if (readreq_q) readreq_q->clear(freeMsg);
|
|
if (bus) bus_cleanup(bus);
|
}
|
};
|
|
///////////////////////////////////////////////////////////
|
template <size_t... I, class T, typename enable_if<(sizeof...(I) > 0)>::type* = nullptr>
|
auto crop(T&& t) -> decltype(make_tuple(get<I>(std::forward<T>(t))...)){
|
return make_tuple(get<I>(std::forward<T>(t))...);
|
}
|
|
template<class T> void msg_helper(index_sequence<>, T&&, Msg&){}
|
template <size_t I, class T>
|
void msg_helper(index_sequence<I>, T&& t, Msg& m){
|
m.emplace_back(make_tuple(*get<I>(std::forward<T>(t)),0));
|
}
|
template <size_t I1, size_t I2, size_t... Is, class T>
|
void msg_helper(index_sequence<I1, I2, Is...>, T&& t, Msg& m){
|
m.emplace_back(make_tuple(*get<I1>(std::forward<T>(t)), *get<I2>(std::forward<T>(t))));
|
msg_helper(index_sequence<Is...>{}, std::forward<T>(t), m);
|
}
|
template <class T> Msg msg(T&& t){
|
Msg m;
|
msg_helper(make_index_sequence<tuple_size<T>::value>{}, std::forward<T>(t), m);
|
return m;
|
}
|
|
template <size_t... Is, class F, class... Args, typename enable_if<(sizeof...(Args) > 0)>::type* = nullptr>
|
Msg to_bus(client* cli, F&& f, Args&&... args){
|
Msg mesg;
|
if (std::forward<F>(f)(cli->bus, std::forward<Args>(args)...))
|
mesg = std::move(msg(crop<Is...>(tuple<Args...>(std::forward<Args>(args)...))));
|
return mesg;
|
}
|
|
template <class... Args, typename enable_if<sizeof...(Args) == 2>::type* = nullptr>
|
MsgCR parse(client* cli, const tuple<Args...>& tp){
|
MsgCR msg(dummy());
|
MsgCommonReply m;
|
while(!cli->thrd_quit.load(memory_order_acquire)){
|
typename tuple_element<0, tuple<Args...>>::type d;
|
typename tuple_element<1, tuple<Args...>>::type s;
|
tie(d, s) = tp;
|
if (m.ParseFromArray(d, s)) {
|
bus_free(d, s);
|
msg = std::move(make_tuple(true, std::move(m)));
|
break;
|
}
|
m.Clear();
|
this_thread::sleep_for(chrono::milliseconds{sndto});
|
}
|
return msg;
|
}
|
|
/////////////////////////////////////////////////////////
|
|
template <size_t... Is, class F, class... Args>
|
MsgCR to_center(client* cli, F&& f, Args&&... args){
|
MsgCR msg(dummy());
|
auto vmsg = std::move(to_bus<Is...>(cli, std::forward<F>(f), std::forward<Args>(args)...));
|
if (!vmsg.empty()) msg = std::move(parse(cli, vmsg.at(0)));
|
return msg;
|
}
|
|
template <class F>
|
MsgCR to_topic(client* cli, F&& f, const struct cstrarr topic){
|
MsgCR msg(dummy());
|
if (topic.arr && topic.count){
|
MsgTopicList tlist;
|
for(size_t i = 0; i < topic.count; i++)
|
tlist.add_topic_list(topic.arr[i].str, topic.arr[i].size);
|
const auto& tpc = tlist.SerializeAsString();
|
void* replymsg = NULL;
|
int replysize = 0;
|
msg = std::move(to_center<2,3>(cli, std::forward<F>(f),
|
tpc.data(), tpc.size(), &replymsg, &replysize, sndto));
|
}
|
return msg;
|
}
|
|
static void thread_readreq(client* cli){
|
while (!cli->thrd_quit.load(memory_order_acquire)) {
|
void* procid = NULL;
|
int procidsize = 0;
|
void* req = NULL;
|
int reqsize = 0;
|
void* src = NULL;
|
Msg msg = to_bus<0,1,2,3,4>(cli, bus_recv_request,
|
&procid, &procidsize, &req, &reqsize, &src, rcvto);
|
if (!msg.empty()){
|
// 收到信息,保存
|
// pollcontrol代码应该不需要readreq,先不写逻辑,直接释放
|
// freeMsg(msg);
|
// continue;
|
cli->readreq_q->emplace(std::move(msg));
|
}
|
}
|
}
|
|
static void thread_sub(client* cli){
|
while (!cli->thrd_quit.load(memory_order_acquire)) {
|
void* procid = NULL;
|
int procidsize = 0;
|
void* req = NULL;
|
int reqsize = 0;
|
Msg msg = to_bus<0,1,2,3>(cli, bus_recv_pubmsg,
|
&procid, &procidsize, &req, &reqsize, rcvto);
|
if (!msg.empty()) {
|
cli->sub_q->emplace(std::move(msg));
|
// printf("======>> thread_sub a msg\n");
|
}
|
}
|
}
|
|
static void registered(client* cli, const creg* rinfo, const bool must_reg=true){
|
|
if (must_reg){
|
ProcInfo pinfo;
|
auto tmp = rinfo->pinfo;
|
pinfo.set_name(tmp->name.str, tmp->name.size);
|
pinfo.set_proc_id(tmp->id.str, tmp->id.size);
|
const auto& reg = pinfo.SerializeAsString();
|
|
while (!cli->thrd_quit.load(memory_order_acquire)) {
|
void* replymsg = NULL;
|
int replysize = 0;
|
cli->bus = bus_register(reg.data(), reg.size(), &replymsg, &replysize, sndto);
|
bus_free(replymsg, replysize);
|
if (cli->bus) break;
|
}
|
|
// register success start read request thread
|
cli->thrd_readreq.reset(new thread([cli]{ thread_readreq(cli); }));
|
}
|
|
// request/reply和pub topic一起处理
|
auto tmparr = cstr_arr_new(rinfo->channel.count + rinfo->topic_pub.count);
|
auto addarr = [&tmparr](size_t& start, const struct cstrarr* arr){
|
for(size_t i = 0; i < arr->count; i++){
|
cstr_arr_add(&tmparr, arr->arr[i].str, arr->arr[i].size, start+i);
|
}
|
start += arr->count;
|
};
|
size_t s = 0;
|
addarr(s, &rinfo->channel);
|
addarr(s, &rinfo->topic_pub);
|
auto tpcmsg = to_topic(cli, bus_register_topics, tmparr);
|
cstr_arr_free(tmparr);
|
// auto channelmsg = to_topic(cli, bus_register_topics, rinfo->channel);
|
// auto pubmsg = to_topic(cli, bus_register_topics, rinfo->topic_pub);
|
// if topic pub/sub[net] exist, register topics
|
auto submsg = to_topic(cli, bus_subscribe_topics, rinfo->topic_sub);
|
auto subnetmsg = to_topic(cli, bus_subscribe_topics_net, rinfo->topic_sub_net);
|
|
if (get<0>(submsg) && !cli->thrd_sub)
|
cli->thrd_sub.reset(new thread([cli]{ thread_sub(cli); }));
|
|
}
|
|
static void unregistered(client* cli){
|
ProcInfo pinfo;
|
auto tmp = cli->pinfo;
|
pinfo.set_name(tmp->name.str, tmp->name.size);
|
pinfo.set_proc_id(tmp->id.str, tmp->id.size);
|
const auto& reg = pinfo.SerializeAsString();
|
|
void* rep;
|
int repl;
|
to_center<2,3>(cli, bus_unregister, reg.data(), reg.size(), &rep, &repl, sndto);
|
}
|
|
static inline client* ptr(void* handle){ return static_cast<client*>(handle); }
|
|
void* bus_client_init(const char* srvid, const size_t sidsize, const creg* rinfo){
|
client* cli = new client;
|
cli->pinfo = clone_proc_info(rinfo->pinfo);
|
|
auto pred = [cli]{ return cli->thrd_quit.load(memory_order_relaxed); };
|
const size_t qsize = 5;
|
cli->sub_q.reset(new fixed_q<Msg>(qsize, pred));
|
cli->readreq_q.reset(new fixed_q<Msg>(qsize, pred));
|
|
registered(cli, rinfo);
|
|
return cli;
|
}
|
|
void bus_client_free(void* handle){
|
client* cli = ptr(handle);
|
unregistered(cli);
|
delete cli;
|
}
|
|
struct csubmsg* bus_client_get_submsg(void* handle){
|
client* cli = ptr(handle);
|
Msg msg = std::move(cli->sub_q->pop());
|
if (msg.empty()) return NULL;
|
|
void* procid = NULL, *data = NULL;
|
int pids = 0, size = 0;
|
|
tie(procid, pids) = msg.at(0);
|
bus_free(procid, pids);
|
|
tie(data, size) = msg.at(1);
|
auto pmsg = to_submsg((const char *)data, size);
|
bus_free(data, size);
|
|
return pmsg;
|
}
|
|
struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){
|
client* cli = ptr(handle);
|
Msg msg = std::move(cli->readreq_q->pop());
|
if (msg.empty()) return NULL;
|
|
void* procid = NULL, *data = NULL;
|
int pids = 0, size = 0;
|
|
tie(procid, pids) = msg.at(0);
|
tie(data, size) = msg.at(1);
|
tie(*src, ignore) = msg.at(2);
|
|
auto pmsg = to_reqmsg((const char*)procid, pids, (const char *)data, size);
|
bus_free(procid, pids);
|
bus_free(data, size);
|
|
return pmsg;
|
}
|
|
int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg){
|
|
void* procid = NULL, *reply = NULL;
|
int pids = 0, replys = 0;
|
auto vmsg = std::move(to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0,
|
msg->msg.str, msg->msg.size, &procid, &pids,
|
&reply, &replys, sndto));
|
if (!vmsg.empty()){
|
void* procid = NULL, *data = NULL;
|
int pids = 0, size = 0;
|
tie(procid, pids) = vmsg.at(0);
|
bus_free(procid, pids);
|
tie(data, size) = vmsg.at(1);
|
MsgRequestTopicReply msgRT;
|
auto pb = msgRT.ParseFromArray(reply, replys);
|
bus_free(reply, replys);
|
if (!pb) return false;
|
|
*repmsg = make_reply_msg(msgRT.errmsg().errcode(),
|
msgRT.errmsg().errstring().data(), msgRT.errmsg().errstring().size(),
|
msgRT.data().data(), msgRT.data().size());
|
return true;
|
}
|
return false;
|
}
|
|
int bus_client_reply_msg(void* handle, void* src, const struct crepmsg* msg){
|
|
MsgRequestTopicReply msgR;
|
auto err = msgR.mutable_errmsg();
|
err->set_errcode((ErrorCode)msg->errcode);
|
err->set_errstring(msg->errmsg.str, msg->errmsg.size);
|
|
msgR.set_data(msg->data.str, msg->data.size);
|
auto pbstr = msgR.SerializeAsString();
|
|
auto cli = ptr(handle);
|
return bus_send_reply(cli->bus, src, pbstr.data(), pbstr.size());
|
}
|
|
////////////////////////////////////////////////////
|
int bus_client_publish(void* handle, const char* topic, const size_t topicl, const char* data, const size_t size){
|
MsgPublish pbmsg;
|
pbmsg.set_topic(topic, topicl);
|
pbmsg.set_data(data, size);
|
auto pbstr = pbmsg.SerializeAsString();
|
return bus_client_pubmsg(handle, pbstr.data(), pbstr.size());
|
}
|
|
// test
|
int bus_client_pubmsg(void* handle, const char* data, const size_t size){
|
client* cli = ptr(handle);
|
return bus_publish(cli->bus, data, size, 100);
|
}
|