From f286fba4fa0c861ee8a40fb64aaa2650f16110fe Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期一, 12 四月 2021 09:32:14 +0800
Subject: [PATCH] vscode exclude build,debug dir.

---
 src/center.cpp |  137 ++++++++++++++++++++++++++++++---------------
 1 files changed, 91 insertions(+), 46 deletions(-)

diff --git a/src/center.cpp b/src/center.cpp
index 71c85c3..d2aad0a 100644
--- a/src/center.cpp
+++ b/src/center.cpp
@@ -19,7 +19,11 @@
 #include "bh_util.h"
 #include "defs.h"
 #include "shm.h"
+#include <chrono>
 #include <set>
+
+using namespace std::chrono;
+using namespace std::chrono_literals;
 
 using namespace bhome_shm;
 using namespace bhome_msg;
@@ -28,7 +32,8 @@
 
 namespace
 {
-auto Now = []() { time_t t; return time(&t); };
+typedef steady_clock::time_point TimePoint;
+inline TimePoint Now() { return steady_clock::now(); };
 
 //TODO check proc_id
 class NodeCenter
@@ -37,24 +42,39 @@
 	typedef std::string ProcId;
 	typedef std::string Address;
 	typedef bhome::msg::ProcInfo ProcInfo;
+	typedef std::function<void(Address const &)> Cleaner;
 
 private:
 	enum {
-		kStateInvalid = 0,
-		kStateNormal = 1,
-		kStateNoRespond = 2,
-		kStateOffline = 3,
+		kStateInvalid,
+		kStateNormal,
+		kStateOffline,
+		kStateKillme,
 	};
 
 	struct ProcState {
-		time_t timestamp_ = 0;
+		TimePoint timestamp_;
 		uint32_t flag_ = 0; // reserved
+		void UpdateState(TimePoint now)
+		{
+			const auto kOfflineTime = 60 * 10s;
+			const auto kKillTime = 60 * 20s;
+
+			auto diff = now - timestamp_;
+			if (diff < kOfflineTime) {
+				flag_ = kStateNormal;
+			} else if (diff < kKillTime) {
+				flag_ = kStateOffline;
+			} else {
+				flag_ = kStateKillme;
+			}
+		}
 	};
 	typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;
 
 	struct NodeInfo {
 		ProcState state_;             // state
-		Address addr_;                // registered_mqid.
+		std::set<Address> addrs_;     // registered mqs
 		ProcInfo proc_;               //
 		AddressTopics services_;      // address: topics
 		AddressTopics subscriptions_; // address: topics
@@ -67,13 +87,14 @@
 		WeakNode weak_node_;
 		bool operator<(const TopicDest &a) const { return mq_ < a.mq_; }
 	};
-	const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
+	inline const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
+	inline bool MatchAddr(std::set<Address> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); }
 
 public:
 	typedef std::set<TopicDest> Clients;
 
-	NodeCenter(const std::string &id = "#Center") :
-	    id_(id) {}
+	NodeCenter(const std::string &id, const Cleaner &cleaner) :
+	    id_(id), cleaner_(cleaner) {}
 	const std::string &id() const { return id_; } // no need to lock.
 
 	//TODO maybe just return serialized string.
@@ -85,7 +106,10 @@
 
 		try {
 			Node node(new NodeInfo);
-			node->addr_ = SrcAddr(head);
+			node->addrs_.insert(SrcAddr(head));
+			for (auto &addr : msg.addrs()) {
+				node->addrs_.insert(addr.mq_id());
+			}
 			node->proc_.Swap(msg.mutable_proc());
 			node->state_.timestamp_ = Now();
 			node->state_.flag_ = kStateNormal;
@@ -95,37 +119,17 @@
 			return MakeReply(eError, "register node error.");
 		}
 	}
