| | |
| | | class MsgRecords |
| | | { |
| | | typedef int64_t MsgId; |
| | | typedef int64_t Offset; |
| | | |
| | | public: |
| | | void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); } |
| | | void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg); } |
| | | void FreeMsg(MsgId id); |
| | | void AutoRemove(); |
| | | size_t size() const { return msgs_.size(); } |
| | | void DebugPrint() const; |
| | | |
| | | private: |
| | | std::unordered_map<MsgId, Offset> msgs_; |
| | | std::unordered_map<MsgId, MsgI> msgs_; |
| | | int64_t time_to_clean_ = 0; |
| | | }; |
| | | |
| | |
/// Liveness record of one process: a timestamp plus a state flag.
struct ProcState {
    int64_t timestamp_ = 0; // starts at 0 until updated
    uint32_t flag_ = 0;     // reserved; compared against kStateNormal / kStateOffline by NodeCenter
    /// Defined out of line. NOTE(review): presumably marks the process offline -- confirm there.
    void PutOffline(const int64_t offline_time);
    /// Defined out of line. NOTE(review): presumably re-evaluates the state at time `now`
    /// against the offline and kill thresholds -- confirm there.
    void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time);
};
| | | typedef std::unordered_map<Address, std::set<Topic>> AddressTopics; |
| | | |
| | | struct NodeInfo { |
| | | NodeCenter ¢er_; |
| | | SharedMemory &shm_; |
| | | ProcState state_; // state |
| | | std::map<MQId, int64_t> addrs_; // registered mqs |
| | | ProcInfo proc_; // |
| | | AddressTopics services_; // address: topics |
| | | AddressTopics subscriptions_; // address: topics |
| | | NodeInfo(NodeCenter ¢er, SharedMemory &shm) : |
| | | center_(center), shm_(shm) {} |
| | | void PutOffline(const int64_t offline_time); |
| | | void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time); |
| | | }; |
| | | typedef std::shared_ptr<NodeInfo> Node; |
| | | typedef std::weak_ptr<NodeInfo> WeakNode; |
| | |
| | | public: |
| | | typedef std::set<TopicDest> Clients; |
| | | |
| | | NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time_sec, const int64_t kill_time_sec) : |
| | | id_(id), cleaner_(cleaner), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {} |
| | | NodeCenter(const std::string &id, const int64_t offline_time_sec, const int64_t kill_time_sec) : |
| | | id_(id), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {} |
| | | |
| | | // center name, no relative to shm. |
| | | const std::string &id() const { return id_; } |
| | |
| | | void RecordMsg(const MsgI &msg); |
| | | bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg); |
| | | bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg); |
| | | bool PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb); |
| | | bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content); |
| | | void OnAlloc(ShmSocket &socket, const int64_t val); |
| | | void OnFree(ShmSocket &socket, const int64_t val); |
| | | bool OnCommand(ShmSocket &socket, const int64_t val); |
| | |
| | | { |
| | | return HandleMsg<MsgCommonReply, Func>(head, op); |
| | | } |
| | | template <class Reply> |
| | | bool CheckMsg(const BHMsgHead &head, Reply &reply) |
| | | { |
| | | bool r = false; |
| | | auto onOk = [&](Node) { r = true; return MakeReply<Reply>(eSuccess); }; |
| | | reply = HandleMsg<Reply>(head, onOk); |
| | | return r; |
| | | } |
| | | |
| | | MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg); |
| | | MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg); |
| | |
| | | private: |
| | | void CheckNodes(); |
| | | bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; } |
| | | void Publish(SharedMemory &shm, const Topic &topic, const std::string &content); |
| | | bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; } |
| | | bool Valid(const WeakNode &weak) |
| | | { |
| | |
| | | return node && Valid(*node); |
| | | } |
| | | void RemoveNode(Node &node); |
| | | Node GetNode(const MQId mq); |
| | | |
| | | std::string id_; // center proc id; |
| | | |
| | | std::unordered_map<Topic, Clients> service_map_; |
| | |
| | | ProcRecords procs_; // To get a short index for msg alloc. |
| | | MsgRecords msgs_; // record all msgs alloced. |
| | | |
| | | Cleaner cleaner_; // remove mqs. |
| | | int64_t offline_time_; |
| | | int64_t kill_time_; |
| | | int64_t last_check_time_; |