From 1b52f1cb8c47dd2c0195d2fd65d7b6a4c2f10704 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期一, 12 四月 2021 18:29:41 +0800
Subject: [PATCH] add fail-resend support.

---
 src/center.cpp |   47 +++++++++++++++++++++++++++++------------------
 1 files changed, 29 insertions(+), 18 deletions(-)

diff --git a/src/center.cpp b/src/center.cpp
index d2aad0a..7865e57 100644
--- a/src/center.cpp
+++ b/src/center.cpp
@@ -18,6 +18,7 @@
 #include "center.h"
 #include "bh_util.h"
 #include "defs.h"
+#include "failed_msg.h"
 #include "shm.h"
 #include <chrono>
 #include <set>
@@ -364,28 +365,31 @@
 
 bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
 {
-
 	auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner);
-
-	auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
+	auto center_failed_q = std::make_shared<FailedMsgQ>();
+	auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) {
 		return [&](auto &&rep_body) {
 			auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
-			bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 100);
-			if (!r) {
-				printf("send reply failed.\n");
+			MsgI msg;
+			if (msg.Make(socket.shm(), reply_head, rep_body)) {
+				auto &remote = head.route(0).mq_id();
+				bool r = socket.Send(remote.data(), msg, timeout_ms);
+				if (!r) {
+					failq.Push(remote, msg, 60s); // for later retry.
+				}
 			}
-			//TODO resend failed.
 		};
 	};
 
-	auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
+	auto OnCenterIdle = [center_ptr, center_failed_q](ShmSocket &socket) {
 		auto &center = *center_ptr;
+		center_failed_q->TrySend(socket);
 		center->OnTimer();
 	};
 
 	auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
 		auto &center = *center_ptr;
-		auto replyer = MakeReplyer(socket, head, center->id());
+		auto replyer = MakeReplyer(socket, head, center->id(), *center_failed_q);
 		switch (head.type()) {
 			CASE_ON_MSG_TYPE(Register);
 			CASE_ON_MSG_TYPE(Heartbeat);
@@ -396,10 +400,11 @@
 		}
 	};
 
-	auto OnBusIdle = [](ShmSocket &socket) {};
+	auto bus_failed_q = std::make_shared<FailedMsgQ>();
+	auto OnBusIdle = [=](ShmSocket &socket) { bus_failed_q->TrySend(socket); };
 	auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
 		auto &center = *center_ptr;
-		auto replyer = MakeReplyer(socket, head, center->id());
+		auto replyer = MakeReplyer(socket, head, center->id(), *bus_failed_q);
 		auto OnPublish = [&]() {
 			MsgPublish pub;
 			NodeCenter::Clients clients;
@@ -407,19 +412,25 @@
 			if (head.route_size() != 1 || !msg.ParseBody(pub)) {
 				return;
 			} else if (!center->FindClients(head, pub, clients, reply)) {
-				MakeReplyer(socket, head, center->id())(reply);
+				replyer(reply);
 			} else {
-				MakeReplyer(socket, head, center->id())(MakeReply(eSuccess));
+				replyer(MakeReply(eSuccess));
 				if (!msg.EnableRefCount(socket.shm())) { return; } // no memory?
+				if (clients.empty()) { return; }
 
-				for (auto &cli : clients) {
+				auto it = clients.begin();
+				do {
+					auto &cli = *it;
 					auto node = cli.weak_node_.lock();
 					if (node) {
-						if (!socket.Send(cli.mq_.data(), msg, 100)) {
-							printf("center route publish failed. need resend.\n");
+						if (!socket.Send(cli.mq_.data(), msg, 0)) {
+							bus_failed_q->Push(cli.mq_, msg, 60s);
 						}
+						++it;
+					} else {
+						it = clients.erase(it);
 					}
-				}
+				} while (it != clients.end());
 			}
 		};
 		switch (head.type()) {
@@ -484,7 +495,7 @@
 {
 	for (auto &kv : Centers()) {
 		auto &info = kv.second;
-		sockets_[info.name_]->Start(info.handler_);
+		sockets_[info.name_]->Start(info.handler_, info.idle_);
 	}
 
 	return true;

--
Gitblit v1.8.0