-	template <class OnSuccess, class OnError>
-	auto HandleMsg(const BHMsgHead &head, OnSuccess onOk, OnError onErr)
-	{
-		auto pos = nodes_.find(head.proc_id());
-		if (pos == nodes_.end()) {
-			return onErr(eNotRegistered, "Node is not registered.");
-		} else {
-			auto node = pos->second;
-			if (head.type() == kMsgTypeHeartbeat && node->addr_ != SrcAddr(head)) {
-				return onErr(eAddressNotMatch, "Node address error.");
-			} else if (!Valid(*node)) {
-				return onErr(eNoRespond, "Node is not alive.");
-			} else {
-				return onOk(node);
-			}
-		}
-	}
 
 	template <class Reply, class Func>
 	Reply HandleMsg(const BHMsgHead &head, Func const &op)
 	{
 		try {
-			auto onErr = [](const ErrorCode ec, const std::string &str) { return MakeReply<Reply>(ec, str); };
-			return HandleMsg(head, op, onErr);
-
 			auto pos = nodes_.find(head.proc_id());
 			if (pos == nodes_.end()) {
 				return MakeReply<Reply>(eNotRegistered, "Node is not registered.");
 			} else {
 				auto node = pos->second;
-				if (node->addr_ != SrcAddr(head)) {
+				if (!MatchAddr(node->addrs_, SrcAddr(head))) {
 					return MakeReply<Reply>(eAddressNotMatch, "Node address error.");
 				} else if (!Valid(*node)) {
 					return MakeReply<Reply>(eNoRespond, "Node is not alive.");
@@ -149,9 +153,10 @@
 		return HandleMsg(
 		    head, [&](Node node) -> MsgCommonReply {
 			    auto &src = SrcAddr(head);
-			    node->services_[src].insert(msg.topics().begin(), msg.topics().end());
+			    auto &topics = msg.topics().topic_list();
+			    node->services_[src].insert(topics.begin(), topics.end());
 			    TopicDest dest = {src, node};
-			    for (auto &topic : msg.topics()) {
+			    for (auto &topic : topics) {
 				    service_map_[topic].insert(dest);
 			    }
 			    return MakeReply(eSuccess);
@@ -163,6 +168,7 @@
 		return HandleMsg(head, [&](Node node) {
 			NodeInfo &ni = *node;
 			ni.state_.timestamp_ = Now();
+
 			auto &info = msg.proc();
 			if (!info.public_info().empty()) {
 				ni.proc_.set_public_info(info.public_info());
@@ -207,9 +213,10 @@
 	{
 		return HandleMsg(head, [&](Node node) {
 			auto &src = SrcAddr(head);
-			node->subscriptions_[src].insert(msg.topics().begin(), msg.topics().end());
+			auto &topics = msg.topics().topic_list();
+			node->subscriptions_[src].insert(topics.begin(), topics.end());
 			TopicDest dest = {src, node};
-			for (auto &topic : msg.topics()) {
+			for (auto &topic : topics) {
 				subscribe_map_[topic].insert(dest);
 			}
 			return MakeReply(eSuccess);
@@ -232,8 +239,9 @@
 
 			if (pos != node->subscriptions_.end()) {
 				const TopicDest &dest = {src, node};
+				auto &topics = msg.topics().topic_list();
 				// clear node sub records;
-				for (auto &topic : msg.topics()) {
+				for (auto &topic : topics) {
 					pos->second.erase(topic);
 					RemoveSubTopicDestRecord(topic, dest);
 				}
@@ -284,7 +292,30 @@
 		return ret;
 	}
 
+	void OnTimer()
+	{
+		CheckNodes();
+	}
+
 private:
+	void CheckNodes()
+	{
+		auto it = nodes_.begin();
+		while (it != nodes_.end()) {
+			auto &cli = *it->second;
+			cli.state_.UpdateState(Now());
+			if (cli.state_.flag_ == kStateKillme) {
+				if (cleaner_) {
+					for (auto &addr : cli.addrs_) {
+						cleaner_(addr);
+					}
+				}
+				it = nodes_.erase(it);
+			} else {
+				++it;
+			}
+		}
+	}
 	bool Valid(const NodeInfo &node)
 	{
 		return node.state_.flag_ == kStateNormal;
@@ -300,6 +331,7 @@
 	std::unordered_map<Topic, Clients> service_map_;
 	std::unordered_map<Topic, Clients> subscribe_map_;
 	std::unordered_map<ProcId, Node> nodes_;
+	Cleaner cleaner_; // remove mqs.
 };
 
 template <class Body, class OnMsg, class Replyer>
@@ -330,9 +362,11 @@
 		    msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \
 		return true;
 
-bool InstallCenter()
+bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
 {
-	auto center_ptr = std::make_shared<Synced<NodeCenter>>();
+
+	auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner);
+
 	auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
 		return [&](auto &&rep_body) {
 			auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
@@ -342,6 +376,11 @@
 			}
 			//TODO resend failed.
 		};
+	};
+
+	auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
+		auto &center = *center_ptr;
+		center->OnTimer();
 	};
 
 	auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
@@ -357,6 +396,7 @@
 		}
 	};
 
+	auto OnBusIdle = [](ShmSocket &socket) {};
 	auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
 		auto &center = *center_ptr;
 		auto replyer = MakeReplyer(socket, head, center->id());
@@ -390,8 +430,8 @@
 		}
 	};
 
-	BHCenter::Install("#center.reg", OnCenter, BHTopicCenterAddress(), 1000);
-	BHCenter::Install("#center.bus", OnPubSub, BHTopicBusAddress(), 1000);
+	BHCenter::Install("#center.reg", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000);
+	BHCenter::Install("#center.bus", OnPubSub, OnBusIdle, BHTopicBusAddress(), 1000);
 
 	return true;
 }
@@ -412,19 +452,24 @@
 	return rec;
 }
 
-bool BHCenter::Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len)
+bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const std::string &mqid, const int mq_len)
 {
-	Centers()[name] = CenterInfo{name, handler, mqid, mq_len};
+	Centers()[name] = CenterInfo{name, handler, idle, mqid, mq_len};
 	return true;
 }
-bool BHCenter::Install(const std::string &name, MsgHandler handler, const MQId &mqid, const int mq_len)
+bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len)
 {
-	return Install(name, handler, std::string((const char *) &mqid, sizeof(mqid)), mq_len);
+	return Install(name, handler, idle, std::string((const char *) &mqid, sizeof(mqid)), mq_len);
 }
 
 BHCenter::BHCenter(Socket::Shm &shm)
 {
-	InstallCenter();
+	auto gc = [&](const std::string &id) {
+		auto r = ShmSocket::Remove(shm, *(MQId *) id.data());
+		printf("remove mq : %s\n", r ? "ok" : "failed");
+	};
+
+	AddCenter("#center", gc);
 
 	for (auto &kv : Centers()) {
 		auto &info = kv.second;

--
Gitblit v1.8.0