From 1b52f1cb8c47dd2c0195d2fd65d7b6a4c2f10704 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期一, 12 四月 2021 18:29:41 +0800
Subject: [PATCH] add fail-resend support.

---
 utest/speed_test.cpp |   12 +-
 src/failed_msg.h     |   47 +++++++
 src/socket.h         |    9 +
 .vscode/launch.json  |    2 
 src/socket.cpp       |    8 
 utest/utest.cpp      |   26 ++++
 src/failed_msg.cpp   |   33 +++++
 src/timed_queue.h    |   75 ++++++++++++
 src/topic_node.cpp   |   48 +------
 src/center.cpp       |   47 ++++---
 10 files changed, 235 insertions(+), 72 deletions(-)

diff --git a/.vscode/launch.json b/.vscode/launch.json
index ef42f7b..12aa21d 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -11,7 +11,7 @@
             "program": "${workspaceFolder}/debug/bin/utest",
             "args": [
                 "-t",
-                "ReqRepTest"
+                "SRTest"
             ],
             "stopAtEntry": false,
             "cwd": "${workspaceFolder}",
diff --git a/src/center.cpp b/src/center.cpp
index d2aad0a..7865e57 100644
--- a/src/center.cpp
+++ b/src/center.cpp
@@ -18,6 +18,7 @@
 #include "center.h"
 #include "bh_util.h"
 #include "defs.h"
+#include "failed_msg.h"
 #include "shm.h"
 #include <chrono>
 #include <set>
@@ -364,28 +365,31 @@
 
 bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
 {
-
 	auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner);
-
-	auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
+	auto center_failed_q = std::make_shared<FailedMsgQ>();
+	auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) {
 		return [&](auto &&rep_body) {
 			auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
-			bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 100);
-			if (!r) {
-				printf("send reply failed.\n");
+			MsgI msg;
+			if (msg.Make(socket.shm(), reply_head, rep_body)) {
+				auto &remote = head.route(0).mq_id();
+				bool r = socket.Send(remote.data(), msg, timeout_ms);
+				if (!r) {
+					failq.Push(remote, msg, 60s); // for later retry.
+				}
 			}
-			//TODO resend failed.
 		};
 	};
 
-	auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
+	auto OnCenterIdle = [center_ptr, center_failed_q](ShmSocket &socket) {
 		auto &center = *center_ptr;
+		center_failed_q->TrySend(socket);
 		center->OnTimer();
 	};
 
 	auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
 		auto &center = *center_ptr;
-		auto replyer = MakeReplyer(socket, head, center->id());
+		auto replyer = MakeReplyer(socket, head, center->id(), *center_failed_q);
 		switch (head.type()) {
 			CASE_ON_MSG_TYPE(Register);
 			CASE_ON_MSG_TYPE(Heartbeat);
@@ -396,10 +400,11 @@
 		}
 	};
 
-	auto OnBusIdle = [](ShmSocket &socket) {};
+	auto bus_failed_q = std::make_shared<FailedMsgQ>();
+	auto OnBusIdle = [=](ShmSocket &socket) { bus_failed_q->TrySend(socket); };
 	auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
 		auto &center = *center_ptr;
