From 330f78f3334bcdcdb4cc2ab2dbf66604e0224d71 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 21 五月 2021 16:21:45 +0800
Subject: [PATCH] Merge branch 'master' of http://192.168.5.5:10010/r/valib/bhshmq

---
 box/node_center.h |   19 ++++++++++++-------
 1 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/box/node_center.h b/box/node_center.h
index b9a01b3..caaf054 100644
--- a/box/node_center.h
+++ b/box/node_center.h
@@ -51,14 +51,14 @@
 	typedef int64_t Offset;
 
 public:
-	void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); }
+	void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg); }
 	void FreeMsg(MsgId id);
 	void AutoRemove();
 	size_t size() const { return msgs_.size(); }
 	void DebugPrint() const;
 
 private:
-	std::unordered_map<MsgId, Offset> msgs_;
+	std::unordered_map<MsgId, MsgI> msgs_;
 	int64_t time_to_clean_ = 0;
 };
 
@@ -80,17 +80,21 @@
 	struct ProcState {
 		int64_t timestamp_ = 0;
 		uint32_t flag_ = 0; // reserved
-		void PutOffline(const int64_t offline_time);
-		void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time);
 	};
 	typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;
 
 	struct NodeInfo {
+		NodeCenter &center_;
+		SharedMemory &shm_;
 		ProcState state_;               // state
 		std::map<MQId, int64_t> addrs_; // registered mqs
 		ProcInfo proc_;                 //
 		AddressTopics services_;        // address: topics
 		AddressTopics subscriptions_;   // address: topics
+		NodeInfo(NodeCenter &center, SharedMemory &shm) :
+		    center_(center), shm_(shm) {}
+		void PutOffline(const int64_t offline_time);
+		void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time);
 	};
 	typedef std::shared_ptr<NodeInfo> Node;
 	typedef std::weak_ptr<NodeInfo> WeakNode;
@@ -109,8 +113,8 @@
 public:
 	typedef std::set<TopicDest> Clients;
 
-	NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time_sec, const int64_t kill_time_sec) :
-	    id_(id), cleaner_(cleaner), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {}
+	NodeCenter(const std::string &id, const int64_t offline_time_sec, const int64_t kill_time_sec) :
+	    id_(id), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {}
 
 	// center name, no relative to shm.
 	const std::string &id() const { return id_; }
@@ -159,6 +163,7 @@
 	MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg);
 	MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg);
 	MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req);
+	MsgQueryProcReply QueryProc(const std::string &proc_id);
 	MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req);
 	MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg);
 	MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg);
@@ -170,6 +175,7 @@
 private:
 	void CheckNodes();
 	bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; }
+	void Publish(SharedMemory &shm, const Topic &topic, const std::string &content);
 	bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; }
 	bool Valid(const WeakNode &weak)
 	{
@@ -186,7 +192,6 @@
 	ProcRecords procs_; // To get a short index for msg alloc.
 	MsgRecords msgs_;   // record all msgs alloced.
 
-	Cleaner cleaner_; // remove mqs.
 	int64_t offline_time_;
 	int64_t kill_time_;
 	int64_t last_check_time_;

--
Gitblit v1.8.0