/*
 * =====================================================================================
 *
 *       Filename:  center.cpp
 *
 *    Description:  Center node registry: process registration, heartbeat tracking,
 *                  RPC topic lookup and publish/subscribe routing.
 *
 *        Version:  1.0
 *        Created:  2021-03-30 16:19:37
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "center.h"
#include "bh_util.h"
#include "defs.h"
#include "shm.h"
// NOTE(review): the two standard header names were lost in extraction; the list
// below covers every standard facility this file uses directly.
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <functional>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>

using namespace std::chrono;
using namespace std::chrono_literals;

using namespace bhome_shm;
using namespace bhome_msg;
typedef BHCenter::MsgHandler Handler;

namespace
{

//TODO check proc_id
// Registry of known processes ("nodes"), the message queues they registered,
// and the topics they serve or subscribe to.
class NodeCenter
{
public:
	typedef std::string ProcId;
	typedef std::string Address;
	typedef bhome_msg::ProcInfo ProcInfo;
	// Called once for every address of a node that is being removed.
	typedef std::function<void(const Address &)> Cleaner;

private:
	enum {
		kStateInvalid,
		kStateNormal,
		kStateOffline,
		kStateKillme,
	};

	struct ProcState {
		int64_t timestamp_ = 0;
		uint32_t flag_ = 0; // reserved

		// Re-classify the node from the time elapsed since timestamp_:
		//   diff < offline_time  -> kStateNormal
		//   diff < kill_time     -> kStateOffline
		//   otherwise            -> kStateKillme
		void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
		{
			auto diff = now - timestamp_;
#ifndef NDEBUG
			// %p requires a void pointer and int64_t is not `long` on every platform.
			printf("state %p diff: %lld\n", static_cast<const void *>(this), static_cast<long long>(diff));
#endif
			if (diff < offline_time) {
				flag_ = kStateNormal;
			} else if (diff < kill_time) {
				flag_ = kStateOffline;
			} else {
				flag_ = kStateKillme;
			}
		}
	};
	typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;

	struct NodeInfo {
		ProcState state_;             // state
		std::set<Address> addrs_;     // registered mqs
		ProcInfo proc_;               //
		AddressTopics services_;      // address: topics
		AddressTopics subscriptions_; // address: topics
	};
	typedef std::shared_ptr<NodeInfo> Node;
	typedef std::weak_ptr<NodeInfo> WeakNode;

	// A (queue address, owning node) pair; ordered and de-duplicated by address only.
	struct TopicDest {
		Address mq_;
		WeakNode weak_node_;
		bool operator<(const TopicDest &a) const { return mq_ < a.mq_; }
	};

	// Address of the sender: the first route entry of the message head.
	inline const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
	inline bool MatchAddr(std::set<Address> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); }

	// Timeouts here are plain counts in the same unit as NowSec().
	NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time, const int64_t kill_time) :
	    id_(id), cleaner_(cleaner), offline_time_(offline_time), kill_time_(kill_time), last_check_time_(0) {}

public:
	typedef std::set<TopicDest> Clients;

	// Public constructor: durations are truncated to whole seconds.
	NodeCenter(const std::string &id, const Cleaner &cleaner, const steady_clock::duration offline_time, const steady_clock::duration kill_time) :
	    NodeCenter(id, cleaner, duration_cast<seconds>(offline_time).count(), duration_cast<seconds>(kill_time).count()) {}

	// center name, no relative to shm.
	const std::string &id() const { return id_; }

	// Register (or re-register) a process.
	// Replies eInvalidInput when the proc id in the body differs from the one in
	// the head, eError when anything throws, eSuccess otherwise.
	// The proc info is swapped out of `msg`, so `msg` is modified.
	MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
	{
		if (msg.proc().proc_id() != head.proc_id()) {
			return MakeReply(eInvalidInput, "invalid proc id.");
		}

		try {
			auto UpdateRegInfo = [&](Node &node) {
				node->addrs_.insert(SrcAddr(head));
				for (auto &addr : msg.addrs()) {
					node->addrs_.insert(addr.mq_id());
				}
				node->proc_.Swap(msg.mutable_proc());
				node->state_.timestamp_ = head.timestamp();
				node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
			};

			auto pos = nodes_.find(head.proc_id());
			if (pos != nodes_.end()) { // proc id already known
				Node &node = pos->second;
				if (node->addrs_.find(SrcAddr(head)) == node->addrs_.end()) {
					// node restarted, release old mq.
					RemoveNode(node);
					node.reset(new NodeInfo);
				}
				UpdateRegInfo(node);
			} else { // new client
				Node node(new NodeInfo);
				UpdateRegInfo(node);
				nodes_[node->proc_.proc_id()] = node;
			}
			return MakeReply(eSuccess);
		} catch (...) {
			return MakeReply(eError, "register node error.");
		}
	}

	// Common validation wrapper around a per-message operation `op(Node)`.
	// - unknown proc id                      -> eNotRegistered
	// - source address not owned by the node -> eAddressNotMatch
	// - heartbeat from a normal/offline node -> op is run
	// - node not in normal state             -> eNoRespond
	// - any exception                        -> eError
	// NOTE(review): explicit <Reply> arguments on MakeReply were lost in
	// extraction and are restored here from the return type.
	template <class Reply, class Func>
	Reply HandleMsg(const BHMsgHead &head, Func const &op)
	{
		try {
			auto pos = nodes_.find(head.proc_id());
			if (pos == nodes_.end()) {
				return MakeReply<Reply>(eNotRegistered, "Node is not registered.");
			} else {
				auto &node = pos->second;
				if (!MatchAddr(node->addrs_, SrcAddr(head))) {
					return MakeReply<Reply>(eAddressNotMatch, "Node address error.");
				} else if (head.type() == kMsgTypeHeartbeat && CanHeartbeat(*node)) {
					return op(node);
				} else if (!Valid(*node)) {
					return MakeReply<Reply>(eNoRespond, "Node is not alive.");
				} else {
					return op(node);
				}
			}
		} catch (...) {
			//TODO error log
			return MakeReply<Reply>(eError, "internal error.");
		}
	}

	// Convenience overload for operations that reply with MsgCommonReply.
	template <class Func>
	inline MsgCommonReply HandleMsg(const BHMsgHead &head, Func const &op)
	{
		return HandleMsg<MsgCommonReply, Func>(head, op);
	}

	// Record the sender's address as a provider of every topic in `msg`.
	MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg)
	{
		return HandleMsg(
		    head, [&](Node node) -> MsgCommonReply {
			    auto &src = SrcAddr(head);
			    auto &topics = msg.topics().topic_list();
			    node->services_[src].insert(topics.begin(), topics.end());
			    TopicDest dest = {src, node};
			    for (auto &topic : topics) {
				    service_map_[topic].insert(dest);
			    }
			    return MakeReply(eSuccess);
		    });
	}

	// Refresh the node's timestamp/state and overwrite its public/private info
	// with any non-empty values carried by the heartbeat.
	MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg)
	{
		return HandleMsg(head, [&](Node node) {
			NodeInfo &ni = *node;
			ni.state_.timestamp_ = head.timestamp();
			ni.state_.UpdateState(NowSec(), offline_time_, kill_time_);

			auto &info = msg.proc();
			if (!info.public_info().empty()) {
				ni.proc_.set_public_info(info.public_info());
			}
			if (!info.private_info().empty()) {
				ni.proc_.set_private_info(info.private_info());
			}
			return MakeReply(eSuccess);
		});
	}

	// Resolve the queue address serving req.topic().
	// Only the first registered provider of the topic is considered.
	MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req)
	{
		typedef MsgQueryTopicReply Reply;

		auto query = [&](Node self) -> MsgQueryTopicReply {
			auto pos = service_map_.find(req.topic());
			if (pos != service_map_.end() && !pos->second.empty()) {
				// now just find first one.
				const TopicDest &dest = *(pos->second.begin());
				Node dest_node(dest.weak_node_.lock());
				if (!dest_node) {
					// Provider node is gone: the whole topic entry is dropped.
					service_map_.erase(pos);
					return MakeReply<Reply>(eOffline, "topic server offline.");
				} else if (!Valid(*dest_node)) {
					return MakeReply<Reply>(eNoRespond, "topic server not responding.");
				} else {
					MsgQueryTopicReply reply = MakeReply<Reply>(eSuccess);
					reply.mutable_address()->set_mq_id(dest.mq_);
					return reply;
				}
			} else {
				return MakeReply<Reply>(eNotFound, "topic server not found.");
			}
		};

		return HandleMsg<Reply>(head, query);
	}

	// Subscribe the sender's address to every topic in `msg`.
	MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg)
	{
		return HandleMsg(head, [&](Node node) {
			auto &src = SrcAddr(head);
			auto &topics = msg.topics().topic_list();
			node->subscriptions_[src].insert(topics.begin(), topics.end());
			TopicDest dest = {src, node};
			for (auto &topic : topics) {
				subscribe_map_[topic].insert(dest);
			}
			return MakeReply(eSuccess);
		});
	}

	// Remove the sender's address from every topic in `msg`; empty records are
	// erased from both the node and the global subscribe map.
	// Replies eSuccess even when the sender had no subscriptions.
	MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
	{
		return HandleMsg(head, [&](Node node) {
			auto &src = SrcAddr(head);
			auto pos = node->subscriptions_.find(src);

			auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) {
				auto pos = subscribe_map_.find(topic);
				if (pos != subscribe_map_.end() &&
				    pos->second.erase(dest) != 0 &&
				    pos->second.empty()) {
					subscribe_map_.erase(pos);
				}
			};

			if (pos != node->subscriptions_.end()) {
				const TopicDest &dest = {src, node};
				auto &topics = msg.topics().topic_list();
				// clear node sub records;
				for (auto &topic : topics) {
					pos->second.erase(topic);
					RemoveSubTopicDestRecord(topic, dest);
				}
				if (pos->second.empty()) {
					node->subscriptions_.erase(pos);
				}
			}
			return MakeReply(eSuccess);
		});
	}

	// Collect every valid subscriber of `topic` and of each of its prefixes that
	// ends right after a kTopicSep separator (the separator is part of the prefix).
	Clients DoFindClients(const std::string &topic)
	{
		Clients dests;
		auto Find1 = [&](const std::string &t) {
			// Look up the topic passed in, not the full topic, so that the
			// prefix lookups below actually search for the prefix.
			auto pos = subscribe_map_.find(t);
			if (pos != subscribe_map_.end()) {
				auto &clients = pos->second;
				for (auto &cli : clients) {
					if (Valid(cli.weak_node_)) {
						dests.insert(cli);
					}
				}
			}
		};
		Find1(topic);

		size_t pos = 0;
		while (true) {
			pos = topic.find(kTopicSep, pos);
			// Stop when there is no further separator, or when the separator is
			// the last character (that prefix equals the full topic, done above).
			// NOTE(review): `++pos` assumes kTopicSep is one character wide.
			if (pos == topic.npos || ++pos == topic.size()) {
				// Find1(std::string()); // sub all.
				break;
			} else {
				Find1(topic.substr(0, pos));
			}
		}
		return dests;
	}

	// Validate the publisher and, on success, fill `out` with the subscribers of
	// msg.topic(). `reply` always receives the validation result.
	// Returns true only when the publisher passed validation.
	bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply)
	{
		bool ret = false;
		HandleMsg(head, [&](Node node) {
			DoFindClients(msg.topic()).swap(out);
			ret = true;
			return MakeReply(eSuccess);
		}).Swap(&reply);
		return ret;
	}

	void OnTimer()
	{
		CheckNodes();
	}

private:
	// Refresh every node's state and drop the ones that reached kStateKillme.
	// Does nothing when called again within the same NowSec() tick.
	void CheckNodes()
	{
		auto now = NowSec();
		if (now - last_check_time_ < 1) { return; }
		last_check_time_ = now;

		auto it = nodes_.begin();
		while (it != nodes_.end()) {
			auto &cli = *it->second;
			cli.state_.UpdateState(now, offline_time_, kill_time_);
			if (cli.state_.flag_ == kStateKillme) {
				RemoveNode(it->second);
				it = nodes_.erase(it);
			} else {
				++it;
			}
		}
	}

	// A heartbeat is accepted from a normal node and from an offline one.
	bool CanHeartbeat(const NodeInfo &node)
	{
		return Valid(node) || node.state_.flag_ == kStateOffline;
	}
	bool Valid(const NodeInfo &node)
	{
		return node.state_.flag_ == kStateNormal;
	}
	bool Valid(const WeakNode &weak)
	{
		auto node = weak.lock();
		return node && Valid(*node);
	}

	// Erase every service/subscription record of `node`, hand each of its
	// addresses to the cleaner and clear the address set.
	// The node itself is NOT erased from nodes_; that is left to the caller.
	void RemoveNode(Node &node)
	{
		auto EraseMapRec = [&node](auto &rec_map, auto &node_rec) {
			for (auto &addr_topics : node_rec) {
				TopicDest dest{addr_topics.first, node};
				for (auto &topic : addr_topics.second) {
					auto pos = rec_map.find(topic);
					if (pos != rec_map.end()) {
						pos->second.erase(dest);
						if (pos->second.empty()) {
							rec_map.erase(pos);
						}
					}
				}
			}
		};
		EraseMapRec(service_map_, node->services_);
		EraseMapRec(subscribe_map_, node->subscriptions_);

		for (auto &addr : node->addrs_) {
			cleaner_(addr);
		}
		node->addrs_.clear();
	}

	std::string id_; // center proc id;

	std::unordered_map<Topic, Clients> service_map_;   // topic: providers
	std::unordered_map<Topic, Clients> subscribe_map_; // topic: subscribers
	std::unordered_map<ProcId, Node> nodes_;           // proc id: node
	Cleaner cleaner_;                                  // remove mqs.
	int64_t offline_time_;
	int64_t kill_time_;
	int64_t last_check_time_;
};

// Parse the message body as `Body`, run `onmsg` on it and pass the result to
// `replyer`. Messages whose route does not have exactly one entry, or whose
// body fails to parse, are dropped without a reply.
template <class Body, class OnMsg, class Replyer>
inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer)
{
	if (head.route_size() != 1) { return; }
	Body body;
	if (msg.ParseBody(body)) {
		replyer(onmsg(body));
	}
}

// Chain two handlers: h2 runs only when h1 returns false.
Handler Combine(const Handler &h1, const Handler &h2)
{
	return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head) {
		return h1(socket, msg, head) || h2(socket, msg, head);
	};
}

// Chain three or more handlers, left to right.
template <class... H>
Handler Combine(const Handler &h0, const Handler &h1, const Handler &h2, const H &...rest)
{
	return Combine(Combine(h0, h1), h2, rest...);
}

// Expands to a `case` that parses a Msg<MsgTag> body, forwards it to
// center->MsgTag(head, body) and sends the reply. Expects `msg`, `head`,
// `center` and `replyer` to be in scope.
#define CASE_ON_MSG_TYPE(MsgTag)                                                         \
	case kMsgType##MsgTag:                                                               \
		Dispatch<Msg##MsgTag>(                                                           \
		    msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \
		return true;

// Create the node center and install its two handlers ("#center.reg" for
// registration/heartbeat/RPC lookup, "#center.bus" for publish/subscribe).
// Always returns true.
bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
{
	// NOTE(review): the wrapper template name was lost in extraction; Synced<> is
	// reconstructed from the `center->` access pattern below — confirm.
	auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 2);

	// The returned replyer refers to socket/head/proc_id by reference, so it must
	// be used only while handling the message it was created for.
	auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
		return [&](auto &&rep_body) {
			auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
			auto &remote = head.route(0).mq_id();
			socket.Send(remote.data(), reply_head, rep_body);
		};
	};

	auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
		auto &center = *center_ptr;
		center->OnTimer();
	};

	auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
		auto &center = *center_ptr;
		auto replyer = MakeReplyer(socket, head, center->id());
		switch (head.type()) {
			CASE_ON_MSG_TYPE(Register);
			CASE_ON_MSG_TYPE(Heartbeat);

			CASE_ON_MSG_TYPE(RegisterRPC);
			CASE_ON_MSG_TYPE(QueryTopic);
		default: return false;
		}
	};

	auto OnBusIdle = [=](ShmSocket &socket) {};
	auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
		auto &center = *center_ptr;
		auto replyer = MakeReplyer(socket, head, center->id());
		auto OnPublish = [&]() {
			MsgPublish pub;
			NodeCenter::Clients clients;
			MsgCommonReply reply;
			if (head.route_size() != 1 || !msg.ParseBody(pub)) {
				return;
			} else if (!center->FindClients(head, pub, clients, reply)) {
				replyer(reply);
			} else {
				replyer(MakeReply(eSuccess));
				if (!msg.EnableRefCount(socket.shm())) { return; } // no memory?

				// Forward the message to every subscriber whose node still exists.
				for (const auto &cli : clients) {
					if (cli.weak_node_.lock()) {
						// should also make sure that mq is not killed before msg expires.
						// it would be ok if (kill_time - offline_time) is longer than expire time.
						socket.Send(cli.mq_.data(), msg);
					}
				}
			}
		};
		switch (head.type()) {
			CASE_ON_MSG_TYPE(Subscribe);
			CASE_ON_MSG_TYPE(Unsubscribe);
		case kMsgTypePublish: OnPublish(); return true;
		default: return false;
		}
	};

	BHCenter::Install("#center.reg", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000);
	BHCenter::Install("#center.bus", OnPubSub, OnBusIdle, BHTopicBusAddress(), 1000);

	return true;
}

#undef CASE_ON_MSG_TYPE

} // namespace

// Process-wide table of installed centers, created on first use.
BHCenter::CenterRecords &BHCenter::Centers()
{
	static CenterRecords rec;
	return rec;
}

// Register (or replace) the center record stored under `name`. Always returns true.
bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const std::string &mqid, const int mq_len)
{
	Centers()[name] = CenterInfo{name, handler, idle, mqid, mq_len};
	return true;
}

// Same as above, with the queue id given as an MQId; its raw bytes are stored
// as the string form.
bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len)
{
	return Install(name, handler, idle, std::string((const char *) &mqid, sizeof(mqid)), mq_len);
}

// Install the built-in center handlers and create one socket per installed center.
// NOTE(review): the cleaner keeps a reference to `shm`, and it is stored inside
// the handlers kept in Centers(); `shm` must outlive those handlers.
BHCenter::BHCenter(Socket::Shm &shm)
{
	auto gc = [&](const std::string &id) {
		// `id` holds the raw bytes of an MQId (see Install above).
		auto r = ShmSocket::Remove(shm, *(MQId *) id.data());
		printf("remove mq : %s\n", r ? "ok" : "failed");
	};

	AddCenter("#bhome_center", gc);

	for (auto &kv : Centers()) {
		auto &info = kv.second;
		sockets_[info.name_] = std::make_shared<ShmSocket>(shm, *(MQId *) info.mqid_.data(), info.mq_len_);
	}
}

// Start every center socket with its installed handlers. Always returns true.
bool BHCenter::Start()
{
	for (auto &kv : Centers()) {
		auto &info = kv.second;
		sockets_[info.name_]->Start(info.handler_, info.idle_);
	}

	return true;
}

// Stop every center socket. Always returns true.
bool BHCenter::Stop()
{
	for (auto &kv : sockets_) {
		kv.second->Stop();
	}
	return true;
}