/* * ===================================================================================== * * Filename: node_center.h * * Description: * * Version: 1.0 * Created: 2021年05月20日 11时33分06秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), lichao@aiotlink.com * Organization: * * ===================================================================================== */ #ifndef NODE_CENTER_KY67RJ1Q #define NODE_CENTER_KY67RJ1Q #include "json.h" #include "shm_socket.h" #include typedef std::string ProcId; typedef size_t ProcIndex; // max local procs. const int kMaxProcs = 65536; // record all procs ever registered, always grow, never remove. // mainly for node to request msg allocation. // use index instead of MQId to save some bits. class ProcRecords { public: struct ProcRec { ProcId proc_; MQId ssn_ = 0; }; ProcRecords() { procs_.reserve(kMaxProcs); } ProcIndex Put(const ProcId &proc_id, const MQId ssn); const ProcRec &Get(const ProcIndex index) const; private: std::unordered_map proc_index_; std::vector procs_; }; class MsgRecords { typedef int64_t MsgId; public: void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg); } void FreeMsg(MsgId id); void AutoRemove(); size_t size() const { return msgs_.size(); } void DebugPrint() const; private: std::unordered_map msgs_; int64_t time_to_clean_ = 0; }; class NodeCenter { public: typedef MQId Address; typedef bhome_msg::ProcInfo ProcInfo; typedef std::function Cleaner; private: enum { kStateInvalid, kStateNormal, kStateOffline, kStateKillme, }; struct ProcState { int64_t timestamp_ = 0; uint32_t flag_ = 0; // reserved }; typedef std::unordered_map> AddressTopics; struct NodeInfo; typedef std::shared_ptr Node; typedef std::weak_ptr WeakNode; struct NodeInfo { NodeCenter ¢er_; SharedMemory &shm_; ProcState state_; // state std::map addrs_; // registered mqs ProcInfo proc_; // AddressTopics services_; // address: topics AddressTopics local_sub_; // address: topics AddressTopics net_sub_; // address: topics NodeInfo(NodeCenter ¢er, SharedMemory &shm) : center_(center), shm_(shm) {} void PutOffline(const int64_t offline_time); void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time); void Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node); void Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node); }; struct TopicDest { MQId mq_id_; int64_t mq_abs_addr_; WeakNode weak_node_; bool operator<(const TopicDest &a) const { return mq_id_ < a.mq_id_; } }; static inline MQId SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); } static inline int64_t SrcAbsAddr(const BHMsgHead &head) { return head.route(0).abs_addr(); } static inline bool MatchAddr(std::map const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); } public: typedef std::set Clients; NodeCenter(const std::string &id, const int64_t offline_time_sec, const int64_t kill_time_sec) : id_(id), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {} // center name, no relative to shm. const std::string &id() const { return id_; } int64_t OnNodeInit(ShmSocket &socket, const int64_t val); void RecordMsg(const MsgI &msg); bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg); bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg); bool PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb); bool RemotePublish(BHMsgHead &head, const std::string &body_content); bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content); void OnAlloc(ShmSocket &socket, const int64_t val); void OnFree(ShmSocket &socket, const int64_t val); bool OnCommand(ShmSocket &socket, const int64_t val); MsgProcInitReply ProcInit(const BHMsgHead &head, MsgProcInit &msg); MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg); template Reply HandleMsg(const BHMsgHead &head, Func const &op) { try { auto pos = nodes_.find(head.ssn_id()); if (pos == nodes_.end()) { return MakeReply(eNotRegistered, "Node is not registered."); } else { auto &node = pos->second; if (!MatchAddr(node->addrs_, SrcAddr(head))) { return MakeReply(eAddressNotMatch, "Node address error."); } else if (head.type() == kMsgTypeHeartbeat && CanHeartbeat(*node)) { return op(node); } else if (!Valid(*node)) { return MakeReply(eNoRespond, "Node is not alive."); } else { return op(node); } } } catch (std::exception &e) { LOG_ERROR() << "handle msg exception: " << e.what(); return MakeReply(eError, "internal error."); } } template inline MsgCommonReply HandleMsg(const BHMsgHead &head, Func const &op) { return HandleMsg(head, op); } template bool CheckMsg(const BHMsgHead &head, Reply &reply) { bool r = false; auto onOk = [&](Node) { r = true; return MakeReply(eSuccess); }; reply = HandleMsg(head, onOk); return r; } MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg); MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg); MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg); MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req); MsgQueryProcReply QueryProc(const std::string &proc_id); MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req); MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg); MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg); MsgCommonReply Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg); void OnTimer(); // remote hosts records std::set FindRemoteSubClients(const Topic &topic) { return net_records_.FindSubHosts(topic); } std::set FindRemoteRPCServers(const Topic &topic) { return net_records_.FindRPCHosts(topic); } void ParseNetInfo(ssjson::Json &info) { net_records_.ParseData(info); } private: void CheckNodes(); bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; } void Publish(SharedMemory &shm, const Topic &topic, const std::string &content); void Notify(const Topic &topic, NodeInfo &node); void DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg); Clients DoFindClients(const std::string &topic, bool from_remote); bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; } bool Valid(const WeakNode &weak) { auto node = weak.lock(); return node && Valid(*node); } void RemoveNode(Node &node); Node GetNode(const MQId mq); std::string id_; // center proc id; std::unordered_map service_map_; std::unordered_map local_sub_map_; std::unordered_map net_sub_map_; std::unordered_map nodes_; std::unordered_map online_node_addr_map_; ProcRecords procs_; // To get a short index for msg alloc. MsgRecords msgs_; // record all msgs alloced. int64_t offline_time_; int64_t kill_time_; int64_t last_check_time_; // net hosts info class NetRecords { public: typedef std::set Hosts; void ParseData(const ssjson::Json &input); Hosts FindRPCHosts(const Topic &topic) { return FindHosts(topic, rpc_hosts_); } Hosts FindSubHosts(const Topic &topic) { return FindHosts(topic, sub_hosts_); } private: typedef std::unordered_map TopicMap; TopicMap sub_hosts_; TopicMap rpc_hosts_; Hosts FindHosts(const Topic &topic, const TopicMap &tmap) { auto pos = tmap.find(topic); if (pos != tmap.end()) { return pos->second; } else { return Hosts(); } } std::string host_id_; std::string ip_; }; NetRecords net_records_; }; #endif // end of include guard: NODE_CENTER_KY67RJ1Q