From e54b8e58780c7d9f37b06cc4e1dc88badb2129c9 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 18 五月 2021 17:02:21 +0800
Subject: [PATCH] remove sync recv, node cache msgs for sync recv.

---
 src/topic_node.cpp |  112 +++++++++++++++++++++++++++++++++++--------------------
 1 files changed, 71 insertions(+), 41 deletions(-)

diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 43d748f..6be65be 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -107,6 +107,14 @@
 					}
 					SetProcIndex(reply.proc_index());
 					this->state_ = eStateUnregistered;
+					auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
+						server_buffer_->Write(std::move(head), msg.body());
+					};
+					SockServer().Start(onRequest);
+					auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
+						sub_buffer_->Write(std::move(head), msg.body());
+					};
+					SockSub().Start(onSub);
 				}
 			} break;
 			default: break;
@@ -341,26 +349,32 @@
 
 bool TopicNode::ServerStart(const ServerAsyncCB &acb, int nworker)
 {
-	auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
-		if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; }
-		MsgRequestTopic req;
-		if (!imsg.ParseBody(req)) { return; }
+	if (acb) {
+		auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
+			if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; }
+			MsgRequestTopic req;
+			if (!imsg.ParseBody(req)) { return; }
 
-		try {
-			SrcInfo *p = new SrcInfo;
-			if (!p) {
-				throw std::runtime_error("no memory.");
+			try {
+				SrcInfo *p = new SrcInfo;
+				if (!p) {
+					throw std::runtime_error("no memory.");
+				}
+				p->route.assign(head.route().begin(), head.route().end());
+				p->msg_id = head.msg_id();
+				acb(p, *head.mutable_proc_id(), req);
+			} catch (std::exception &e) {
+				LOG_ERROR() << "error server handle msg:" << e.what();
 			}
-			p->route.assign(head.route().begin(), head.route().end());
-			p->msg_id = head.msg_id();
-			acb(p, *head.mutable_proc_id(), req);
-		} catch (std::exception &e) {
-			LOG_ERROR() << "error server handle msg:" << e.what();
-		}
-	};
+		};
 
-	auto &sock = SockServer();
-	return acb && sock.Start(onRecv, nworker);
+		return SockServer().Start(onRecv, nworker);
+	} else {
+		auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
+			server_buffer_->Write(std::move(head), msg.body());
+		};
+		return SockServer().Start(onRequest, nworker);
+	}
 }
 
 bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
@@ -369,13 +383,19 @@
 		SetLastError(eNotRegistered, kErrMsgNotRegistered);
 		return false;
 	}
-
-	auto &sock = SockServer();
-
-	MsgI imsg;
 	BHMsgHead head;
-	if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) {
-		if (imsg.ParseBody(request)) {
+	std::string body;
+	auto end_time = steady_clock::now() + milliseconds(timeout_ms);
+	while (!server_buffer_->Read(head, body)) {
+		if (steady_clock::now() < end_time) {
+			robust::QuickSleep();
+		} else {
+			return false;
+		}
+	}
+
+	if (head.type() == kMsgTypeRequestTopic) {
+		if (request.ParseFromString(body)) {
 			head.mutable_proc_id()->swap(proc_id);
 			try {
 				SrcInfo *p = new SrcInfo;
@@ -614,20 +634,24 @@
 
 bool TopicNode::SubscribeStartWorker(const SubDataCB &tdcb, int nworker)
 {
-	auto &sock = SockSub();
-
-	auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
-		if (head.type() == kMsgTypePublish) {
-			MsgPublish pub;
-			if (imsg.ParseBody(pub)) {
-				tdcb(head.proc_id(), pub);
+	if (tdcb) {
+		auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
+			if (head.type() == kMsgTypePublish) {
+				MsgPublish pub;
+				if (imsg.ParseBody(pub)) {
+					tdcb(head.proc_id(), pub);
+				}
+			} else {
+				// ignored, or dropped
 			}
-		} else {
-			// ignored, or dropped
-		}
-	};
-
-	return tdcb && sock.Start(AsyncRecvProc, nworker);
+		};
+		return SockSub().Start(AsyncRecvProc, nworker);
+	} else {
+		auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
+			sub_buffer_->Write(std::move(head), msg.body());
+		};
+		return SockSub().Start(onSub, nworker);
+	}
 }
 
 bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms)
@@ -637,13 +661,19 @@
 		return false;
 	}
 
-	auto &sock = SockSub();
-	MsgI msg;
-	DEFER1(msg.Release(););
 	BHMsgHead head;
+	std::string body;
+	auto end_time = steady_clock::now() + milliseconds(timeout_ms);
+	while (!sub_buffer_->Read(head, body)) {
+		if (steady_clock::now() < end_time) {
+			robust::QuickSleep();
+		} else {
+			return false;
+		}
+	}
 	//TODO error msg.
-	if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) {
-		if (msg.ParseBody(pub)) {
+	if (head.type() == kMsgTypePublish) {
+		if (pub.ParseFromString(body)) {
 			head.mutable_proc_id()->swap(proc_id);
 			return true;
 		}

--
Gitblit v1.8.0