From e54b8e58780c7d9f37b06cc4e1dc88badb2129c9 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 18 五月 2021 17:02:21 +0800
Subject: [PATCH] remove sync recv, node cache msgs for sync recv.

---
 utest/speed_test.cpp |   37 +++------
 src/socket.h         |    4 
 utest/api_test.cpp   |   30 ++++++-
 src/msg.h            |   11 ++
 src/socket.cpp       |   19 ----
 src/topic_node.h     |   26 ++++++
 src/topic_node.cpp   |  112 +++++++++++++++++----------
 7 files changed, 146 insertions(+), 93 deletions(-)

diff --git a/src/msg.h b/src/msg.h
index e8af3c5..42a753e 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -209,6 +209,17 @@
 		p += 4;
 		return head.ParseFromArray(p, msg_size);
 	}
+	std::string body() const
+	{
+		auto p = get<char>();
+		assert(p);
+		uint32_t size = Get32(p);
+		p += 4;
+		p += size;
+		size = Get32(p);
+		p += 4;
+		return std::string(p, size);
+	}
 	template <class Body>
 	bool ParseBody(Body &body) const
 	{
diff --git a/src/socket.cpp b/src/socket.cpp
index c450e65..19be201 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -139,25 +139,6 @@
 	return false;
 }
 
-bool ShmSocket::SyncRecv(int64_t &cmd, const int timeout_ms)
-{
-	return (timeout_ms == 0) ? mq().TryRecv(cmd) : mq().Recv(cmd, timeout_ms);
-}
-//maybe reimplment, using async cbs?
-bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms)
-{
-	// std::lock_guard<std::mutex> lock(mutex_); // seems no need to lock mutex_.
-	bool got = (timeout_ms == 0) ? mq().TryRecv(msg) : mq().Recv(msg, timeout_ms);
-	if (got) {
-		if (msg.ParseHead(head)) {
-			return true;
-		} else {
-			msg.Release();
-		}
-	}
-	return false;
-}
-
 bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb)
 {
 	size_t size = content.size();
diff --git a/src/socket.h b/src/socket.h
index dea106c..7557034 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -99,8 +99,6 @@
 	{
 		return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...);
 	}
-	bool SyncRecv(int64_t &cmd, const int timeout_ms);
-	bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
 
 	template <class Body>
 	bool SendAndRecv(const MQInfo &remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
@@ -190,8 +188,8 @@
 
 	Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_;
 	Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_;
-
 	SendQ send_buffer_;
+
 	// node request center alloc memory.
 	int node_proc_index_ = -1;
 	int socket_index_ = -1;
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 43d748f..6be65be 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -107,6 +107,14 @@
 					}
 					SetProcIndex(reply.proc_index());
 					this->state_ = eStateUnregistered;
+					auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
+						server_buffer_->Write(std::move(head), msg.body());
+					};
+					SockServer().Start(onRequest);
+					auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
+						sub_buffer_->Write(std::move(head), msg.body());
+					};
+					SockSub().Start(onSub);
 				}
 			} break;
 			default: break;
@@ -341,26 +349,32 @@
 
 bool TopicNode::ServerStart(const ServerAsyncCB &acb, int nworker)
 {
-	auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
-		if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; }
-		MsgRequestTopic req;
-		if (!imsg.ParseBody(req)) { return; }
+	if (acb) {
+		auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
+			if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; }
+			MsgRequestTopic req;
+			if (!imsg.ParseBody(req)) { return; }
 
-		try {
-			SrcInfo *p = new SrcInfo;
-			if (!p) {
-				throw std::runtime_error("no memory.");
+			try {
+				SrcInfo *p = new SrcInfo;
+				if (!p) {
+					throw std::runtime_error("no memory.");
+				}
+				p->route.assign(head.route().begin(), head.route().end());
+				p->msg_id = head.msg_id();
+				acb(p, *head.mutable_proc_id(), req);
+			} catch (std::exception &e) {
+				LOG_ERROR() << "error server handle msg:" << e.what();
 			}
-			p->route.assign(head.route().begin(), head.route().end());
-			p->msg_id = head.msg_id();
-			acb(p, *head.mutable_proc_id(), req);
-		} catch (std::exception &e) {
-			LOG_ERROR() << "error server handle msg:" << e.what();
-		}
-	};
+		};
 