-		auto replyer = MakeReplyer(socket, head, center->id());
+		auto replyer = MakeReplyer(socket, head, center->id(), *bus_failed_q);
 		auto OnPublish = [&]() {
 			MsgPublish pub;
 			NodeCenter::Clients clients;
@@ -407,19 +412,25 @@
 			if (head.route_size() != 1 || !msg.ParseBody(pub)) {
 				return;
 			} else if (!center->FindClients(head, pub, clients, reply)) {
-				MakeReplyer(socket, head, center->id())(reply);
+				replyer(reply);
 			} else {
-				MakeReplyer(socket, head, center->id())(MakeReply(eSuccess));
+				replyer(MakeReply(eSuccess));
 				if (!msg.EnableRefCount(socket.shm())) { return; } // no memory?
+				if (clients.empty()) { return; }
 
-				for (auto &cli : clients) {
+				auto it = clients.begin();
+				do {
+					auto &cli = *it;
 					auto node = cli.weak_node_.lock();
 					if (node) {
-						if (!socket.Send(cli.mq_.data(), msg, 100)) {
-							printf("center route publish failed. need resend.\n");
+						if (!socket.Send(cli.mq_.data(), msg, 0)) {
+							bus_failed_q->Push(cli.mq_, msg, 60s);
 						}
+						++it;
+					} else {
+						it = clients.erase(it);
 					}
-				}
+				} while (it != clients.end());
 			}
 		};
 		switch (head.type()) {
@@ -484,7 +495,7 @@
 {
 	for (auto &kv : Centers()) {
 		auto &info = kv.second;
-		sockets_[info.name_]->Start(info.handler_);
+		sockets_[info.name_]->Start(info.handler_, info.idle_);
 	}
 
 	return true;
diff --git a/src/failed_msg.cpp b/src/failed_msg.cpp
new file mode 100644
index 0000000..ab4658d
--- /dev/null
+++ b/src/failed_msg.cpp
@@ -0,0 +1,33 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  failed_msg.cpp
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�04鏈�12鏃� 16鏃�10鍒�53绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#include "failed_msg.h"
+
+FailedMsgQ::Func FailedMsgQ::PrepareSender(const std::string &remote, Msg const &msg)
+{
+	msg.AddRef();
+	return [remote, msg](void *valid_sock) {
+		assert(valid_sock);
+		ShmSocket &sock = *static_cast<ShmSocket *>(valid_sock);
+		bool r = sock.Send(remote.data(), msg, 0);
+		if (r && msg.IsCounted()) {
+			auto tmp = msg; // Release() is not const, but it's safe to release.
+			tmp.Release(sock.shm());
+		}
+		return r;
+	};
+}
\ No newline at end of file
diff --git a/src/failed_msg.h b/src/failed_msg.h
new file mode 100644
index 0000000..2d57abc
--- /dev/null
+++ b/src/failed_msg.h
@@ -0,0 +1,47 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  failed_msg.h
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�04鏈�12鏃� 11鏃�21鍒�30绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#ifndef FAILED_MSG_9YOI86AS
+#define FAILED_MSG_9YOI86AS
+
+#include "msg.h"
+#include "socket.h"
+#include "timed_queue.h"
+#include <string>
+
+class FailedMsgQ
+{
+	typedef std::function<bool(void *)> Func;
+	typedef TimedQueue<Func> TimedFuncQ;
+
+public:
+	typedef bhome_msg::MsgI Msg;
+
+	void Push(const std::string &remote, Msg const &msg, TimedFuncQ::TimePoint const &exr) { queue_.Push(PrepareSender(remote, msg), exr); }
+	void Push(const std::string &remote, Msg const &msg, TimedFuncQ::Duration const &exr) { queue_.Push(PrepareSender(remote, msg), exr); }
+	void TrySend(ShmSocket &socket)
+	{
+		queue_.CheckAll([&](Func &f) { return f(&socket); });
+	}
+
+private:
+	Func PrepareSender(const std::string &remote, Msg const &msg);
+
+	TimedFuncQ queue_;
+};
+
+#endif // end of include guard: FAILED_MSG_9YOI86AS
diff --git a/src/socket.cpp b/src/socket.cpp
index 116175d..2c55665 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -45,11 +45,12 @@
 {
 	auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
 		RecvCB cb;
-		if (async_cbs_->Find(head.msg_id(), cb)) {
+		if (per_msg_cbs_->Find(head.msg_id(), cb)) {
 			cb(socket, imsg, head);
 		} else if (onData) {
 			onData(socket, imsg, head);
-		} // else ignored, or dropped
+		} else { // else ignored, or dropped
+		}
 	};
 
 	auto recvLoopBody = [this, onRecvWithPerMsgCB, onIdle]() {
@@ -61,7 +62,8 @@
 				if (imsg.ParseHead(head)) {
 					onRecvWithPerMsgCB(*this, imsg, head);
 				}
-			} else if (onIdle) {
+			}
+			if (onIdle) {
 				onIdle(*this);
 			}
 		} catch (...) {
diff --git a/src/socket.h b/src/socket.h
index ee25d81..5973ab6 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -77,7 +77,7 @@
 	template <class Body>
 	bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb)
 	{
-		auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { async_cbs_->Add(head.msg_id(), cb); }); };
+		auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { per_msg_cbs_->Add(head.msg_id(), cb); }); };
 		MsgI msg;
 		return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend);
 	}
@@ -109,12 +109,15 @@
 					reply.swap(msg);
 					reply_head.Swap(&head);
 					st->cv.notify_one();
-				} else {
+				} else { // ignore
 				}
 			};
 
 			std::unique_lock<std::mutex> lk(st->mutex);
 			bool sendok = Send(remote, head, body, timeout_ms, OnRecv);
+			if (!sendok) {
+				printf("send timeout\n");
+			}
 			if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
 				return true;
 			} else {
@@ -161,7 +164,7 @@
 		}
 	};
 
