/*
 * =====================================================================================
 *
 *       Filename:  center.cpp
 *
 *    Description:  Registers the "#center.main" and "#center.bus" message
 *                  handlers and implements BHCenter, which owns one socket per
 *                  registered center plus the center topic node.
 *
 *        Version:  1.0
 *        Created:  2021-03-30 16:19:37
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "center.h"
#include "center_topic_node.h"
#include "node_center.h"
#include <chrono>

using namespace std::chrono;
using namespace std::chrono_literals;
using namespace bhome_shm;
using namespace bhome_msg;

using Handler = BHCenter::MsgHandler;

namespace
{

//TODO check proc_id
/**
 * Parse the body of `msg` as `Body`, hand it to `onmsg`, and pass the result
 * to `replyer`.
 *
 * Nothing happens (no reply is produced) when `head` does not carry exactly
 * one route entry, or when the body fails to parse.
 *
 * `Body` must be given explicitly; `OnMsg` and `Replyer` are deduced.
 */
template <class Body, class OnMsg, class Replyer>
inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer)
{
    if (head.route_size() != 1) { return; }

    Body body;
    if (msg.ParseBody(body)) {
        replyer(onmsg(body));
    }
}

/**
 * Chain two handlers: the result invokes `h1` first and only falls through to
 * `h2` when `h1` returns false. Both handlers are copied into the result.
 */
Handler Combine(const Handler &h1, const Handler &h2)
{
    return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head) {
        return h1(socket, msg, head) || h2(socket, msg, head);
    };
}

/**
 * Chain three or more handlers, left to right, by folding them pairwise
 * through the two-argument overload.
 */
template <class... H>
Handler Combine(const Handler &h0, const Handler &h1, const Handler &h2, const H &...rest)
{
    return Combine(Combine(h0, h1), h2, rest...);
}

// Expands to a `case` that dispatches one message type to the center method of
// the same name and then reports the message as handled. It expects `msg`,
// `head`, `center` and `replyer` to be in scope at the point of use.
// NOTE(review): the body type is reconstructed as Msg##MsgTag (matching names
// such as MsgPublish / kMsgTypePublish used below) -- confirm against the
// message definitions.
#define CASE_ON_MSG_TYPE(MsgTag)                                                         \
    case kMsgType##MsgTag:                                                               \
        Dispatch<Msg##MsgTag>(                                                           \
            msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \
        return true;

/**
 * Build a callable that sends a reply body back to the sender of `head`.
 *
 * The reply head is initialised from the reply body's type, the center id and
 * the session / message ids of `head`; the destination queue is taken from
 * `head.route(0)`. The reply is only sent when the message can be built, and
 * the built message is released when the callable returns.
 *
 * The returned callable holds references to `socket`, `head` and `center`, so
 * it must not outlive any of them.
 *
 * NOTE(review): the parameter type is reconstructed as Synced<NodeCenter> --
 * confirm against node_center.h.
 */
auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, Synced<NodeCenter> &center)
{
    return [&](auto &&rep_body) {
        auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id()));
        MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()};
        MsgI msg;
        if (msg.Make(reply_head, rep_body)) {
            DEFER1(msg.Release(););
            center->SendAllocMsg(socket, remote, msg);
        }
    };
}

/**
 * Register the two built-in centers with BHCenter:
 *   - "#center.main": node init / register / heartbeat / query requests;
 *   - "#center.bus":  subscribe / unsubscribe / publish requests.
 *
 * Every installed handler keeps its own copy of `center_ptr`.
 *
 * NOTE(review): the pointee type is reconstructed as Synced<NodeCenter> --
 * confirm against node_center.h.
 *
 * @return always true.
 */
bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr)
{
    // command
    auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool {
        auto &center = *center_ptr;
        return IsCmd(cmd) && center->OnCommand(socket, cmd);
    };

    // now we can talk.
    auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
        auto &center = *center_ptr;
        auto onInit = [&](const int64_t request) {
            return center->OnNodeInit(socket, request);
        };
        BHCenterHandleInit(onInit);
        center->OnTimer();
    };

    auto OnCenter = [center_ptr](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
        auto &center = *center_ptr;
        auto replyer = MakeReplyer(socket, head, center);
        switch (head.type()) {
            CASE_ON_MSG_TYPE(ProcInit);
            CASE_ON_MSG_TYPE(Register);
            CASE_ON_MSG_TYPE(Heartbeat);
            CASE_ON_MSG_TYPE(Unregister);
            CASE_ON_MSG_TYPE(RegisterRPC);
            CASE_ON_MSG_TYPE(QueryTopic);
            CASE_ON_MSG_TYPE(QueryProc);
        default: return false;
        }
    };
    BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicBusAddress == nullptr ? BHTopicCenterAddress() : BHTopicCenterAddress(), 1000);

    // The bus has nothing to do when idle and accepts no raw commands.
    auto OnBusIdle = [](ShmSocket &) {};
    auto OnBusCmd = [](ShmSocket &, ShmMsgQueue::RawData &) { return false; };

    auto OnPubSub = [center_ptr](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
        auto &center = *center_ptr;
        auto replyer = MakeReplyer(socket, head, center);

        // Acknowledge the publisher, then forward the original message to
        // every client returned by FindClients whose node is still alive.
        auto OnPublish = [&]() {
            MsgPublish pub;
            NodeCenter::Clients clients;
            MsgCommonReply reply;
            if (head.route_size() != 1 || !msg.ParseBody(pub)) {
                return;
            }
            if (!center->FindClients(head, pub, clients, reply)) {
                // FindClients filled in `reply`; pass it back as-is.
                replyer(reply);
                return;
            }

            replyer(MakeReply(eSuccess));
            for (auto &cli : clients) {
                auto node = cli.weak_node_.lock();
                if (node) {
                    // should also make sure that mq is not killed before msg expires.
                    // it would be ok if (kill_time - offline_time) is longer than expire time.
                    socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
                }
            }
        };

        switch (head.type()) {
            CASE_ON_MSG_TYPE(Subscribe);
            CASE_ON_MSG_TYPE(Unsubscribe);
        case kMsgTypePublish: OnPublish(); return true;
        default: return false;
        }
    };
    BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000);

    return true;
}

#undef CASE_ON_MSG_TYPE

} // namespace

/// Process-wide registry of installed centers, created on first use.
BHCenter::CenterRecords &BHCenter::Centers()
{
    static CenterRecords rec;
    return rec;
}

/**
 * Register (or replace) the center called `name` with its message, raw-command
 * and idle handlers, its queue descriptor and its queue length.
 *
 * @return always true.
 */
bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQInfo &mq, const int mq_len)
{
    Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mq, mq_len};
    return true;
}

/**
 * Create the node center, install the built-in centers, open one socket per
 * registered center and create the topic node. Nothing is started here; call
 * Start() for that.
 *
 * The queue-removal callback given to the node center keeps a reference to
 * `shm`, so `shm` must outlive this object.
 */
BHCenter::BHCenter(Socket::Shm &shm)
{
    auto gc = [&](const MQId id) {
        auto r = ShmSocket::Remove(shm, id);
        if (r) {
            LOG_DEBUG() << "remove mq " << id << " ok\n";
        }
    };

    auto nsec = NodeTimeoutSec();
    // NOTE(review): template arguments reconstructed as Synced<NodeCenter> and
    // ShmSocket -- confirm against center.h / node_center.h.
    auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
    AddCenter(center_ptr);

    for (auto &kv : Centers()) {
        auto &info = kv.second;
        sockets_[info.name_] = std::make_shared<ShmSocket>(info.mq_.offset_, shm, info.mq_.id_);
    }

    topic_node_.reset(new CenterTopicNode(center_ptr, shm));
}

/// Stops everything that Start() started.
BHCenter::~BHCenter() { Stop(); }

/**
 * Start the socket of every registered center, then the topic node.
 *
 * A center registered after this object was constructed has no socket; it is
 * logged and skipped rather than dereferencing a default-inserted null entry.
 *
 * @return always true.
 */
bool BHCenter::Start()
{
    for (auto &kv : Centers()) {
        auto &info = kv.second;
        auto pos = sockets_.find(info.name_);
        if (pos == sockets_.end() || !pos->second) {
            LOG_DEBUG() << "no socket for center " << info.name_ << ", skipped\n";
            continue;
        }
        pos->second->Start(1, info.handler_, info.raw_handler_, info.idle_);
    }
    topic_node_->Start();
    return true;
}

/**
 * Stop the topic node first, then every socket.
 *
 * @return always true.
 */
bool BHCenter::Stop()
{
    topic_node_->Stop();
    for (auto &kv : sockets_) {
        kv.second->Stop();
    }
    return true;
}