From 65ef4d68321e56906920be75831b5e968f7abd7b Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 13 四月 2021 09:34:05 +0800 Subject: [PATCH] add heartbeat; refactor. --- src/center.cpp | 40 +++++++++++++++++++++++++++++----------- 1 files changed, 29 insertions(+), 11 deletions(-) diff --git a/src/center.cpp b/src/center.cpp index 7865e57..cde865f 100644 --- a/src/center.cpp +++ b/src/center.cpp @@ -34,7 +34,9 @@ namespace { typedef steady_clock::time_point TimePoint; +typedef steady_clock::duration Duration; inline TimePoint Now() { return steady_clock::now(); }; +inline int64_t Seconds(const Duration &d) { return duration_cast<seconds>(d).count(); }; //TODO check proc_id class NodeCenter @@ -56,15 +58,15 @@ struct ProcState { TimePoint timestamp_; uint32_t flag_ = 0; // reserved - void UpdateState(TimePoint now) + void UpdateState(TimePoint now, const Duration &offline_time, const Duration &kill_time) { - const auto kOfflineTime = 60 * 10s; - const auto kKillTime = 60 * 20s; - auto diff = now - timestamp_; - if (diff < kOfflineTime) { +#ifndef NDEBUG + printf("diff: %ld\n", Seconds(diff)); +#endif + if (diff < offline_time) { flag_ = kStateNormal; - } else if (diff < kKillTime) { + } else if (diff < kill_time) { flag_ = kStateOffline; } else { flag_ = kStateKillme; @@ -94,8 +96,8 @@ public: typedef std::set<TopicDest> Clients; - NodeCenter(const std::string &id, const Cleaner &cleaner) : - id_(id), cleaner_(cleaner) {} + NodeCenter(const std::string &id, const Cleaner &cleaner, const Duration &offline_time, const Duration &kill_time) : + id_(id), cleaner_(cleaner), offline_time_(offline_time), kill_time_(kill_time), last_check_time_(Now()) {} const std::string &id() const { return id_; } // no need to lock. //TODO maybe just return serialized string. @@ -132,6 +134,8 @@ auto node = pos->second; if (!MatchAddr(node->addrs_, SrcAddr(head))) { return MakeReply<Reply>(eAddressNotMatch, "Node address error."); + } else if (head.type() == kMsgTypeHeartbeat && CanHeartbeat(*node)) { + return op(node); } else if (!Valid(*node)) { return MakeReply<Reply>(eNoRespond, "Node is not alive."); } else { @@ -168,7 +172,9 @@ { return HandleMsg(head, [&](Node node) { NodeInfo &ni = *node; - ni.state_.timestamp_ = Now(); + auto now = Now(); + ni.state_.timestamp_ = now; + ni.state_.flag_ = kStateNormal; auto &info = msg.proc(); if (!info.public_info().empty()) { @@ -301,10 +307,15 @@ private: void CheckNodes() { + auto now = Now(); + if (Seconds(now - last_check_time_) < 1) { return; } + + last_check_time_ = now; + auto it = nodes_.begin(); while (it != nodes_.end()) { auto &cli = *it->second; - cli.state_.UpdateState(Now()); + cli.state_.UpdateState(now, offline_time_, kill_time_); if (cli.state_.flag_ == kStateKillme) { if (cleaner_) { for (auto &addr : cli.addrs_) { @@ -316,6 +327,10 @@ ++it; } } + } + bool CanHeartbeat(const NodeInfo &node) + { + return Valid(node) || node.state_.flag_ == kStateOffline; } bool Valid(const NodeInfo &node) { @@ -333,6 +348,9 @@ std::unordered_map<Topic, Clients> subscribe_map_; std::unordered_map<ProcId, Node> nodes_; Cleaner cleaner_; // remove mqs. + Duration offline_time_; + Duration kill_time_; + TimePoint last_check_time_; }; template <class Body, class OnMsg, class Replyer> @@ -365,7 +383,7 @@ bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner) { - auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner); + auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 3); auto center_failed_q = std::make_shared<FailedMsgQ>(); auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) { return [&](auto &&rep_body) { -- Gitblit v1.8.0