-	Synced<AsyncCBs> async_cbs_;
+	Synced<AsyncCBs> per_msg_cbs_;
 };
 
 #endif // end of include guard: SOCKET_GWTJHBPO
diff --git a/src/timed_queue.h b/src/timed_queue.h
new file mode 100644
index 0000000..14e318d
--- /dev/null
+++ b/src/timed_queue.h
@@ -0,0 +1,75 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  failed_msg.h
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�04鏈�12鏃� 09鏃�36鍒�04绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#ifndef TIMED_QUEUE_Y2YLRBS3
+#define TIMED_QUEUE_Y2YLRBS3
+
+#include "bh_util.h"
+#include <chrono>
+#include <list>
+#include <string>
+
+template <class Data, class ClockType = std::chrono::steady_clock>
+class TimedQueue
+{
+public:
+	typedef ClockType Clock;
+	typedef typename Clock::time_point TimePoint;
+	typedef typename Clock::duration Duration;
+
+private:
+	struct Record {
+		TimePoint expire_;
+		Data data_;
+		Record(const TimePoint &expire, const Data &data) :
+		    expire_(expire), data_(data) {}
+		Record(const TimePoint &expire, Data &&data) :
+		    expire_(expire), data_(std::move(data)) {}
+		bool Expired() { return Clock::now() > expire_; }
+	};
+	typedef std::list<Record> Queue;
+	Synced<Queue> queue_;
+
+public:
+	void Push(Data &&data, const TimePoint &expire) { queue_->emplace_back(expire, std::move(data)); }
+	void Push(Data const &data, const TimePoint &expire) { queue_->emplace_back(expire, data); }
+
+	void Push(Data &&data, Duration const &timeout) { Push(std::move(data), Clock::now() + timeout); }
+	void Push(Data const &data, Duration const &timeout) { Push(data, Clock::now() + timeout); }
+
+	template <class Func>
+	void CheckAll(Func const &func)
+	{
+		queue_.Apply([&](Queue &q) {
+			if (q.empty()) {
+				return;
+			}
+			auto it = q.begin();
+			do {
+				if (it->Expired()) {
+					it = q.erase(it);
+				} else if (func(it->data_)) {
+					it = q.erase(it);
+				} else {
+					++it;
+				}
+			} while (it != q.end());
+		});
+	}
+};
+
+#endif // end of include guard: TIMED_QUEUE_Y2YLRBS3
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 5afec3f..8cd5cc4 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -17,6 +17,7 @@
  */
 #include "topic_node.h"
 #include "bh_util.h"
+#include "failed_msg.h"
 #include <chrono>
 #include <list>
 
@@ -32,44 +33,7 @@
 	std::string msg_id;
 };
 
-class ServerFailedQ
-{
-	struct FailedMsg {
-		steady_clock::time_point xpr;
-		std::string remote_;
-		BHMsgHead head_;
-		MsgRequestTopicReply body_;
-		FailedMsg(const std::string &addr, BHMsgHead &&head, MsgRequestTopicReply &&body) :
-		    xpr(steady_clock::now() + 10s), remote_(addr), head_(std::move(head)), body_(std::move(body)) {}
-		bool Expired() { return steady_clock::now() > xpr; }
-	};
-	typedef std::list<FailedMsg> Queue;
-	Synced<Queue> queue_;
-
-public:
-	void Push(const std::string &remote, BHMsgHead &&head, MsgRequestTopicReply &&body)
-	{
-		queue_->emplace_back(remote, std::move(head), std::move(body));
-	}
-	void TrySend(ShmSocket &socket, const int timeout_ms = 0)
-	{
-		queue_.Apply([&](Queue &q) {
-			if (!q.empty()) {
-				auto it = q.begin();
-				do {
-					if (it->Expired()) {
-						// it->msg_.Release(socket.shm());
-						it = q.erase(it);
-					} else if (socket.Send(it->remote_.data(), it->head_, it->body_, timeout_ms)) {
-						it = q.erase(it);
-					} else {
-						++it;
-					}
-				} while (it != q.end());
-			}
-		});
-	}
-};
+typedef FailedMsgQ ServerFailedQ;
 
 } // namespace
 TopicNode::TopicNode(SharedMemory &shm) :
@@ -158,8 +122,12 @@
 					for (int i = 0; i < head.route_size() - 1; ++i) {
 						reply_head.add_route()->Swap(head.mutable_route(i));
 					}
