From aa1542b6d6a4680088ac715c4ce40f97ada554fb Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 14 四月 2021 17:52:31 +0800 Subject: [PATCH] add SendQ TrySend() TryRecv(); handle re-register. --- box/center.cpp | 65 ++++++++++++++++++-------------- 1 files changed, 37 insertions(+), 28 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index f9044d4..0dd4ed4 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -18,7 +18,6 @@ #include "center.h" #include "bh_util.h" #include "defs.h" -#include "failed_msg.h" #include "shm.h" #include <chrono> #include <set> @@ -52,7 +51,7 @@ }; struct ProcState { - int64_t timestamp_; + int64_t timestamp_ = 0; uint32_t flag_ = 0; // reserved void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) { @@ -111,15 +110,32 @@ } try { - Node node(new NodeInfo); - node->addrs_.insert(SrcAddr(head)); - for (auto &addr : msg.addrs()) { - node->addrs_.insert(addr.mq_id()); + auto UpdateRegInfo = [&](Node &node) { + node->addrs_.insert(SrcAddr(head)); + for (auto &addr : msg.addrs()) { + node->addrs_.insert(addr.mq_id()); + } + node->proc_.Swap(msg.mutable_proc()); + node->state_.timestamp_ = head.timestamp(); + node->state_.UpdateState(NowSec(), offline_time_, kill_time_); + }; + + auto pos = nodes_.find(head.proc_id()); + if (pos == nodes_.end()) { // new client + Node node(new NodeInfo); + UpdateRegInfo(node); + nodes_[node->proc_.proc_id()] = node; + } else { + Node &node = pos->second; + if (node->addrs_.find(SrcAddr(head)) == node->addrs_.end()) { + // node restarted, release old mq. + for (auto &addr : node->addrs_) { + cleaner_(addr); + } + node->addrs_.clear(); + } + UpdateRegInfo(node); } - node->proc_.Swap(msg.mutable_proc()); - node->state_.timestamp_ = head.timestamp(); - node->state_.UpdateState(NowSec(), offline_time_, kill_time_); - nodes_[node->proc_.proc_id()] = node; return MakeReply(eSuccess); } catch (...) { return MakeReply(eError, "register node error."); @@ -134,7 +150,7 @@ if (pos == nodes_.end()) { return MakeReply<Reply>(eNotRegistered, "Node is not registered."); } else { - auto node = pos->second; + auto &node = pos->second; if (!MatchAddr(node->addrs_, SrcAddr(head))) { return MakeReply<Reply>(eAddressNotMatch, "Node address error."); } else if (head.type() == kMsgTypeHeartbeat && CanHeartbeat(*node)) { @@ -342,8 +358,7 @@ auto node = weak.lock(); return node && Valid(*node); } - void CheckAllNodes(); //TODO, call it in timer. - std::string id_; // center proc id; + std::string id_; // center proc id; std::unordered_map<Topic, Clients> service_map_; std::unordered_map<Topic, Clients> subscribe_map_; @@ -385,30 +400,25 @@ bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner) { auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 5s, 10s); - auto center_failed_q = std::make_shared<FailedMsgQ>(); - auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) { + auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { return [&](auto &&rep_body) { auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); MsgI msg; if (msg.Make(socket.shm(), reply_head, rep_body)) { auto &remote = head.route(0).mq_id(); - bool r = socket.Send(remote.data(), msg, timeout_ms); - if (!r) { - failq.Push(remote, msg, 60s); // for later retry. - } + bool r = socket.Send(remote.data(), msg); } }; }; - auto OnCenterIdle = [center_ptr, center_failed_q](ShmSocket &socket) { + auto OnCenterIdle = [center_ptr](ShmSocket &socket) { auto ¢er = *center_ptr; - center_failed_q->TrySend(socket); center->OnTimer(); }; auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; - auto replyer = MakeReplyer(socket, head, center->id(), *center_failed_q); + auto replyer = MakeReplyer(socket, head, center->id()); switch (head.type()) { CASE_ON_MSG_TYPE(Register); CASE_ON_MSG_TYPE(Heartbeat); @@ -419,11 +429,10 @@ } }; - auto bus_failed_q = std::make_shared<FailedMsgQ>(); - auto OnBusIdle = [=](ShmSocket &socket) { bus_failed_q->TrySend(socket); }; + auto OnBusIdle = [=](ShmSocket &socket) {}; auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; - auto replyer = MakeReplyer(socket, head, center->id(), *bus_failed_q); + auto replyer = MakeReplyer(socket, head, center->id()); auto OnPublish = [&]() { MsgPublish pub; NodeCenter::Clients clients; @@ -442,9 +451,9 @@ auto &cli = *it; auto node = cli.weak_node_.lock(); if (node) { - if (!socket.Send(cli.mq_.data(), msg, 0)) { - bus_failed_q->Push(cli.mq_, msg, 60s); - } + // should also make sure that mq is not killed before msg expires. + // it would be ok if (kill_time - offline_time) is longer than expire time. + socket.Send(cli.mq_.data(), msg); ++it; } else { it = clients.erase(it); -- Gitblit v1.8.0