From 1b52f1cb8c47dd2c0195d2fd65d7b6a4c2f10704 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期一, 12 四月 2021 18:29:41 +0800 Subject: [PATCH] add fail-resend support. --- src/center.cpp | 47 +++++++++++++++++++++++++++++------------------ 1 files changed, 29 insertions(+), 18 deletions(-) diff --git a/src/center.cpp b/src/center.cpp index d2aad0a..7865e57 100644 --- a/src/center.cpp +++ b/src/center.cpp @@ -18,6 +18,7 @@ #include "center.h" #include "bh_util.h" #include "defs.h" +#include "failed_msg.h" #include "shm.h" #include <chrono> #include <set> @@ -364,28 +365,31 @@ bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner) { - auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner); - - auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { + auto center_failed_q = std::make_shared<FailedMsgQ>(); + auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) { return [&](auto &&rep_body) { auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); - bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 100); - if (!r) { - printf("send reply failed.\n"); + MsgI msg; + if (msg.Make(socket.shm(), reply_head, rep_body)) { + auto &remote = head.route(0).mq_id(); + bool r = socket.Send(remote.data(), msg, timeout_ms); + if (!r) { + failq.Push(remote, msg, 60s); // for later retry. + } } - //TODO resend failed. }; }; - auto OnCenterIdle = [center_ptr](ShmSocket &socket) { + auto OnCenterIdle = [center_ptr, center_failed_q](ShmSocket &socket) { auto ¢er = *center_ptr; + center_failed_q->TrySend(socket); center->OnTimer(); }; auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; - auto replyer = MakeReplyer(socket, head, center->id()); + auto replyer = MakeReplyer(socket, head, center->id(), *center_failed_q); switch (head.type()) { CASE_ON_MSG_TYPE(Register); CASE_ON_MSG_TYPE(Heartbeat); @@ -396,10 +400,11 @@ } }; - auto OnBusIdle = [](ShmSocket &socket) {}; + auto bus_failed_q = std::make_shared<FailedMsgQ>(); + auto OnBusIdle = [=](ShmSocket &socket) { bus_failed_q->TrySend(socket); }; auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; - auto replyer = MakeReplyer(socket, head, center->id()); + auto replyer = MakeReplyer(socket, head, center->id(), *bus_failed_q); auto OnPublish = [&]() { MsgPublish pub; NodeCenter::Clients clients; @@ -407,19 +412,25 @@ if (head.route_size() != 1 || !msg.ParseBody(pub)) { return; } else if (!center->FindClients(head, pub, clients, reply)) { - MakeReplyer(socket, head, center->id())(reply); + replyer(reply); } else { - MakeReplyer(socket, head, center->id())(MakeReply(eSuccess)); + replyer(MakeReply(eSuccess)); if (!msg.EnableRefCount(socket.shm())) { return; } // no memory? + if (clients.empty()) { return; } - for (auto &cli : clients) { + auto it = clients.begin(); + do { + auto &cli = *it; auto node = cli.weak_node_.lock(); if (node) { - if (!socket.Send(cli.mq_.data(), msg, 100)) { - printf("center route publish failed. need resend.\n"); + if (!socket.Send(cli.mq_.data(), msg, 0)) { + bus_failed_q->Push(cli.mq_, msg, 60s); } + ++it; + } else { + it = clients.erase(it); } - } + } while (it != clients.end()); } }; switch (head.type()) { @@ -484,7 +495,7 @@ { for (auto &kv : Centers()) { auto &info = kv.second; - sockets_[info.name_]->Start(info.handler_); + sockets_[info.name_]->Start(info.handler_, info.idle_); } return true; -- Gitblit v1.8.0