From 65ef4d68321e56906920be75831b5e968f7abd7b Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 13 四月 2021 09:34:05 +0800 Subject: [PATCH] add heartbeat; refactor. --- .vscode/launch.json | 2 src/topic_node.h | 7 + utest/utest.cpp | 33 ++++++++++ src/failed_msg.cpp | 1 src/shm_queue.cpp | 14 ++++ src/topic_node.cpp | 43 ++++++++++++-- src/center.cpp | 40 +++++++++--- 7 files changed, 117 insertions(+), 23 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 12aa21d..b4e9631 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -11,7 +11,7 @@ "program": "${workspaceFolder}/debug/bin/utest", "args": [ "-t", - "SRTest" + "HeartbeatTest" ], "stopAtEntry": false, "cwd": "${workspaceFolder}", diff --git a/src/center.cpp b/src/center.cpp index 7865e57..cde865f 100644 --- a/src/center.cpp +++ b/src/center.cpp @@ -34,7 +34,9 @@ namespace { typedef steady_clock::time_point TimePoint; +typedef steady_clock::duration Duration; inline TimePoint Now() { return steady_clock::now(); }; +inline int64_t Seconds(const Duration &d) { return duration_cast<seconds>(d).count(); }; //TODO check proc_id class NodeCenter @@ -56,15 +58,15 @@ struct ProcState { TimePoint timestamp_; uint32_t flag_ = 0; // reserved - void UpdateState(TimePoint now) + void UpdateState(TimePoint now, const Duration &offline_time, const Duration &kill_time) { - const auto kOfflineTime = 60 * 10s; - const auto kKillTime = 60 * 20s; - auto diff = now - timestamp_; - if (diff < kOfflineTime) { +#ifndef NDEBUG + printf("diff: %ld\n", Seconds(diff)); +#endif + if (diff < offline_time) { flag_ = kStateNormal; - } else if (diff < kKillTime) { + } else if (diff < kill_time) { flag_ = kStateOffline; } else { flag_ = kStateKillme; @@ -94,8 +96,8 @@ public: typedef std::set<TopicDest> Clients; - NodeCenter(const std::string &id, const Cleaner &cleaner) : - id_(id), cleaner_(cleaner) {} + NodeCenter(const std::string &id, const Cleaner &cleaner, const Duration &offline_time, const Duration &kill_time) : + id_(id), cleaner_(cleaner), offline_time_(offline_time), kill_time_(kill_time), last_check_time_(Now()) {} const std::string &id() const { return id_; } // no need to lock. //TODO maybe just return serialized string. @@ -132,6 +134,8 @@ auto node = pos->second; if (!MatchAddr(node->addrs_, SrcAddr(head))) { return MakeReply<Reply>(eAddressNotMatch, "Node address error."); + } else if (head.type() == kMsgTypeHeartbeat && CanHeartbeat(*node)) { + return op(node); } else if (!Valid(*node)) { return MakeReply<Reply>(eNoRespond, "Node is not alive."); } else { @@ -168,7 +172,9 @@ { return HandleMsg(head, [&](Node node) { NodeInfo &ni = *node; - ni.state_.timestamp_ = Now(); + auto now = Now(); + ni.state_.timestamp_ = now; + ni.state_.flag_ = kStateNormal; auto &info = msg.proc(); if (!info.public_info().empty()) { @@ -301,10 +307,15 @@ private: void CheckNodes() { + auto now = Now(); + if (Seconds(now - last_check_time_) < 1) { return; } + + last_check_time_ = now; + auto it = nodes_.begin(); while (it != nodes_.end()) { auto &cli = *it->second; - cli.state_.UpdateState(Now()); + cli.state_.UpdateState(now, offline_time_, kill_time_); if (cli.state_.flag_ == kStateKillme) { if (cleaner_) { for (auto &addr : cli.addrs_) { @@ -316,6 +327,10 @@ ++it; } } + } + bool CanHeartbeat(const NodeInfo &node) + { + return Valid(node) || node.state_.flag_ == kStateOffline; } bool Valid(const NodeInfo &node) { @@ -333,6 +348,9 @@ std::unordered_map<Topic, Clients> subscribe_map_; std::unordered_map<ProcId, Node> nodes_; Cleaner cleaner_; // remove mqs. + Duration offline_time_; + Duration kill_time_; + TimePoint last_check_time_; }; template <class Body, class OnMsg, class Replyer> @@ -365,7 +383,7 @@ bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner) { - auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner); + auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 3); auto center_failed_q = std::make_shared<FailedMsgQ>(); auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) { return [&](auto &&rep_body) { diff --git a/src/failed_msg.cpp b/src/failed_msg.cpp index ab4658d..0b4c443 100644 --- a/src/failed_msg.cpp +++ b/src/failed_msg.cpp @@ -24,6 +24,7 @@ assert(valid_sock); ShmSocket &sock = *static_cast<ShmSocket *>(valid_sock); bool r = sock.Send(remote.data(), msg, 0); + //TODO check remote removed. if (r && msg.IsCounted()) { auto tmp = msg; // Release() is not const, but it's safe to release. tmp.Release(sock.shm()); diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp index 652ed5b..8e4e56e 100644 --- a/src/shm_queue.cpp +++ b/src/shm_queue.cpp @@ -72,12 +72,22 @@ bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend) { Queue *remote = Find(shm, MsgQIdToName(remote_id)); - return remote && remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); + if (remote) { + return remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); + } else { + // SetLestError(eNotFound); + return false; + } } bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms) { Queue *remote = Find(shm, MsgQIdToName(remote_id)); - return remote && remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); }); + if (remote) { + return remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); }); + } else { + // SetLestError(eNotFound); + return false; + } } // Test shows that in the 2 cases: diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 8cd5cc4..788c536 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -39,17 +39,21 @@ TopicNode::TopicNode(SharedMemory &shm) : shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm) { - SockNode().Start(); - SockClient().Start(); - SockServer().Start(); + Start(); } TopicNode::~TopicNode() { - StopAll(); + Stop(); } -void TopicNode::StopAll() +void TopicNode::Start() +{ + SockNode().Start(); + SockClient().Start(); + SockServer().Start(); +} +void TopicNode::Stop() { SockServer().Stop(); SockClient().Stop(); @@ -76,12 +80,39 @@ BHMsgHead reply_head; bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); - if (r) { + if (r && IsSuccess(reply_body.errmsg().errcode())) { info_ = body; } return r; } +bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) +{ + auto &sock = SockNode(); + MsgHeartbeat body; + *body.mutable_proc() = proc; + + auto head(InitMsgHead(GetType(body), body.proc().proc_id())); + AddRoute(head, sock.id()); + + MsgI reply; + DEFER1(reply.Release(shm_);); + BHMsgHead reply_head; + bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); + if (r && IsSuccess(reply_body.errmsg().errcode())) { + // TODO update proc info + } + return r; +} +bool TopicNode::Heartbeat(const int timeout_ms) +{ + ProcInfo proc; + proc.set_proc_id(proc_id()); + MsgCommonReply reply_body; + return Heartbeat(proc, reply_body, timeout_ms) && IsSuccess(reply_body.errmsg().errcode()); +} + bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) { //TODO check registered diff --git a/src/topic_node.h b/src/topic_node.h index 60497ad..d2cdcf9 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -37,9 +37,12 @@ TopicNode(SharedMemory &shm); ~TopicNode(); - void StopAll(); + void Start(); + void Stop(); // topic node - bool Register(ProcInfo &body, MsgCommonReply &reply, const int timeout_ms); + bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); + bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); + bool Heartbeat(const int timeout_ms); // topic rpc server typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest; diff --git a/utest/utest.cpp b/utest/utest.cpp index e0a9023..f127a8f 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -240,7 +240,7 @@ do { std::this_thread::yield(); } while (count.load() < nreq); - client.StopAll(); + client.Stop(); printf("request %s %d done ", topic.c_str(), count.load()); }; @@ -282,6 +282,37 @@ servers.WaitAll(); } +BOOST_AUTO_TEST_CASE(HeartbeatTest) +{ + const std::string shm_name("ShmHeartbeat"); + ShmRemover auto_remove(shm_name); + SharedMemory shm(shm_name, 1024 * 1024 * 50); + + BHCenter center(shm); + center.Start(); + + { + + DemoNode node("demo_node", shm); + auto Check = [&]() { + bool r = node.Heartbeat(100); + printf("hearbeat ret : %s\n", r ? "ok" : "failed"); + }; + Check(); + for (int i = 0; i < 3; ++i) { + std::this_thread::sleep_for(1s); + Check(); + } + printf("sleep 4\n"); + std::this_thread::sleep_for(4s); + for (int i = 0; i < 2; ++i) { + std::this_thread::sleep_for(1s); + Check(); + } + } + printf("sleep 8\n"); + std::this_thread::sleep_for(8s); +} inline int MyMin(int a, int b) { printf("MyMin\n"); -- Gitblit v1.8.0