From 993c556000a414011626770540678948f16eaa9e Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 02 六月 2021 17:40:50 +0800
Subject: [PATCH] center restart with new shm; set center node ssn.

---
 box/node_center.h |   29 ++++++++++++++++++++---------
 1 files changed, 20 insertions(+), 9 deletions(-)

diff --git a/box/node_center.h b/box/node_center.h
index 4d3fba3..a085bdf 100644
--- a/box/node_center.h
+++ b/box/node_center.h
@@ -48,17 +48,16 @@
 class MsgRecords
 {
 	typedef int64_t MsgId;
-	typedef int64_t Offset;
 
 public:
-	void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); }
+	void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg); }
 	void FreeMsg(MsgId id);
 	void AutoRemove();
 	size_t size() const { return msgs_.size(); }
 	void DebugPrint() const;
 
 private:
-	std::unordered_map<MsgId, Offset> msgs_;
+	std::unordered_map<MsgId, MsgI> msgs_;
 	int64_t time_to_clean_ = 0;
 };
 
@@ -85,13 +84,14 @@
 
 	struct NodeInfo {
 		NodeCenter &center_;
+		SharedMemory &shm_;
 		ProcState state_;               // state
 		std::map<MQId, int64_t> addrs_; // registered mqs
 		ProcInfo proc_;                 //
 		AddressTopics services_;        // address: topics
 		AddressTopics subscriptions_;   // address: topics
-		NodeInfo(NodeCenter &center) :
-		    center_(center) {}
+		NodeInfo(NodeCenter &center, SharedMemory &shm) :
+		    center_(center), shm_(shm) {}
 		void PutOffline(const int64_t offline_time);
 		void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time);
 	};
@@ -112,8 +112,8 @@
 public:
 	typedef std::set<TopicDest> Clients;
 
-	NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time_sec, const int64_t kill_time_sec) :
-	    id_(id), cleaner_(cleaner), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {}
+	NodeCenter(const std::string &id, const int64_t offline_time_sec, const int64_t kill_time_sec) :
+	    id_(id), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {}
 
 	// center name, no relative to shm.
 	const std::string &id() const { return id_; }
@@ -121,6 +121,8 @@
 	void RecordMsg(const MsgI &msg);
 	bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg);
 	bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg);
+	bool PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
+	bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content);
 	void OnAlloc(ShmSocket &socket, const int64_t val);
 	void OnFree(ShmSocket &socket, const int64_t val);
 	bool OnCommand(ShmSocket &socket, const int64_t val);
@@ -157,6 +159,14 @@
 	{
 		return HandleMsg<MsgCommonReply, Func>(head, op);
 	}
+	template <class Reply>
+	bool CheckMsg(const BHMsgHead &head, Reply &reply)
+	{
+		bool r = false;
+		auto onOk = [&](Node) { r = true; return MakeReply<Reply>(eSuccess); };
+		reply = HandleMsg<Reply>(head, onOk);
+		return r;
+	}
 
 	MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg);
 	MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg);
@@ -174,7 +184,7 @@
 private:
 	void CheckNodes();
 	bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; }
-	void Publish(const Topic &topic, const std::string &content);
+	void Publish(SharedMemory &shm, const Topic &topic, const std::string &content);
 	bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; }
 	bool Valid(const WeakNode &weak)
 	{
@@ -182,6 +192,8 @@
 		return node && Valid(*node);
 	}
 	void RemoveNode(Node &node);
+	Node GetNode(const MQId mq);
+
 	std::string id_; // center proc id;
 
 	std::unordered_map<Topic, Clients> service_map_;
@@ -191,7 +203,6 @@
 	ProcRecords procs_; // To get a short index for msg alloc.
 	MsgRecords msgs_;   // record all msgs alloced.
 
-	Cleaner cleaner_; // remove mqs.
 	int64_t offline_time_;
 	int64_t kill_time_;
 	int64_t last_check_time_;

--
Gitblit v1.8.0