/*
 * =====================================================================================
 *
 *       Filename:  center.cpp
 *
 *    Description:  Installs the "@center.main" and "@center.bus" message
 *                  handlers and implements BHCenter (construction, Start, Stop).
 *
 *        Version:  1.0
 *        Created:  2021-03-30 16:19:37
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "center.h"
#include "center_topic_node.h"
#include "io_service.h"
#include "node_center.h"
#include "tcp_proxy.h"
#include "tcp_server.h"
#include <chrono>

using namespace std::chrono;
using namespace std::chrono_literals;

using namespace bhome_shm;
using namespace bhome_msg;
typedef BHCenter::MsgHandler Handler;

// NOTE(review): this file was recovered from a copy in which text between
// angle brackets had been stripped. The following spellings were reconstructed
// from how the names are used below and must be confirmed against the project
// headers: Synced<NodeCenter>, Dispatch<Msg##MsgTag>, std::make_shared<ShmSocket>.

namespace
{

/**
 * Parses the body of `msg` as `Body`, passes it to `onmsg` and hands the
 * result to `replyer`.
 *
 * Nothing happens when the head does not carry exactly one route entry, or
 * when the body cannot be parsed.
 *
 * @tparam Body    message body type to parse; must be given explicitly.
 * @tparam OnMsg   callable taking `Body &` and returning the reply body.
 * @tparam Replyer callable taking the reply body.
 */
template <class Body, class OnMsg, class Replyer>
inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer)
{
    if (head.route_size() != 1) { return; }

    Body body;
    if (msg.ParseBody(body)) {
        replyer(onmsg(body));
    }
}

// Expands to a `case` that dispatches message type kMsgType<MsgTag> to
// center-><MsgTag>(head, body) and returns true from the enclosing handler.
// Relies on `msg`, `head`, `center` and `replyer` being in scope at the
// expansion site.
#define CASE_ON_MSG_TYPE(MsgTag)                                                         \
    case kMsgType##MsgTag:                                                               \
        Dispatch<Msg##MsgTag>(                                                           \
            msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \
        return true;

/**
 * Builds a callable that sends a reply body back to the sender of `head`.
 *
 * The reply head reuses the session id and message id of `head`; the
 * destination queue is taken from route entry 0, so callers must make sure
 * the head has at least one route entry before invoking the result.
 *
 * The returned lambda refers to `socket`, `head` and `center` by reference and
 * must not outlive them.
 */
auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, Synced<NodeCenter> &center)
{
    return [&](auto &&rep_body) {
        auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id()));
        MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()};
        MsgI msg(socket.shm());
        if (msg.Make(reply_head, rep_body)) {
            DEFER1(msg.Release(););
            center->SendAllocMsg(socket, remote, msg);
        }
    };
}

/**
 * Registers the two built-in centers through BHCenter::Install:
 *   - "@center.main": node init / register / heartbeat / query handling, plus
 *     forwarding of topic requests addressed to another host via `tcp_proxy`.
 *   - "@center.bus":  subscribe / unsubscribe / publish handling.
 *
 * @param center_ptr shared node center; each installed handler keeps a copy.
 * @param shm        shared memory used to look up the center queue addresses.
 * @param tcp_proxy  captured BY REFERENCE by the installed handlers, so it
 *                   must stay alive for as long as those handlers may run.
 * @return always true.
 */
bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy)
{
    // command
    auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool {
        auto &center = *center_ptr;
        return IsCmd(cmd) && center->OnCommand(socket, cmd);
    };

    // now we can talk.
    auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
        auto &center = *center_ptr;
        auto onInit = [&](const int64_t request) {
            return center->OnNodeInit(socket, request);
        };
        BHCenterHandleInit(socket.shm(), onInit);
        center->OnTimer();
    };

    auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
        auto &center = *center_ptr;
        auto replyer = MakeReplyer(socket, head, center);

        if (!head.dest().ip().empty()) { // other host, proxy
            if (head.route_size() != 1) { return false; }

            if (head.type() != kMsgTypeRequestTopic) {
                // ignore other msgs for now.
                return false;
            }

            typedef MsgRequestTopicReply Reply;
            Reply reply;
            if (!center->CheckMsg(head, reply)) {
                replyer(reply);
                return true;
            }

            // Invoked with the remote answer: pop the last route entry and
            // deliver the reply to the local queue that entry names.
            // `center` refers to the object owned by center_ptr, which this
            // handler keeps alive through its by-copy capture.
            auto onResult = [&center](BHMsgHead &head, std::string body_content) {
                if (head.route_size() > 0) {
                    auto &back = head.route(head.route_size() - 1);
                    MQInfo dest = {back.mq_id(), back.abs_addr()};
                    head.mutable_route()->RemoveLast();
                    center->PassRemoteReplyToLocal(dest, head, std::move(body_content));
                }
            };

            // Port 0 means "not specified": fall back to the default center port.
            uint16_t port = head.dest().port();
            if (port == 0) {
                port = kBHCenterPort;
            }

            if (!tcp_proxy.Request(head.dest().ip(), port, msg.content(), onResult)) {
                // NOTE(review): the recovered text shows no explicit template
                // argument here; confirm whether MakeReply<Reply> was intended.
                replyer(MakeReply(eError, "send request failed."));
            } else {
                // success
            }
            return true;
        }

        switch (head.type()) {
            CASE_ON_MSG_TYPE(ProcInit);
            CASE_ON_MSG_TYPE(Register);
            CASE_ON_MSG_TYPE(Heartbeat);
            CASE_ON_MSG_TYPE(Unregister);
            CASE_ON_MSG_TYPE(RegisterRPC);
            CASE_ON_MSG_TYPE(QueryTopic);
            CASE_ON_MSG_TYPE(QueryProc);
        default: return false;
        }
    };
    BHCenter::Install("@center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000);

    // The bus has no idle work and handles no raw commands.
    auto OnBusIdle = [](ShmSocket &) {};
    auto OnBusCmd = [](ShmSocket &, ShmMsgQueue::RawData &) { return false; };
    auto OnPubSub = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
        auto &center = *center_ptr;
        auto replyer = MakeReplyer(socket, head, center);

        auto OnPublish = [&]() {
            MsgPublish pub;
            if (head.route_size() == 1 && msg.ParseBody(pub)) {
                // Publish() is evaluated into a local before replyer is called;
                // the original author suspected the nested form
                // replyer(center->Publish(head, pub.topic(), msg)) could dead lock.
                auto reply(center->Publish(head, pub.topic(), msg));
                replyer(reply);

                // Forward the publication to every remote host that has subscribers.
                auto hosts = center->FindRemoteSubClients(pub.topic());
                for (auto &host : hosts) {
                    tcp_proxy.Publish(host, kBHCenterPort, msg.content());
                }
            }
        };

        switch (head.type()) {
            CASE_ON_MSG_TYPE(Subscribe);
            CASE_ON_MSG_TYPE(Unsubscribe);
        case kMsgTypePublish: OnPublish(); return true;
        default: return false;
        }
    };
    BHCenter::Install("@center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000);

    return true;
}

#undef CASE_ON_MSG_TYPE

} // namespace

/// Returns the process-wide registry of installed centers, keyed by name.
BHCenter::CenterRecords &BHCenter::Centers()
{
    static CenterRecords rec;
    return rec;
}

/**
 * Records (or replaces) the center registered under `name`.
 *
 * @return always true.
 */
bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQInfo &mq, const int mq_len)
{
    Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mq, mq_len};
    return true;
}

/**
 * Creates the node center, the TCP proxy/server and one socket per installed
 * center. Nothing is started here; see Start().
 */
BHCenter::BHCenter(Socket::Shm &shm)
{
    auto nsec = NodeTimeoutSec();
    auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.

    io_service_.reset(new IoService);
    tcp_proxy_.reset(new TcpProxy(io_service_->io()));

    // Installs the handlers into Centers(); they keep a reference to *tcp_proxy_.
    AddCenter(center_ptr, shm, *tcp_proxy_);

    for (auto &kv : Centers()) {
        auto &info = kv.second;
        sockets_[info.name_] = std::make_shared<ShmSocket>(info.mq_.offset_, shm, info.mq_.id_);
    }

    topic_node_.reset(new CenterTopicNode(center_ptr, shm));
    tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr));
}

BHCenter::~BHCenter() { Stop(); }

/**
 * Starts every center socket with its installed handlers, then the topic node.
 *
 * @return always true.
 */
bool BHCenter::Start()
{
    for (auto &kv : Centers()) {
        auto &info = kv.second;
        sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_);
    }
    topic_node_->Start();
    return true;
}

/**
 * Releases the TCP proxy, TCP server and io service, then stops the topic
 * node and every socket. Also called from the destructor.
 *
 * @return always true.
 */
bool BHCenter::Stop()
{
    // NOTE(review): the handlers installed by AddCenter hold a reference to the
    // TcpProxy that is destroyed here, before the sockets running those
    // handlers are stopped. Confirm that no handler can still be executing at
    // this point, or stop the sockets first.
    tcp_proxy_.reset();
    tcp_server_.reset();
    io_service_.reset();

    topic_node_->Stop();
    for (auto &kv : sockets_) {
        kv.second->Stop();
    }
    return true;
}