-	auto &sock = SockServer();
-	return acb && sock.Start(onRecv, nworker);
+		return SockServer().Start(onRecv, nworker);
+	} else {
+		auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
+			server_buffer_->Write(std::move(head), msg.body());
+		};
+		return SockServer().Start(onRequest, nworker);
+	}
 }
 
 bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
@@ -369,13 +383,19 @@
 		SetLastError(eNotRegistered, kErrMsgNotRegistered);
 		return false;
 	}
-
-	auto &sock = SockServer();
-
-	MsgI imsg;
 	BHMsgHead head;
-	if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) {
-		if (imsg.ParseBody(request)) {
+	std::string body;
+	auto end_time = steady_clock::now() + milliseconds(timeout_ms);
+	while (!server_buffer_->Read(head, body)) {
+		if (steady_clock::now() < end_time) {
+			robust::QuickSleep();
+		} else {
+			return false;
+		}
+	}
+
+	if (head.type() == kMsgTypeRequestTopic) {
+		if (request.ParseFromString(body)) {
 			head.mutable_proc_id()->swap(proc_id);
 			try {
 				SrcInfo *p = new SrcInfo;
@@ -614,20 +634,24 @@
 
 bool TopicNode::SubscribeStartWorker(const SubDataCB &tdcb, int nworker)
 {
-	auto &sock = SockSub();
-
-	auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
-		if (head.type() == kMsgTypePublish) {
-			MsgPublish pub;
-			if (imsg.ParseBody(pub)) {
-				tdcb(head.proc_id(), pub);
+	if (tdcb) {
+		auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
+			if (head.type() == kMsgTypePublish) {
+				MsgPublish pub;
+				if (imsg.ParseBody(pub)) {
+					tdcb(head.proc_id(), pub);
+				}
+			} else {
+				// ignored, or dropped
 			}
-		} else {
-			// ignored, or dropped
-		}
-	};
-
-	return tdcb && sock.Start(AsyncRecvProc, nworker);
+		};
+		return SockSub().Start(AsyncRecvProc, nworker);
+	} else {
+		auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
+			sub_buffer_->Write(std::move(head), msg.body());
+		};
+		return SockSub().Start(onSub, nworker);
+	}
 }
 
 bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms)
@@ -637,13 +661,19 @@
 		return false;
 	}
 
-	auto &sock = SockSub();
-	MsgI msg;
-	DEFER1(msg.Release(););
 	BHMsgHead head;
+	std::string body;
+	auto end_time = steady_clock::now() + milliseconds(timeout_ms);
+	while (!sub_buffer_->Read(head, body)) {
+		if (steady_clock::now() < end_time) {
+			robust::QuickSleep();
+		} else {
+			return false;
+		}
+	}
 	//TODO error msg.
-	if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) {
-		if (msg.ParseBody(pub)) {
+	if (head.type() == kMsgTypePublish) {
+		if (pub.ParseFromString(body)) {
 			head.mutable_proc_id()->swap(proc_id);
 			return true;
 		}
diff --git a/src/topic_node.h b/src/topic_node.h
index 1dfbf43..81bf718 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -163,6 +163,32 @@
 	int proc_index_ = -1;
 
 	TopicQueryCache topic_query_cache_;
+
+	class RecvQ
+	{
+	public:
+		void Write(BHMsgHead &&head, std::string &&body) { q_.push_back({std::move(head), std::move(body)}); }
+		bool Read(BHMsgHead &head, std::string &body)
+		{
+			if (q_.empty()) {
+				return false;
+			} else {
+				head = std::move(q_.front().head);
+				body = std::move(q_.front().body);
+				q_.pop_front();
+				return true;
+			}
+		}
+
+	private:
+		struct MsgData {
+			BHMsgHead head;
+			std::string body;
+		};
+		std::deque<MsgData> q_;
+	};
+	Synced<RecvQ> server_buffer_;
+	Synced<RecvQ> sub_buffer_;
 };
 
 #endif // end of include guard: TOPIC_NODE_YVKWA6TF
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 44c809d..533c399 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -131,6 +131,15 @@
 		reg = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000);
 		if (reg) {
 			printf("register ok\n");
+			// bool r = BHUnregister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000);
+			// printf("unregister %s\n", r ? "ok" : "failed");
+			// reg = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000);
+			// if (!reg) {
+			// 	int ec = 0;
+			// 	std::string msg;
+			// 	GetLastError(ec, msg);
+			// 	printf("reg error: %s\n", msg.c_str());
+			// }
 		} else {
 			int ec = 0;
 			std::string msg;
@@ -201,7 +210,7 @@
 
 	auto SyncRequest = [&](int idx) { // SyncRequest
 		MsgRequestTopic req;
-		req.set_topic(topic_ + std::to_string(idx));
+		req.set_topic(topic_ + std::to_string(0));
 		req.set_data("request_data_" + std::to_string(idx));
 		std::string s(req.SerializeAsString());
 		// Sleep(10ms, false);
@@ -286,19 +295,30 @@
 
 	std::atomic<bool> run(true);
 
-	BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
 	ThreadManager threads;
+
+#if 0
+	BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
+#else
+	BHStartWorker(FServerCallback(), &SubRecvProc, &ClientProc);
+	threads.Launch(ServerLoop, &run);
+#endif
+
 	boost::timer::auto_cpu_timer timer;
 	threads.Launch(hb, &run);
 	threads.Launch(showStatus, &run);
 	int ncli = 10;
 	const int64_t nreq = 1000 * 100;
+
+#if 1
 	for (int i = 0; i < ncli; ++i) {
 		threads.Launch(asyncRequest, nreq);
 	}
-	// for (int i = 0; i < 100; ++i) {
-	// 	SyncRequest(0);
-	// }
+#else
+	for (int i = 0; i < 100; ++i) {
+		SyncRequest(i);
+	}
+#endif
 
 	int same = 0;
 	uint64_t last = 0;
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index 4dea623..ef56678 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -189,36 +189,24 @@
 			Req();
 		}
 	};
+	auto onRequest = [&](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
+		if (head.type() == kMsgTypeRequestTopic) {
+			MQInfo src_mq = {head.route()[0].mq_id(), head.route()[0].abs_addr()};
 
-	std::atomic<bool> stop(false);
-	auto Server = [&]() {
-		MsgI req;
-		BHMsgHead req_head;
-
-		while (!stop) {
-			if (srv.SyncRecv(req, req_head, 10)) {
-				DEFER1(req.Release());
-
-				if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) {
-					MQInfo src_mq = {req_head.route()[0].mq_id(), req_head.route()[0].abs_addr()};
-					auto Reply = [&]() {
-						MsgRequestTopic reply_body;
-						reply_body.set_topic("topic");
-						reply_body.set_data(msg_content);
-						auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), req_head.msg_id()));
-						return srv.Send(src_mq, reply_head, reply_body);
-					};
-					Reply();
-				}
-			}
+			MsgRequestTopic reply_body;
+			reply_body.set_topic("topic");
+			reply_body.set_data(msg_content);
+			auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), head.msg_id()));
+			srv.Send(src_mq, reply_head, reply_body);
 		}
 	};
+	srv.Start(onRequest);
 
 	boost::timer::auto_cpu_timer timer;
 	DEFER1(printf("Request Reply Test:"););
 
-	ThreadManager clients, servers;
-	for (int i = 0; i < 2; ++i) { servers.Launch(Server); }
+	ThreadManager clients;
+
 	printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
 	for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
 	clients.WaitAll();
@@ -227,7 +215,6 @@
 		std::this_thread::sleep_for(100ms);
 	} while (count.load() < ncli * nmsg);
 	PrintStatus(NowSec());
-	stop = true;
-	servers.WaitAll();
+	srv.Stop();
 	// BOOST_CHECK_THROW(reply.Count(), int);
 }

--
Gitblit v1.8.0