From 9243710ca372de26823c2225c7b46b072458c671 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 28 五月 2021 17:18:33 +0800
Subject: [PATCH] tcp proxy requests, need more test.

---
 utest/speed_test.cpp |   49 ++++++++++++++++++-------------------------------
 1 files changed, 18 insertions(+), 31 deletions(-)

diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index 4dea623..f33f0db 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -24,7 +24,7 @@
 {
 	SharedMemory &shm = TestShm();
 	GlobalInit(shm);
-	MQId server_id = ShmMsgQueue::NewId();
+	MQId server_id = NewSession();
 	ShmMsgQueue server(server_id, shm, 1000);
 
 	const int timeout = 1000;
@@ -35,10 +35,10 @@
 
 	std::string str(data_size, 'a');
 	auto Writer = [&](int writer_id, uint64_t n) {
-		MQId cli_id = ShmMsgQueue::NewId();
+		MQId cli_id = NewSession();
 
 		ShmMsgQueue mq(cli_id, shm, 64);
-		MsgI msg;
+		MsgI msg(shm);
 		MsgRequestTopic body;
 		body.set_topic("topic");
 		body.set_data(str);
@@ -58,7 +58,7 @@
 		auto now = []() { return steady_clock::now(); };
 		auto tm = now();
 		while (*run) {
-			MsgI msg;
+			MsgI msg(shm);
 			BHMsgHead head;
 			if (mq.TryRecv(msg)) {
 				DEFER1(msg.Release());
@@ -149,8 +149,8 @@
 
 	auto Avail = [&]() { return shm.get_free_memory(); };
 	auto init_avail = Avail();
-	ShmSocket srv(shm, ShmMsgQueue::NewId(), qlen);
-	ShmSocket cli(shm, ShmMsgQueue::NewId(), qlen);
+	ShmSocket srv(shm, NewSession(), qlen);
+	ShmSocket cli(shm, NewSession(), qlen);
 
 	int ncli = 1;
 	uint64_t nmsg = 1000 * 1000 * 1;
@@ -189,36 +189,24 @@
 			Req();
 		}
 	};
+	auto onRequest = [&](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
+		if (head.type() == kMsgTypeRequestTopic) {
+			MQInfo src_mq = {head.route()[0].mq_id(), head.route()[0].abs_addr()};
 
-	std::atomic<bool> stop(false);
-	auto Server = [&]() {
-		MsgI req;
-		BHMsgHead req_head;
-
-		while (!stop) {
-			if (srv.SyncRecv(req, req_head, 10)) {
-				DEFER1(req.Release());
-
-				if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) {
-					MQInfo src_mq = {req_head.route()[0].mq_id(), req_head.route()[0].abs_addr()};
-					auto Reply = [&]() {
-						MsgRequestTopic reply_body;
-						reply_body.set_topic("topic");
-						reply_body.set_data(msg_content);
-						auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), req_head.msg_id()));
-						return srv.Send(src_mq, reply_head, reply_body);
-					};
-					Reply();
-				}
-			}
+			MsgRequestTopic reply_body;
+			reply_body.set_topic("topic");
+			reply_body.set_data(msg_content);
+			auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), head.msg_id()));
+			srv.Send(src_mq, reply_head, reply_body);
 		}
 	};
+	srv.Start(onRequest);
 
 	boost::timer::auto_cpu_timer timer;
 	DEFER1(printf("Request Reply Test:"););
 
-	ThreadManager clients, servers;
-	for (int i = 0; i < 2; ++i) { servers.Launch(Server); }
+	ThreadManager clients;
+
 	printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
 	for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
 	clients.WaitAll();
@@ -227,7 +215,6 @@
 		std::this_thread::sleep_for(100ms);
 	} while (count.load() < ncli * nmsg);
 	PrintStatus(NowSec());
-	stop = true;
-	servers.WaitAll();
+	srv.Stop();
 	// BOOST_CHECK_THROW(reply.Count(), int);
 }

--
Gitblit v1.8.0