| | |
| | | class MsgRecords |
| | | { |
| | | typedef int64_t MsgId; |
| | | typedef int64_t Offset; |
| | | |
| | | public: |
| | | void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); } |
| | | void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg); } |
| | | void FreeMsg(MsgId id); |
| | | void AutoRemove(); |
| | | size_t size() const { return msgs_.size(); } |
| | | void DebugPrint() const; |
| | | |
| | | private: |
| | | std::unordered_map<MsgId, Offset> msgs_; |
| | | std::unordered_map<MsgId, MsgI> msgs_; |
| | | int64_t time_to_clean_ = 0; |
| | | }; |
| | | |
| | |
| | | }; |
| | | typedef std::unordered_map<Address, std::set<Topic>> AddressTopics; |
| | | |
| | | struct NodeInfo; |
| | | typedef std::shared_ptr<NodeInfo> Node; |
| | | typedef std::weak_ptr<NodeInfo> WeakNode; |
| | | |
| | | struct NodeInfo { |
| | | NodeCenter ¢er_; |
| | | SharedMemory &shm_; |
| | | ProcState state_; // state |
| | | std::map<MQId, int64_t> addrs_; // registered mqs |
| | | ProcInfo proc_; // |
| | | AddressTopics services_; // address: topics |
| | | AddressTopics subscriptions_; // address: topics |
| | | NodeInfo(NodeCenter ¢er) : |
| | | center_(center) {} |
| | | AddressTopics local_sub_; // address: topics |
| | | AddressTopics net_sub_; // address: topics |
| | | NodeInfo(NodeCenter ¢er, SharedMemory &shm) : |
| | | center_(center), shm_(shm) {} |
| | | void PutOffline(const int64_t offline_time); |
| | | void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time); |
| | | void Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node); |
| | | void Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node); |
| | | }; |
| | | typedef std::shared_ptr<NodeInfo> Node; |
| | | typedef std::weak_ptr<NodeInfo> WeakNode; |
| | | |
| | | struct TopicDest { |
| | | MQId mq_id_; |
| | |
| | | public: |
| | | typedef std::set<TopicDest> Clients; |
| | | |
| | | NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time_sec, const int64_t kill_time_sec) : |
| | | id_(id), cleaner_(cleaner), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {} |
| | | NodeCenter(const std::string &id, const int64_t offline_time_sec, const int64_t kill_time_sec) : |
| | | id_(id), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {} |
| | | |
| | | // center name, no relative to shm. |
| | | const std::string &id() const { return id_; } |
| | |
| | | void RecordMsg(const MsgI &msg); |
| | | bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg); |
| | | bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg); |
| | | |
| | | bool PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb); |
| | | bool RemotePublish(BHMsgHead &head, const std::string &body_content); |
| | | bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content); |
| | | void OnAlloc(ShmSocket &socket, const int64_t val); |
| | | void OnFree(ShmSocket &socket, const int64_t val); |
| | | bool OnCommand(ShmSocket &socket, const int64_t val); |
| | |
| | | return op(node); |
| | | } |
| | | } |
| | | } catch (...) { |
| | | //TODO error log |
| | | } catch (std::exception &e) { |
| | | LOG_ERROR() << "handle msg exception: " << e.what(); |
| | | return MakeReply<Reply>(eError, "internal error."); |
| | | } |
| | | } |
| | |
| | | inline MsgCommonReply HandleMsg(const BHMsgHead &head, Func const &op) |
| | | { |
| | | return HandleMsg<MsgCommonReply, Func>(head, op); |
| | | } |
| | | template <class Reply> |
| | | bool CheckMsg(const BHMsgHead &head, Reply &reply) |
| | | { |
| | | bool r = false; |
| | | auto onOk = [&](Node) { r = true; return MakeReply<Reply>(eSuccess); }; |
| | | reply = HandleMsg<Reply>(head, onOk); |
| | | return r; |
| | | } |
| | | |
| | | MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg); |
| | |
| | | MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req); |
| | | MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg); |
| | | MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg); |
| | | Clients DoFindClients(const std::string &topic); |
| | | bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply); |
| | | MsgCommonReply Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg); |
| | | |
| | | void OnTimer(); |
| | | |
| | | // remote hosts records |
| | | std::vector<std::string> FindRemoteSubClients(const Topic &topic); |
| | | |
| | | private: |
| | | void CheckNodes(); |
| | | bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; } |
| | | void Publish(const Topic &topic, const std::string &content); |
| | | void Publish(SharedMemory &shm, const Topic &topic, const std::string &content); |
| | | void Notify(const Topic &topic, NodeInfo &node); |
| | | void DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg); |
| | | Clients DoFindClients(const std::string &topic, bool from_remote); |
| | | bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; } |
| | | bool Valid(const WeakNode &weak) |
| | | { |
| | |
| | | return node && Valid(*node); |
| | | } |
| | | void RemoveNode(Node &node); |
| | | Node GetNode(const MQId mq); |
| | | |
| | | std::string id_; // center proc id; |
| | | |
| | | std::unordered_map<Topic, Clients> service_map_; |
| | | std::unordered_map<Topic, Clients> subscribe_map_; |
| | | std::unordered_map<Topic, Clients> local_sub_map_; |
| | | std::unordered_map<Topic, Clients> net_sub_map_; |
| | | std::unordered_map<Address, Node> nodes_; |
| | | std::unordered_map<ProcId, Address> online_node_addr_map_; |
| | | ProcRecords procs_; // To get a short index for msg alloc. |
| | | MsgRecords msgs_; // record all msgs alloced. |
| | | |
| | | Cleaner cleaner_; // remove mqs. |
| | | int64_t offline_time_; |
| | | int64_t kill_time_; |
| | | int64_t last_check_time_; |