| | |
| | | }; |
| | | typedef std::unordered_map<Address, std::set<Topic>> AddressTopics; |
| | | |
| | | struct NodeInfo; |
| | | typedef std::shared_ptr<NodeInfo> Node; |
| | | typedef std::weak_ptr<NodeInfo> WeakNode; |
| | | |
| | | struct NodeInfo { |
| | | NodeCenter ¢er_; |
| | | SharedMemory &shm_; |
| | |
| | | std::map<MQId, int64_t> addrs_; // registered mqs |
| | | ProcInfo proc_; // |
| | | AddressTopics services_; // address: topics |
| | | AddressTopics subscriptions_; // address: topics |
| | | AddressTopics local_sub_; // address: topics |
| | | AddressTopics net_sub_; // address: topics |
| | | NodeInfo(NodeCenter ¢er, SharedMemory &shm) : |
| | | center_(center), shm_(shm) {} |
| | | void PutOffline(const int64_t offline_time); |
| | | void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time); |
| | | void Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node); |
| | | void Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node); |
| | | }; |
| | | typedef std::shared_ptr<NodeInfo> Node; |
| | | typedef std::weak_ptr<NodeInfo> WeakNode; |
| | | |
| | | struct TopicDest { |
| | | MQId mq_id_; |
| | |
| | | void RecordMsg(const MsgI &msg); |
| | | bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg); |
| | | bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg); |
| | | |
| | | bool PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb); |
| | | bool RemotePublish(BHMsgHead &head, const std::string &body_content); |
| | | bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content); |
| | | void OnAlloc(ShmSocket &socket, const int64_t val); |
| | | void OnFree(ShmSocket &socket, const int64_t val); |
| | |
| | | MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req); |
| | | MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg); |
| | | MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg); |
| | | Clients DoFindClients(const std::string &topic); |
| | | bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply); |
| | | MsgCommonReply Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg); |
| | | |
| | | void OnTimer(); |
| | | |
| | | // remote hosts records |
| | | std::vector<std::string> FindRemoteSubClients(const Topic &topic); |
| | | |
| | | private: |
| | | void CheckNodes(); |
| | | bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; } |
| | | void Publish(SharedMemory &shm, const Topic &topic, const std::string &content); |
| | | void DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg); |
| | | Clients DoFindClients(const std::string &topic, bool from_remote); |
| | | bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; } |
| | | bool Valid(const WeakNode &weak) |
| | | { |
| | |
| | | std::string id_; // center proc id; |
| | | |
| | | std::unordered_map<Topic, Clients> service_map_; |
| | | std::unordered_map<Topic, Clients> subscribe_map_; |
| | | std::unordered_map<Topic, Clients> local_sub_map_; |
| | | std::unordered_map<Topic, Clients> net_sub_map_; |
| | | std::unordered_map<Address, Node> nodes_; |
| | | std::unordered_map<ProcId, Address> online_node_addr_map_; |
| | | ProcRecords procs_; // To get a short index for msg alloc. |