From 65ef4d68321e56906920be75831b5e968f7abd7b Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 13 四月 2021 09:34:05 +0800
Subject: [PATCH] add heartbeat; refactor.

---
 src/center.cpp |   40 +++++++++++++++++++++++++++++-----------
 1 files changed, 29 insertions(+), 11 deletions(-)

diff --git a/src/center.cpp b/src/center.cpp
index 7865e57..cde865f 100644
--- a/src/center.cpp
+++ b/src/center.cpp
@@ -34,7 +34,9 @@
 namespace
 {
 typedef steady_clock::time_point TimePoint;
+typedef steady_clock::duration Duration;
 inline TimePoint Now() { return steady_clock::now(); };
+inline int64_t Seconds(const Duration &d) { return duration_cast<seconds>(d).count(); };
 
 //TODO check proc_id
 class NodeCenter
@@ -56,15 +58,15 @@
 	struct ProcState {
 		TimePoint timestamp_;
 		uint32_t flag_ = 0; // reserved
-		void UpdateState(TimePoint now)
+		void UpdateState(TimePoint now, const Duration &offline_time, const Duration &kill_time)
 		{
-			const auto kOfflineTime = 60 * 10s;
-			const auto kKillTime = 60 * 20s;
-
 			auto diff = now - timestamp_;
-			if (diff < kOfflineTime) {
+#ifndef NDEBUG
+			printf("diff: %ld\n", Seconds(diff));
+#endif
+			if (diff < offline_time) {
 				flag_ = kStateNormal;
-			} else if (diff < kKillTime) {
+			} else if (diff < kill_time) {
 				flag_ = kStateOffline;
 			} else {
 				flag_ = kStateKillme;
@@ -94,8 +96,8 @@
 public:
 	typedef std::set<TopicDest> Clients;
 
-	NodeCenter(const std::string &id, const Cleaner &cleaner) :
-	    id_(id), cleaner_(cleaner) {}
+	NodeCenter(const std::string &id, const Cleaner &cleaner, const Duration &offline_time, const Duration &kill_time) :
+	    id_(id), cleaner_(cleaner), offline_time_(offline_time), kill_time_(kill_time), last_check_time_(Now()) {}
 	const std::string &id() const { return id_; } // no need to lock.
 
 	//TODO maybe just return serialized string.
@@ -132,6 +134,8 @@
 				auto node = pos->second;
 				if (!MatchAddr(node->addrs_, SrcAddr(head))) {
 					return MakeReply<Reply>(eAddressNotMatch, "Node address error.");
+				} else if (head.type() == kMsgTypeHeartbeat && CanHeartbeat(*node)) {
+					return op(node);
 				} else if (!Valid(*node)) {
 					return MakeReply<Reply>(eNoRespond, "Node is not alive.");
 				} else {
@@ -168,7 +172,9 @@
 	{
 		return HandleMsg(head, [&](Node node) {
 			NodeInfo &ni = *node;
-			ni.state_.timestamp_ = Now();
+			auto now = Now();
+			ni.state_.timestamp_ = now;
+			ni.state_.flag_ = kStateNormal;
 
 			auto &info = msg.proc();
 			if (!info.public_info().empty()) {
@@ -301,10 +307,15 @@
 private:
 	void CheckNodes()
 	{
+		auto now = Now();
+		if (Seconds(now - last_check_time_) < 1) { return; }
+
+		last_check_time_ = now;
+
 		auto it = nodes_.begin();
 		while (it != nodes_.end()) {
 			auto &cli = *it->second;
-			cli.state_.UpdateState(Now());
+			cli.state_.UpdateState(now, offline_time_, kill_time_);
 			if (cli.state_.flag_ == kStateKillme) {
 				if (cleaner_) {
 					for (auto &addr : cli.addrs_) {
@@ -316,6 +327,10 @@
 				++it;
 			}
 		}
+	}
+	bool CanHeartbeat(const NodeInfo &node)
+	{
+		return Valid(node) || node.state_.flag_ == kStateOffline;
 	}
 	bool Valid(const NodeInfo &node)
 	{
@@ -333,6 +348,9 @@
 	std::unordered_map<Topic, Clients> subscribe_map_;
 	std::unordered_map<ProcId, Node> nodes_;
 	Cleaner cleaner_; // remove mqs.
+	Duration offline_time_;
+	Duration kill_time_;
+	TimePoint last_check_time_;
 };
 
 template <class Body, class OnMsg, class Replyer>
@@ -365,7 +383,7 @@
 
 bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
 {
-	auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner);
+	auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 3);
 	auto center_failed_q = std::make_shared<FailedMsgQ>();
 	auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) {
 		return [&](auto &&rep_body) {

--
Gitblit v1.8.0