-					if (!sock.Send(head.route().rbegin()->mq_id().data(), reply_head, reply_body, 10)) {
-						failed_q->Push(head.route().rbegin()->mq_id(), std::move(reply_head), std::move(reply_body));
+					MsgI msg;
+					if (msg.Make(sock.shm(), reply_head, reply_body)) {
+						auto &remote = head.route().rbegin()->mq_id();
+						if (!sock.Send(remote.data(), msg, 10)) {
+							failed_q->Push(remote, msg, 10s);
+						}
 					}
 				}
 			}
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index b1f11ac..77c018a 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -26,7 +26,7 @@
 	ShmRemover auto_remove(shm_name);
 	const int mem_size = 1024 * 1024 * 50;
 	MQId id = boost::uuids::random_generator()();
-	const int timeout = 100;
+	const int timeout = 1000;
 	const uint32_t data_size = 4000;
 	const std::string proc_id = "demo_proc";
 
@@ -44,7 +44,6 @@
 		DEFER1(msg.Release(shm););
 
 		for (uint64_t i = 0; i < n; ++i) {
-			// mq.Send(id, str.data(), str.size(), timeout);
 			mq.Send(id, msg, timeout);
 		}
 	};
@@ -91,6 +90,7 @@
 					www.Launch(Writer, i, nmsg);
 				}
 				www.WaitAll();
+				printf("writer finished\n");
 				run.store(false);
 				rrr.WaitAll();
 				printf("Write %ld msg  R(%3d) W(%3d), : ", total_msg, nreader, nwriter);
@@ -136,14 +136,18 @@
 	req_body.set_topic("topic");
 	req_body.set_data(msg_content);
 	auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
+	req_head.add_route()->set_mq_id(&cli.id(), cli.id().size());
 	request_rc.MakeRC(shm, req_head, req_body);
+	DEFER1(request_rc.Release(shm));
 
 	MsgRequestTopic reply_body;
 	reply_body.set_topic("topic");
 	reply_body.set_data(msg_content);
 	auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id));
+	reply_head.add_route()->set_mq_id(&srv.id(), srv.id().size());
 	MsgI reply_rc;
 	reply_rc.MakeRC(shm, reply_head, reply_body);
+	DEFER1(reply_rc.Release(shm));
 
 	std::atomic<uint64_t> count(0);
 
@@ -224,9 +228,5 @@
 	printf("request ok: %ld\n", count.load());
 	stop = true;
 	servers.WaitAll();
-	BOOST_CHECK(request_rc.IsCounted());
-	BOOST_CHECK_EQUAL(request_rc.Count(), 1);
-	request_rc.Release(shm);
-	BOOST_CHECK(!request_rc.IsCounted());
 	// BOOST_CHECK_THROW(reply.Count(), int);
 }
diff --git a/utest/utest.cpp b/utest/utest.cpp
index a178fab..e0a9023 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -1,5 +1,6 @@
 #include "center.h"
 #include "defs.h"
+#include "failed_msg.h"
 #include "util.h"
 #include <atomic>
 #include <boost/uuid/uuid_generators.hpp>
@@ -21,8 +22,28 @@
 	static const bool value = true;
 };
 
+typedef FailedMsgQ ServerFailedQ;
+
 BOOST_AUTO_TEST_CASE(Temp)
 {
+	const std::string shm_name("ShmTemp");
+	ShmRemover auto_remove(shm_name); //remove twice? in case of killed?
+	SharedMemory shm(shm_name, 1024 * 1024 * 10);
+
+	typedef std::chrono::steady_clock clock;
+	int n = 1000 * 1000;
+	std::vector<clock::time_point> tps(n);
+	{
+		printf("thread switch %d times, ", n);
+		boost::timer::auto_cpu_timer timer;
+		for (auto &tp : tps) {
+			tp = clock::now();
+			std::this_thread::yield();
+		}
+	}
+	printf("time: %ld ns\n", (tps.back() - tps.front()).count());
+	return;
+	// sub topic partial match.
 	Topic topics[] = {
 	    "",
 	    ".",
@@ -131,7 +152,9 @@
 
 			bool r = provider.Publish(topic, data.data(), data.size(), timeout);
 			if (!r) {
-				printf("pub ret: %s\n", r ? "ok" : "fail");
+				static std::atomic<int> an(0);
+				int n = ++an;
+				printf("pub %d ret: %s\n", n, r ? "ok" : "fail");
 			}
 		}
 	};
@@ -142,6 +165,7 @@
 		topics.push_back("t" + std::to_string(i));
 	}
 	Topics part;
+	boost::timer::auto_cpu_timer pubsub_timer;
 	for (size_t i = 0; i < topics.size(); ++i) {
 		part.push_back(topics[i]);
 		threads.Launch(Sub, i, topics);

--
Gitblit v1.8.0