From e54b8e58780c7d9f37b06cc4e1dc88badb2129c9 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 18 五月 2021 17:02:21 +0800
Subject: [PATCH] remove sync recv, node cache msgs for sync recv.

---
 utest/speed_test.cpp |   37 ++++++++++++-------------------------
 1 files changed, 12 insertions(+), 25 deletions(-)

diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index 4dea623..ef56678 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -189,36 +189,24 @@
 			Req();
 		}
 	};
+	auto onRequest = [&](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
+		if (head.type() == kMsgTypeRequestTopic) {
+			MQInfo src_mq = {head.route()[0].mq_id(), head.route()[0].abs_addr()};
 
-	std::atomic<bool> stop(false);
-	auto Server = [&]() {
-		MsgI req;
-		BHMsgHead req_head;
-
-		while (!stop) {
-			if (srv.SyncRecv(req, req_head, 10)) {
-				DEFER1(req.Release());
-
-				if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) {
-					MQInfo src_mq = {req_head.route()[0].mq_id(), req_head.route()[0].abs_addr()};
-					auto Reply = [&]() {
-						MsgRequestTopic reply_body;
-						reply_body.set_topic("topic");
-						reply_body.set_data(msg_content);
-						auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), req_head.msg_id()));
-						return srv.Send(src_mq, reply_head, reply_body);
-					};
-					Reply();
-				}
-			}
+			MsgRequestTopic reply_body;
+			reply_body.set_topic("topic");
+			reply_body.set_data(msg_content);
+			auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), head.msg_id()));
+			srv.Send(src_mq, reply_head, reply_body);
 		}
 	};
+	srv.Start(onRequest);
 
 	boost::timer::auto_cpu_timer timer;
 	DEFER1(printf("Request Reply Test:"););
 
-	ThreadManager clients, servers;
-	for (int i = 0; i < 2; ++i) { servers.Launch(Server); }
+	ThreadManager clients;
+
 	printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
 	for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
 	clients.WaitAll();
@@ -227,7 +215,6 @@
 		std::this_thread::sleep_for(100ms);
 	} while (count.load() < ncli * nmsg);
 	PrintStatus(NowSec());
-	stop = true;
-	servers.WaitAll();
+	srv.Stop();
 	// BOOST_CHECK_THROW(reply.Count(), int);
 }

--
Gitblit v1.8.0