/*
 * =====================================================================================
 *
 *       Filename:  node_center.h
 *
 *    Description:  Declarations for the node center: the ProcRecords and
 *                  MsgRecords bookkeeping helpers and the NodeCenter class.
 *
 *        Version:  1.0
 *        Created:  2021-05-20 11:33:06
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef NODE_CENTER_KY67RJ1Q
#define NODE_CENTER_KY67RJ1Q

#include "shm_socket.h"
// NOTE(review): the header name of the next #include is missing in this copy
// of the file (presumably lost together with the other <...> spans below).
// Restore it from the original header.
#include

// Identifier of a process, as a string.
typedef std::string ProcId;
// Position of a process record inside ProcRecords.
typedef size_t ProcIndex;

// Max local procs. Also used as the capacity reserved up front by ProcRecords.
const int kMaxProcs = 65536;

// Records all procs ever registered; always grows, never removes.
// Mainly for a node to request msg allocation.
// Uses an index instead of MQId to save some bits.
class ProcRecords
{
public:
    // One registered process.
    struct ProcRec {
        ProcId proc_;   // process id
        MQId ssn_ = 0;  // presumably the session id of the process — TODO confirm
    };

    // Reserves room for kMaxProcs records in procs_ at construction.
    ProcRecords() { procs_.reserve(kMaxProcs); }

    // Declared here, defined elsewhere. Presumably stores (proc_id, ssn) and
    // returns the index of the record — verify against the implementation.
    ProcIndex Put(const ProcId &proc_id, const MQId ssn);
    // Declared here, defined elsewhere. Returns a reference to a stored record
    // for the given index; behavior for an unknown index is not visible here.
    const ProcRec &Get(const ProcIndex index) const;

private:
    // NOTE(review): template arguments are missing on both containers below;
    // presumably a ProcId -> ProcIndex map and a vector of ProcRec — confirm.
    std::unordered_map proc_index_;
    std::vector procs_;
};

// Keeps track of messages by id together with their offset.
class MsgRecords
{
    typedef int64_t MsgId;
    typedef int64_t Offset;

public:
    // Stores the pair (msg.id(), msg.Offset()) in msgs_.
    void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); }
    // Declared here, defined elsewhere.
    void FreeMsg(MsgId id);
    // Declared here, defined elsewhere.
    void AutoRemove();
    // Number of messages currently recorded.
    size_t size() const { return msgs_.size(); }
    // Declared here, defined elsewhere.
    void DebugPrint() const;

private:
    // NOTE(review): template arguments are missing; RecordMsg() inserts
    // (id, offset) pairs, so presumably MsgId -> Offset — confirm.
    std::unordered_map msgs_;
    // Presumably the next time AutoRemove() should do work — TODO confirm.
    int64_t time_to_clean_ = 0;
};

// Central registry of nodes. Declares the handlers for node registration,
// heartbeat, queries, subscriptions and message allocation; most of them are
// only declared in this header and defined elsewhere.
class NodeCenter
{
public:
    typedef MQId Address;
    typedef bhome_msg::ProcInfo ProcInfo;
    // NOTE(review): the call signature of this std::function is missing in this
    // copy; the cleaner_ member below is commented as "remove mqs".
    typedef std::function Cleaner;

private:
    // Node states. ProcState::flag_ is compared against these values by
    // Valid() and CanHeartbeat().
    enum {
        kStateInvalid,
        kStateNormal,
        kStateOffline,
        kStateKillme,
    };

    // Liveness information of one node.
    struct ProcState {
        int64_t timestamp_ = 0;
        uint32_t flag_ = 0; // reserved
        // Declared here, defined elsewhere.
        void PutOffline(const int64_t offline_time);
        // Declared here, defined elsewhere.
        void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time);
    };

    // NOTE(review): the template arguments are damaged here (a stray '>' is
    // left over); the member comments below describe it as "address: topics".
    typedef std::unordered_map> AddressTopics;

    // Everything the center knows about one node.
    struct NodeInfo {
        ProcState state_;             // state
        // NOTE(review): template arguments are missing; MatchAddr() looks an
        // Address up as a key of this map.
        std::map addrs_;              // registered mqs
        ProcInfo proc_;               //
        AddressTopics services_;      // address: topics
        AddressTopics subscriptions_; // address: topics
    };
    // NOTE(review): template arguments are missing on both smart pointers;
    // HandleMsg() dereferences a Node to reach NodeInfo members (addrs_), so
    // presumably both point to NodeInfo — confirm.
    typedef std::shared_ptr Node;
    typedef std::weak_ptr WeakNode;

    // Destination of a topic: a message queue plus a weak reference to the
    // node it belongs to.
    struct TopicDest {
        MQId mq_id_;
        int64_t mq_abs_addr_;
        WeakNode weak_node_;
        // Ordering uses mq_id_ only; mq_abs_addr_ and weak_node_ are ignored.
        bool operator<(const TopicDest &a) const { return mq_id_ < a.mq_id_; }
    };

    // Source mq id of a message, read from the first route entry of the head.
    // Assumes head has at least one route entry — TODO confirm.
    static inline MQId SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
    // Source absolute address, read from the first route entry of the head.
    // Assumes head has at least one route entry — TODO confirm.
    static inline int64_t SrcAbsAddr(const BHMsgHead &head) { return head.route(0).abs_addr(); }
    // Returns true when addr is a key of addrs.
    // NOTE(review): template arguments of std::map are missing in this copy.
    static inline bool MatchAddr(std::map const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); }

public:
    // NOTE(review): template argument is missing; presumably a set of
    // TopicDest (which provides operator<) — confirm.
    typedef std::set Clients;

    // Stores the center id, the cleaner callback and the two timeouts.
    // The timeout parameters are named *_sec, so presumably seconds.
    // last_check_time_ starts at 0.
    NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time_sec, const int64_t kill_time_sec) :
        id_(id), cleaner_(cleaner), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {}

    // center name, no relative to shm.
    const std::string &id() const { return id_; }

    // The handlers below are declared here and defined elsewhere.
    int64_t OnNodeInit(ShmSocket &socket, const int64_t val);
    void RecordMsg(const MsgI &msg);
    bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg);
    bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg);
    void OnAlloc(ShmSocket &socket, const int64_t val);
    void OnFree(ShmSocket &socket, const int64_t val);
    bool OnCommand(ShmSocket &socket, const int64_t val);

    MsgProcInitReply ProcInit(const BHMsgHead &head, MsgProcInit &msg);
    MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg);

    // Common front end for request handlers: finds the sending node, checks it,
    // and then runs op on it.
    //
    // Steps, in order:
    //   1. Look the node up in nodes_ by head.ssn_id(); reply eNotRegistered
    //      if it is not found.
    //   2. Reply eAddressNotMatch unless SrcAddr(head) is one of the node's
    //      registered addresses.
    //   3. A heartbeat message is passed to op when CanHeartbeat() holds, which
    //      also accepts a node in the offline state.
    //   4. Any other message is passed to op only when the node is Valid();
    //      otherwise the reply is eNoRespond.
    // Any exception thrown along the way is caught and turned into an eError
    // reply.
    //
    // NOTE(review): the template parameter list is missing in this copy; the
    // signature uses the names Reply and Func. The MakeReply calls presumably
    // carried an explicit template argument as well — confirm.
    // NOTE(review): steps 3 and 4 both end in `return op(node)`.
    template Reply HandleMsg(const BHMsgHead &head, Func const &op)
    {
        try {
            auto pos = nodes_.find(head.ssn_id());
            if (pos == nodes_.end()) {
                return MakeReply(eNotRegistered, "Node is not registered.");
            } else {
                auto &node = pos->second;
                if (!MatchAddr(node->addrs_, SrcAddr(head))) {
                    return MakeReply(eAddressNotMatch, "Node address error.");
                } else if (head.type() == kMsgTypeHeartbeat && CanHeartbeat(*node)) {
                    return op(node);
                } else if (!Valid(*node)) {
                    return MakeReply(eNoRespond, "Node is not alive.");
                } else {
                    return op(node);
                }
            }
        } catch (...) {
            // TODO: error log. The exception is currently discarded without
            // any record of what went wrong.
            return MakeReply(eError, "internal error.");
        }
    }

    // Convenience overload whose reply type is MsgCommonReply.
    // NOTE(review): the template parameter list and the explicit template
    // arguments of the inner HandleMsg call are missing in this copy. As
    // written, the inner call would name this same overload — restore from the
    // original header.
    template inline MsgCommonReply HandleMsg(const BHMsgHead &head, Func const &op)
    {
        return HandleMsg(head, op);
    }

    // The handlers below are declared here and defined elsewhere.
    MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg);
    MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg);
    MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg);

    MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req);
    MsgQueryProcReply QueryProc(const std::string &proc_id);
    MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req);
    MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg);
    MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg);
    Clients DoFindClients(const std::string &topic);
    bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply);

    void OnTimer();

private:
    // Declared here, defined elsewhere.
    void CheckNodes();
    // A node may send a heartbeat while it is in the normal or offline state.
    bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; }
    // A node is valid only while its state flag is kStateNormal.
    bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; }
    // Same check through a weak reference; false when the node no longer exists.
    bool Valid(const WeakNode &weak)
    {
        auto node = weak.lock();
        return node && Valid(*node);
    }
    // Declared here, defined elsewhere.
    void RemoveNode(Node &node);

    std::string id_; // center proc id;
    // NOTE(review): template arguments are missing on the four maps below.
    // nodes_ is searched by head.ssn_id() in HandleMsg() and its mapped value
    // is used as a Node; the key/value types of the others are not visible
    // from this header.
    std::unordered_map service_map_;
    std::unordered_map subscribe_map_;
    std::unordered_map nodes_;
    std::unordered_map online_node_addr_map_;
    ProcRecords procs_; // To get a short index for msg alloc.
    MsgRecords msgs_;   // record all msgs alloced.

    Cleaner cleaner_; // remove mqs.
    int64_t offline_time_;    // set from offline_time_sec in the constructor
    int64_t kill_time_;       // set from kill_time_sec in the constructor
    int64_t last_check_time_; // starts at 0
};

#endif // end of include guard: NODE_CENTER_KY67RJ1Q