From 993c556000a414011626770540678948f16eaa9e Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 02 六月 2021 17:40:50 +0800 Subject: [PATCH] center restart with new shm; set center node ssn. --- box/node_center.h | 31 +++++++++++++++++++++++-------- 1 files changed, 23 insertions(+), 8 deletions(-) diff --git a/box/node_center.h b/box/node_center.h index 4663bee..a085bdf 100644 --- a/box/node_center.h +++ b/box/node_center.h @@ -48,17 +48,16 @@ class MsgRecords { typedef int64_t MsgId; - typedef int64_t Offset; public: - void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); } + void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg); } void FreeMsg(MsgId id); void AutoRemove(); size_t size() const { return msgs_.size(); } void DebugPrint() const; private: - std::unordered_map<MsgId, Offset> msgs_; + std::unordered_map<MsgId, MsgI> msgs_; int64_t time_to_clean_ = 0; }; @@ -80,17 +79,21 @@ struct ProcState { int64_t timestamp_ = 0; uint32_t flag_ = 0; // reserved - void PutOffline(const int64_t offline_time); - void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time); }; typedef std::unordered_map<Address, std::set<Topic>> AddressTopics; struct NodeInfo { + NodeCenter ¢er_; + SharedMemory &shm_; ProcState state_; // state std::map<MQId, int64_t> addrs_; // registered mqs ProcInfo proc_; // AddressTopics services_; // address: topics AddressTopics subscriptions_; // address: topics + NodeInfo(NodeCenter ¢er, SharedMemory &shm) : + center_(center), shm_(shm) {} + void PutOffline(const int64_t offline_time); + void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time); }; typedef std::shared_ptr<NodeInfo> Node; typedef std::weak_ptr<NodeInfo> WeakNode; @@ -109,8 +112,8 @@ public: typedef std::set<TopicDest> Clients; - NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time_sec, const int64_t kill_time_sec) : - id_(id), cleaner_(cleaner), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {} + NodeCenter(const std::string &id, const int64_t offline_time_sec, const int64_t kill_time_sec) : + id_(id), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {} // center name, no relative to shm. const std::string &id() const { return id_; } @@ -118,6 +121,8 @@ void RecordMsg(const MsgI &msg); bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg); bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg); + bool PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb); + bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content); void OnAlloc(ShmSocket &socket, const int64_t val); void OnFree(ShmSocket &socket, const int64_t val); bool OnCommand(ShmSocket &socket, const int64_t val); @@ -154,6 +159,14 @@ { return HandleMsg<MsgCommonReply, Func>(head, op); } + template <class Reply> + bool CheckMsg(const BHMsgHead &head, Reply &reply) + { + bool r = false; + auto onOk = [&](Node) { r = true; return MakeReply<Reply>(eSuccess); }; + reply = HandleMsg<Reply>(head, onOk); + return r; + } MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg); MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg); @@ -171,6 +184,7 @@ private: void CheckNodes(); bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; } + void Publish(SharedMemory &shm, const Topic &topic, const std::string &content); bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; } bool Valid(const WeakNode &weak) { @@ -178,6 +192,8 @@ return node && Valid(*node); } void RemoveNode(Node &node); + Node GetNode(const MQId mq); + std::string id_; // center proc id; std::unordered_map<Topic, Clients> service_map_; @@ -187,7 +203,6 @@ ProcRecords procs_; // To get a short index for msg alloc. MsgRecords msgs_; // record all msgs alloced. - Cleaner cleaner_; // remove mqs. int64_t offline_time_; int64_t kill_time_; int64_t last_check_time_; -- Gitblit v1.8.0