/* * ===================================================================================== * * Filename: center.cpp * * Description: * * Version: 1.0 * Created: 2021年03月30日 16时19分37秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), * Organization: * * ===================================================================================== */ #include "center.h" #include "bh_util.h" #include "defs.h" #include "shm.h" #include #include using namespace std::chrono; using namespace std::chrono_literals; using namespace bhome_shm; using namespace bhome_msg; using namespace bhome::msg; typedef BHCenter::MsgHandler Handler; namespace { typedef steady_clock::time_point TimePoint; inline TimePoint Now() { return steady_clock::now(); }; //TODO check proc_id class NodeCenter { public: typedef std::string ProcId; typedef std::string Address; typedef bhome::msg::ProcInfo ProcInfo; typedef std::function Cleaner; private: enum { kStateInvalid, kStateNormal, kStateOffline, kStateKillme, }; struct ProcState { TimePoint timestamp_; uint32_t flag_ = 0; // reserved void UpdateState(TimePoint now) { const auto kOfflineTime = 60 * 10s; const auto kKillTime = 60 * 20s; auto diff = now - timestamp_; if (diff < kOfflineTime) { flag_ = kStateNormal; } else if (diff < kKillTime) { flag_ = kStateOffline; } else { flag_ = kStateKillme; } } }; typedef std::unordered_map> AddressTopics; struct NodeInfo { ProcState state_; // state std::set
addrs_; // registered mqs ProcInfo proc_; // AddressTopics services_; // address: topics AddressTopics subscriptions_; // address: topics }; typedef std::shared_ptr Node; typedef std::weak_ptr WeakNode; struct TopicDest { Address mq_; WeakNode weak_node_; bool operator<(const TopicDest &a) const { return mq_ < a.mq_; } }; inline const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); } inline bool MatchAddr(std::set
const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); } public: typedef std::set Clients; NodeCenter(const std::string &id, const Cleaner &cleaner) : id_(id), cleaner_(cleaner) {} const std::string &id() const { return id_; } // no need to lock. //TODO maybe just return serialized string. MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg) { if (msg.proc().proc_id() != head.proc_id()) { return MakeReply(eInvalidInput, "invalid proc id."); } try { Node node(new NodeInfo); node->addrs_.insert(SrcAddr(head)); for (auto &addr : msg.addrs()) { node->addrs_.insert(addr.mq_id()); } node->proc_.Swap(msg.mutable_proc()); node->state_.timestamp_ = Now(); node->state_.flag_ = kStateNormal; nodes_[node->proc_.proc_id()] = node; return MakeReply(eSuccess); } catch (...) { return MakeReply(eError, "register node error."); } } template Reply HandleMsg(const BHMsgHead &head, Func const &op) { try { auto pos = nodes_.find(head.proc_id()); if (pos == nodes_.end()) { return MakeReply(eNotRegistered, "Node is not registered."); } else { auto node = pos->second; if (!MatchAddr(node->addrs_, SrcAddr(head))) { return MakeReply(eAddressNotMatch, "Node address error."); } else if (!Valid(*node)) { return MakeReply(eNoRespond, "Node is not alive."); } else { return op(node); } } } catch (...) { //TODO error log return MakeReply(eError, "internal error."); } } template inline MsgCommonReply HandleMsg(const BHMsgHead &head, Func const &op) { return HandleMsg(head, op); } MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg) { return HandleMsg( head, [&](Node node) -> MsgCommonReply { auto &src = SrcAddr(head); auto &topics = msg.topics().topic_list(); node->services_[src].insert(topics.begin(), topics.end()); TopicDest dest = {src, node}; for (auto &topic : topics) { service_map_[topic].insert(dest); } return MakeReply(eSuccess); }); } MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg) { return HandleMsg(head, [&](Node node) { NodeInfo &ni = *node; ni.state_.timestamp_ = Now(); auto &info = msg.proc(); if (!info.public_info().empty()) { ni.proc_.set_public_info(info.public_info()); } if (!info.private_info().empty()) { ni.proc_.set_private_info(info.private_info()); } return MakeReply(eSuccess); }); } MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req) { typedef MsgQueryTopicReply Reply; auto query = [&](Node self) -> MsgQueryTopicReply { auto pos = service_map_.find(req.topic()); if (pos != service_map_.end() && !pos->second.empty()) { // now just find first one. const TopicDest &dest = *(pos->second.begin()); Node dest_node(dest.weak_node_.lock()); if (!dest_node) { service_map_.erase(pos); return MakeReply(eOffline, "topic server offline."); } else if (!Valid(*dest_node)) { return MakeReply(eNoRespond, "topic server not responding."); } else { MsgQueryTopicReply reply = MakeReply(eSuccess); reply.mutable_address()->set_mq_id(dest.mq_); return reply; } } else { return MakeReply(eNotFound, "topic server not found."); } }; return HandleMsg(head, query); } MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg) { return HandleMsg(head, [&](Node node) { auto &src = SrcAddr(head); auto &topics = msg.topics().topic_list(); node->subscriptions_[src].insert(topics.begin(), topics.end()); TopicDest dest = {src, node}; for (auto &topic : topics) { subscribe_map_[topic].insert(dest); } return MakeReply(eSuccess); }); } MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg) { return HandleMsg(head, [&](Node node) { auto &src = SrcAddr(head); auto pos = node->subscriptions_.find(src); auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) { auto pos = subscribe_map_.find(topic); if (pos != subscribe_map_.end() && pos->second.erase(dest) != 0 && pos->second.empty()) { subscribe_map_.erase(pos); } }; if (pos != node->subscriptions_.end()) { const TopicDest &dest = {src, node}; auto &topics = msg.topics().topic_list(); // clear node sub records; for (auto &topic : topics) { pos->second.erase(topic); RemoveSubTopicDestRecord(topic, dest); } if (pos->second.empty()) { node->subscriptions_.erase(pos); } } return MakeReply(eSuccess); }); } Clients DoFindClients(const std::string &topic) { Clients dests; auto Find1 = [&](const std::string &t) { auto pos = subscribe_map_.find(topic); if (pos != subscribe_map_.end()) { auto &clients = pos->second; for (auto &cli : clients) { if (Valid(cli.weak_node_)) { dests.insert(cli); } } } }; Find1(topic); size_t pos = 0; while (true) { pos = topic.find(kTopicSep, pos); if (pos == topic.npos || ++pos == topic.size()) { // Find1(std::string()); // sub all. break; } else { Find1(topic.substr(0, pos)); } } return dests; } bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply) { bool ret = false; HandleMsg(head, [&](Node node) { DoFindClients(msg.topic()).swap(out); ret = true; return MakeReply(eSuccess); }).Swap(&reply); return ret; } void OnTimer() { CheckNodes(); } private: void CheckNodes() { auto it = nodes_.begin(); while (it != nodes_.end()) { auto &cli = *it->second; cli.state_.UpdateState(Now()); if (cli.state_.flag_ == kStateKillme) { if (cleaner_) { for (auto &addr : cli.addrs_) { cleaner_(addr); } } it = nodes_.erase(it); } else { ++it; } } } bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; } bool Valid(const WeakNode &weak) { auto node = weak.lock(); return node && Valid(*node); } void CheckAllNodes(); //TODO, call it in timer. std::string id_; // center proc id; std::unordered_map service_map_; std::unordered_map subscribe_map_; std::unordered_map nodes_; Cleaner cleaner_; // remove mqs. }; template inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer) { if (head.route_size() != 1) { return; } Body body; if (msg.ParseBody(body)) { replyer(onmsg(body)); } } Handler Combine(const Handler &h1, const Handler &h2) { return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head) { return h1(socket, msg, head) || h2(socket, msg, head); }; } template Handler Combine(const Handler &h0, const Handler &h1, const Handler &h2, const H &...rest) { return Combine(Combine(h0, h1), h2, rest...); } #define CASE_ON_MSG_TYPE(MsgTag) \ case kMsgType##MsgTag: \ Dispatch( \ msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \ return true; bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner) { auto center_ptr = std::make_shared>(id, cleaner); auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { return [&](auto &&rep_body) { auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 100); if (!r) { printf("send reply failed.\n"); } //TODO resend failed. }; }; auto OnCenterIdle = [center_ptr](ShmSocket &socket) { auto ¢er = *center_ptr; center->OnTimer(); }; auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; auto replyer = MakeReplyer(socket, head, center->id()); switch (head.type()) { CASE_ON_MSG_TYPE(Register); CASE_ON_MSG_TYPE(Heartbeat); CASE_ON_MSG_TYPE(RegisterRPC); CASE_ON_MSG_TYPE(QueryTopic); default: return false; } }; auto OnBusIdle = [](ShmSocket &socket) {}; auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; auto replyer = MakeReplyer(socket, head, center->id()); auto OnPublish = [&]() { MsgPublish pub; NodeCenter::Clients clients; MsgCommonReply reply; if (head.route_size() != 1 || !msg.ParseBody(pub)) { return; } else if (!center->FindClients(head, pub, clients, reply)) { MakeReplyer(socket, head, center->id())(reply); } else { MakeReplyer(socket, head, center->id())(MakeReply(eSuccess)); if (!msg.EnableRefCount(socket.shm())) { return; } // no memory? for (auto &cli : clients) { auto node = cli.weak_node_.lock(); if (node) { if (!socket.Send(cli.mq_.data(), msg, 100)) { printf("center route publish failed. need resend.\n"); } } } } }; switch (head.type()) { CASE_ON_MSG_TYPE(Subscribe); CASE_ON_MSG_TYPE(Unsubscribe); case kMsgTypePublish: OnPublish(); return true; default: return false; } }; BHCenter::Install("#center.reg", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000); BHCenter::Install("#center.bus", OnPubSub, OnBusIdle, BHTopicBusAddress(), 1000); return true; } #undef CASE_ON_MSG_TYPE } // namespace SharedMemory &BHomeShm() { static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 64); return shm; } BHCenter::CenterRecords &BHCenter::Centers() { static CenterRecords rec; return rec; } bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const std::string &mqid, const int mq_len) { Centers()[name] = CenterInfo{name, handler, idle, mqid, mq_len}; return true; } bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len) { return Install(name, handler, idle, std::string((const char *) &mqid, sizeof(mqid)), mq_len); } BHCenter::BHCenter(Socket::Shm &shm) { auto gc = [&](const std::string &id) { auto r = ShmSocket::Remove(shm, *(MQId *) id.data()); printf("remove mq : %s\n", r ? "ok" : "failed"); }; AddCenter("#center", gc); for (auto &kv : Centers()) { auto &info = kv.second; sockets_[info.name_] = std::make_shared(shm, *(MQId *) info.mqid_.data(), info.mq_len_); } } BHCenter::BHCenter() : BHCenter(BHomeShm()) {} bool BHCenter::Start() { for (auto &kv : Centers()) { auto &info = kv.second; sockets_[info.name_]->Start(info.handler_); } return true; } bool BHCenter::Stop() { for (auto &kv : sockets_) { kv.second->Stop(); } return true; }