#include #include #include #include #include #include #include #include "cbhomeclient.h" #include "fixed_q.h" #include "3rdparty/bus_nng/interface_bus_api.h" #include "3rdparty/bus_nng/bn_api.h" #include "bhome_msg.pb.h" using namespace bhome_msg; using namespace std; typedef std::vector> Msg; typedef tuple MsgCR; static MsgCR dummy(){return MsgCR(false, MsgCommonReply{});} // ms const int sndto = 100; const int rcvto = 100; static void freeMsg(Msg& msg){ for (auto& m : msg){ auto tmp = get<0>(m); if (tmp) free(tmp); } } struct client{ unique_ptr thrd_sub{nullptr}; unique_ptr> sub_q{nullptr}; unique_ptr thrd_readreq{nullptr}; unique_ptr> readreq_q{nullptr}; atomic_bool thrd_quit{false}; void* bus{nullptr}; cproc* pinfo{nullptr}; ~client(){ internal_cproc_free(pinfo); thrd_quit.store(true, memory_order_acq_rel); if (thrd_sub) thrd_sub->join(); if (thrd_readreq) thrd_readreq->join(); if (sub_q) sub_q->clear(freeMsg); if (readreq_q) readreq_q->clear(freeMsg); bus_cleanup(bus); } }; /////////////////////////////////////////////////////////// template 0)>::type* = nullptr> auto crop(T&& t) -> decltype(make_tuple(get(std::forward(t))...)){ return make_tuple(get(std::forward(t))...); } template void msg_helper(index_sequence<>, T&&, Msg&){} template void msg_helper(index_sequence, T&& t, Msg& m){ m.emplace_back(make_tuple(*get(std::forward(t)),0)); } template void msg_helper(index_sequence, T&& t, Msg& m){ m.emplace_back(make_tuple(*get(std::forward(t)), *get(std::forward(t)))); msg_helper(index_sequence{}, std::forward(t), m); } template Msg msg(T&& t){ Msg m; msg_helper(make_index_sequence::type>::value>{}, std::forward(t), m); return m; } template 0)>::type* = nullptr> Msg to_bus(client* cli, F&& f, Args&&... args){ Msg mesg; if (std::forward(f)(cli->bus, std::forward(args)...)) mesg = msg(crop(tuple(std::forward(args)...))); return mesg; } template ::type* = nullptr> MsgCR parse(client* cli, const tuple& tp){ MsgCR msg(dummy()); MsgCommonReply m; while(!cli->thrd_quit.load(memory_order_acquire)){ typename tuple_element<0, tuple>::type d; typename tuple_element<1, tuple>::type s; tie(d, s) = tp; if (m.ParseFromArray(d, s)) { bus_free(d, s); msg = make_tuple(true, std::move(m)); break; } m.Clear(); this_thread::sleep_for(chrono::milliseconds{sndto}); } return msg; } ///////////////////////////////////////////////////////// template MsgCR to_center(client* cli, F&& f, Args&&... args){ MsgCR msg(dummy()); auto vmsg = to_bus(cli, std::forward(f), std::forward(args)...); if (!vmsg.empty()) msg = parse(cli, vmsg.at(0)); return msg; } template MsgCR to_topic(client* cli, F&& f, char** topic, const size_t count){ MsgCR msg(dummy()); MsgTopicList tlist; for(size_t i = 0; i < count; i++) tlist.add_topic_list(topic[i]); const auto& tpc = tlist.SerializeAsString(); void* replymsg = NULL; int replysize = 0; msg = to_center<2,3>(cli, std::forward(f), tpc.data(), tpc.size(), &replymsg, &replysize, sndto); return msg; } static void thread_readreq(client* cli){ while (!cli->thrd_quit.load(memory_order_acquire)) { void* procid = NULL; int procidsize = 0; void* req = NULL; int reqsize = 0; void* src = NULL; Msg msg = to_bus<0,1,2,3,4>(cli, bus_recv_request, &procid, &procidsize, &req, &reqsize, &src, rcvto); if (!msg.empty()){ // 收到信息,保存 // pollcontrol代码应该不需要readreq,先不写逻辑,直接释放 // freeMsg(msg); // continue; cli->readreq_q->emplace(std::move(msg)); } } } static void thread_sub(client* cli){ while (!cli->thrd_quit.load(memory_order_acquire)) { void* procid = NULL; int procidsize = 0; void* req = NULL; int reqsize = 0; Msg msg = to_bus<0,1,2,3>(cli, bus_recv_pubmsg, &procid, &procidsize, &req, &reqsize, rcvto); if (!msg.empty()) { cli->sub_q->emplace(std::move(msg)); // printf("======>> thread_sub a msg\n"); } } } static void registered(client* cli, const creg* rinfo){ ProcInfo pinfo; auto proc = creg_proc(rinfo); pinfo.set_name(cproc_name(proc)); pinfo.set_proc_id(cproc_id(proc)); const auto& pbproc = pinfo.SerializeAsString(); while (!cli->thrd_quit.load(memory_order_acquire)) { void* replymsg = NULL; int replysize = 0; cli->bus = bus_register(pbproc.data(), pbproc.size(), &replymsg, &replysize, sndto); bus_free(replymsg, replysize); if (cli->bus) break; } // register success start read request thread cli->thrd_readreq.reset(new thread([cli]{ thread_readreq(cli); })); // request/reply和pub topic一起处理 // 只需要将字符串指针拷贝就行,不需创建字符串内存 auto shallowMerge = [](char** a1, const size_t c1, char** a2, const size_t c2){ auto tmp = (char**)malloc((c1 + c2) * sizeof(char*)); auto dst = tmp; auto cp2dst = [&dst](char** src, const size_t cnt){ memcpy(dst, src, cnt * sizeof(char*)); dst += cnt; }; cp2dst(a1, c1); cp2dst(a2, c2); return tmp; }; size_t tpcc = 0, pubc = 0; char** tpc = creg_reply_topic(rinfo, &tpcc); char** pub = creg_pub_topic(rinfo, &pubc); auto tmparr = shallowMerge(tpc, tpcc, pub, pubc); auto tpcmsg = to_topic(cli, bus_register_topics, tmparr, tpcc + pubc); for(size_t i = 0; i < tpcc+pubc; i++){ printf("======>> proc [%s] topic %lu -> %s\n", cproc_name(proc), i, tmparr[i]); } free(tmparr); // if topic pub/sub[net] exist, register topics tpc = creg_sub_topic(rinfo, &tpcc); auto submsg = to_topic(cli, bus_subscribe_topics, tpc, tpcc); tpc = creg_subnet_topic(rinfo, &tpcc); auto subnetmsg = to_topic(cli, bus_subscribe_topics_net, tpc, tpcc); if (get<0>(submsg) && !cli->thrd_sub) cli->thrd_sub.reset(new thread([cli]{ thread_sub(cli); })); } static void unregistered(client* cli){ ProcInfo pinfo; auto proc = cli->pinfo; pinfo.set_name(cproc_name(proc)); pinfo.set_proc_id(cproc_id(proc)); const auto& pbproc = pinfo.SerializeAsString(); void* rep; int repl; to_center<2,3>(cli, bus_unregister, pbproc.data(), pbproc.size(), &rep, &repl, sndto); } static inline client* ptr(void* handle){ return static_cast(handle); } void* bus_client_init(const char* srvid, const size_t sidsize, const creg* rinfo){ client* cli = new client; cli->pinfo = internal_clone_cproc(creg_proc(rinfo)); auto pred = [cli]{ return cli->thrd_quit.load(memory_order_relaxed); }; const size_t qsize = 5; cli->sub_q.reset(new fixed_q(qsize, pred)); cli->readreq_q.reset(new fixed_q(qsize, pred)); registered(cli, rinfo); return cli; } void bus_client_free(void* handle){ client* cli = ptr(handle); unregistered(cli); delete cli; } static struct csubmsg* parse_submsg(const Msg& msg){ void* procid = NULL, *data = NULL; int pids = 0, size = 0; tie(procid, pids) = msg.at(0); bus_free(procid, pids); tie(data, size) = msg.at(1); auto pmsg = to_submsg((const char *)data, size); bus_free(data, size); return pmsg; } struct csubmsg* bus_client_get_submsg_intime(void* handle, const size_t ms){ client* cli = ptr(handle); Msg msg = cli->sub_q->pop(ms); if (msg.empty()) return NULL; return parse_submsg(msg); } struct csubmsg* bus_client_get_submsg(void* handle){ client* cli = ptr(handle); Msg msg = cli->sub_q->pop(); if (msg.empty()) return NULL; return parse_submsg(msg); } static struct creqmsg* parse_reqmsg(const Msg& msg, void** src){ void* procid = NULL, *data = NULL; int pids = 0, size = 0; tie(procid, pids) = msg.at(0); tie(data, size) = msg.at(1); tie(*src, ignore) = msg.at(2); auto pmsg = to_reqmsg((const char*)procid, pids, (const char *)data, size); bus_free(procid, pids); bus_free(data, size); return pmsg; } struct creqmsg* bus_client_get_reqmsg_intime(void* handle, void** src, const size_t ms){ client* cli = ptr(handle); Msg msg = cli->readreq_q->pop(ms); if (msg.empty()) return NULL; return parse_reqmsg(msg, src); } struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){ client* cli = ptr(handle); Msg msg = cli->readreq_q->pop(); if (msg.empty()) return NULL; return parse_reqmsg(msg, src); } int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg){ void* procid = NULL, *reply = NULL; int pids = 0, replys = 0; auto vmsg = to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0, msg->msg, msg->msgl, &procid, &pids, &reply, &replys, sndto); if (!vmsg.empty()){ void* procid = NULL, *data = NULL; int pids = 0, size = 0; tie(procid, pids) = vmsg.at(0); bus_free(procid, pids); tie(data, size) = vmsg.at(1); MsgRequestTopicReply msgRT; auto pb = msgRT.ParseFromArray(reply, replys); bus_free(reply, replys); if (!pb) return false; *repmsg = make_reply_msg(msgRT.errmsg().errcode(), msgRT.errmsg().errstring().data(), msgRT.errmsg().errstring().size(), msgRT.data().data(), msgRT.data().size()); return true; } return false; } int bus_client_reply(void* handle, void* src, const struct crepmsg* msg){ MsgRequestTopicReply msgR; auto err = msgR.mutable_errmsg(); err->set_errcode((ErrorCode)msg->errcode); err->set_errstring(msg->errmsg, msg->errmsgl); msgR.set_data(msg->data, msg->datal); auto pbstr = msgR.SerializeAsString(); auto cli = ptr(handle); return bus_send_reply(cli->bus, src, pbstr.data(), pbstr.size()); } struct cqueryprocs* bus_client_query_procs(void* handle, size_t* count){ BHAddress addr; const auto& pbaddr = addr.SerializeAsString(); MsgQueryProc topic; const auto& pbtopic = topic.SerializeAsString(); void* rep = NULL; int repl = 0; auto msg = to_bus<4,5>(ptr(handle), bus_query_procs, pbaddr.data(), pbaddr.size(), pbtopic.data(), pbtopic.size(), &rep, &repl, sndto); if (msg.empty()) return NULL; // bug 已修复, 在 bus_nng 中 // bus_query_procs 获取数据失败, 暂时改用BHQueryProcs // if (!BHQueryProcs(pbaddr.data(), pbaddr.size(), pbtopic.data(), pbtopic.size(), &rep, &repl, sndto)) { // return NULL; // } MsgQueryProcReply msgR; msgR.ParseFromArray(rep, repl); bus_free(rep, repl); *count = msgR.proc_list_size(); auto procs = (struct cqueryprocs*)calloc(*count, sizeof(struct cqueryprocs)); for(size_t i = 0; i < *count; i++){ const auto& p = msgR.proc_list(i); size_t idl = p.proc().proc_id().size(); char* id = (char*)calloc(idl+1, 1); memcpy(id, p.proc().proc_id().data(), idl); procs[i] = cqueryprocs{ .id = id, .idl = idl, .online = p.online() }; } return procs; } //////////////////////////////////////////////////// int bus_client_publish(void* handle, const char* topic, const size_t topicl, const char* data, const size_t size){ MsgPublish pbmsg; pbmsg.set_topic(topic, topicl); pbmsg.set_data(data, size); auto pbstr = pbmsg.SerializeAsString(); return bus_client_publish_msg(handle, pbstr.data(), pbstr.size()); } // test int bus_client_publish_msg(void* handle, const char* data, const size_t size){ client* cli = ptr(handle); return bus_publish(cli->bus, data, size, 100); }