From 34cd75f77d0ca94dbdba4e6cc9451fe4d33e78b3 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 19 五月 2021 19:14:13 +0800
Subject: [PATCH] add api BHQueryProcs.

---
 utest/speed_test.cpp |  335 +++++++++++++++++++++++++++++--------------------------
 1 files changed, 179 insertions(+), 156 deletions(-)

diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index c7f8d4f..ef56678 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -15,183 +15,206 @@
  *
  * =====================================================================================
  */
+#include "robust.h"
 #include "util.h"
-#include <boost/date_time/posix_time/posix_time.hpp>
 
-using namespace boost::posix_time;
+using namespace robust;
 
 BOOST_AUTO_TEST_CASE(SpeedTest)
 {
-    const std::string shm_name("ShmSpeed");
-    ShmRemover auto_remove(shm_name);
-    const int mem_size = 1024*1024*50;
-    MQId id = boost::uuids::random_generator()();
-    const int timeout = 100;
-    const uint32_t data_size = 4000;
+	SharedMemory &shm = TestShm();
+	GlobalInit(shm);
+	MQId server_id = ShmMsgQueue::NewId();
+	ShmMsgQueue server(server_id, shm, 1000);
 
-    auto Writer = [&](int writer_id, uint64_t n) {
-        SharedMemory shm(shm_name, mem_size);
-        ShmMsgQueue mq(shm, 64);
-        std::string str(data_size, 'a');
-        MsgI msg;
-        DEFER1(msg.Release(shm););
-        msg.MakeRC(shm, MakeRequest(mq.Id(), str.data(), str.size()));
-        for (uint64_t i = 0; i < n; ++i) {
-            // mq.Send(id, str.data(), str.size(), timeout);
-            mq.Send(id, msg, timeout);
-        }
-    };
-    auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork){
-        SharedMemory shm(shm_name, mem_size);
-        ShmMsgQueue mq(id, shm, 1000);
-        while(*run) {
-            BHMsg msg;
-            if (mq.Recv(msg, timeout)) {
-                // ok
-            } else if (isfork) {
-                exit(0); // for forked quit after 1s.
-            }
-        }
-    };
-    auto State = [&](std::atomic<bool> *run){
-        SharedMemory shm(shm_name, mem_size);
-        auto init = shm.get_free_memory();
-        printf("shm init : %ld\n", init);
-        while (*run) {
-            auto cur = shm.get_free_memory();
-            printf("shm used : %8ld/%ld\n", init - cur, init);
-            std::this_thread::sleep_for(1s);
-        }
-    };
+	const int timeout = 1000;
+	const uint32_t data_size = 1001;
+	const std::string proc_id = "demo_proc";
+	std::atomic<int64_t> nwrite(0);
+	std::atomic<int64_t> nread(0);
 
-    int nwriters[] = {1,2,4};
-    int nreaders[] = {1,2};
+	std::string str(data_size, 'a');
+	auto Writer = [&](int writer_id, uint64_t n) {
+		MQId cli_id = ShmMsgQueue::NewId();
 
-    auto Test = [&](auto &www, auto &rrr, bool isfork) {
-        for (auto nreader : nreaders) {
-            for (auto nwriter : nwriters) {
-                const uint64_t nmsg = 1000 * 1000 * 10 / nwriter;
-                const uint64_t total_msg = nmsg * nwriter;
-                std::atomic<bool> run(true);
-                std::this_thread::sleep_for(10ms);
-                boost::timer::auto_cpu_timer timer;
-                for (int i = 0; i < nreader; ++i) {
-                    rrr.Launch(Reader, i, &run, isfork);
-                }
-                for (int i = 0; i < nwriter; ++i) {
-                    www.Launch(Writer, i, nmsg);
-                }
-                www.WaitAll();
-                run.store(false);
-                rrr.WaitAll();
-                printf("%3d Write %ld msg  R(%3d) W(%3d), : ", getpid(), total_msg, nreader, nwriter);
-            }
-        }
-    };
+		ShmMsgQueue mq(cli_id, shm, 64);
+		MsgI msg;
+		MsgRequestTopic body;
+		body.set_topic("topic");
+		body.set_data(str);
+		auto head(InitMsgHead(GetType(body), proc_id, mq.Id()));
+		msg.Make(head, body);
+		assert(msg.valid());
+		DEFER1(msg.Release(););
 
-    std::atomic<bool> run(true);
-    ThreadManager state;
-    state.Launch(State, &run);
-    // typedef ProcessManager Manager;
-    // typedef ThreadManager Manager;
-    // const bool isfork = IsSameType<Manager, ProcessManager>::value;
-    ProcessManager pw, pr;
-    printf("================ Testing process io: =======================================================\n");
-    Test(pw, pr, true);
-    ThreadManager tw, tr;
-    printf("---------------- Testing thread io:  -------------------------------------------------------\n");
-    Test(tw, tr, false);
-    run.store(false);
+		for (uint64_t i = 0; i < n; ++i) {
+			msg.AddRef();
+			while (!mq.TrySend({server.Id(), server.AbsAddr()}, msg.Offset())) {}
+			++nwrite;
+		}
+	};
+	auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) {
+		ShmMsgQueue &mq = server;
+		auto now = []() { return steady_clock::now(); };
+		auto tm = now();
+		while (*run) {
+			MsgI msg;
+			BHMsgHead head;
+			if (mq.TryRecv(msg)) {
+				DEFER1(msg.Release());
+				tm = now();
+				++nread;
+			} else if (isfork) {
+				if (now() > tm + 1s) {
+					exit(0); // for forked quit after 1s.
+				}
+			}
+		}
+	};
+	auto State = [&](std::atomic<bool> *run) {
+		auto init = shm.get_free_memory();
+		printf("shm init : %ld\n", init);
+		uint64_t last_read = 0;
+		while (*run) {
+			auto cur = shm.get_free_memory();
+			auto cur_read = nread.load();
+			printf("shm used : %8ld/%ld, write: %8ld, read: %8ld, speed: %8ld\n", init - cur, init, nwrite.load(), cur_read, cur_read - last_read);
+			last_read = cur_read;
+			std::this_thread::sleep_for(1s);
+		}
+	};
+
+	int nwriters[] = {1, 10, 100, 1000};
+	int nreaders[] = {2};
+	const int64_t total_msg = 1000 * 1000;
+
+	auto Test = [&](auto &www, auto &rrr, bool isfork) {
+		for (auto nreader : nreaders) {
+			for (auto nwriter : nwriters) {
+				const uint64_t nmsg = total_msg / nwriter;
+				std::atomic<bool> run(true);
+				std::this_thread::sleep_for(10ms);
+				boost::timer::auto_cpu_timer timer;
+				for (int i = 0; i < nreader; ++i) {
+					rrr.Launch(Reader, i, &run, isfork);
+				}
+				for (int i = 0; i < nwriter; ++i) {
+					www.Launch(Writer, i, nmsg);
+				}
+				www.WaitAll();
+				printf("writer finished\n");
+				run.store(false);
+				rrr.WaitAll();
+				printf("Write %ld msg  R(%3d) W(%3d), : ", total_msg, nreader, nwriter);
+			}
+		}
+	};
+
+	std::atomic<bool> run(true);
+	ThreadManager state;
+	state.Launch(State, &run);
+	DEFER1(run.store(false););
+
+	// typedef ProcessManager Manager;
+	// typedef ThreadManager Manager;
+	// const bool isfork = IsSameType<Manager, ProcessManager>::value;
+
+	if (0) {
+		ThreadManager tw, tr;
+		printf("---------------- Testing thread io:  -------------------------------------------------------\n");
+		Test(tw, tr, false);
+	}
+
+	if (1) {
+		ProcessManager pw, pr;
+		printf("================ Testing process io: =======================================================\n");
+		Test(pw, pr, true);
+	}
 }
 
-// Request Reply Test
-BOOST_AUTO_TEST_CASE(RRTest)
+// Send Recv Test
+BOOST_AUTO_TEST_CASE(SRTest)
 {
-    const std::string shm_name("ShmReqRep");
-    ShmRemover auto_remove(shm_name);
-    const int qlen = 64;
-    const size_t msg_length = 1000;
-    std::string msg_content(msg_length, 'a');
-    msg_content[20] = '\0';
+	const int qlen = 64;
+	const size_t msg_length = 100;
+	std::string msg_content(msg_length, 'a');
+	msg_content[20] = '\0';
+	const std::string client_proc_id = "client_proc";
+	const std::string server_proc_id = "server_proc";
 
-    SharedMemory shm(shm_name, 1024*1024*50);
-    auto Avail = [&]() { return shm.get_free_memory(); };
-    auto init_avail = Avail();
-    ShmMsgQueue srv(shm, qlen);
-    ShmMsgQueue cli(shm, qlen);
+	SharedMemory &shm = TestShm();
+	// shm.Remove();
+	// return;
+	GlobalInit(shm);
 
-    MsgI request_rc;
-    request_rc.MakeRC(shm, MakeRequest(cli.Id(), msg_content.data(), msg_content.size()));
-    MsgI reply_rc;
-    reply_rc.MakeRC(shm, MakeReply(msg_content.data(), msg_content.size()));
+	auto Avail = [&]() { return shm.get_free_memory(); };
+	auto init_avail = Avail();
+	ShmSocket srv(shm, ShmMsgQueue::NewId(), qlen);
+	ShmSocket cli(shm, ShmMsgQueue::NewId(), qlen);
 
-    std::atomic<uint64_t> count(0);
+	int ncli = 1;
+	uint64_t nmsg = 1000 * 1000 * 1;
+	std::atomic<uint64_t> count(0);
 
-    std::atomic<ptime> last_time(Now() - seconds(1));
-    std::atomic<uint64_t> last_count(0);
+	std::atomic<int64_t> last_time(NowSec() - 1);
+	std::atomic<uint64_t> last_count(0);
 
-    auto Client = [&](int cli_id, int nmsg){
-        for (int i = 0; i < nmsg; ++i) {
-            auto Req = [&]() {
-                return cli.Send(srv.Id(), MakeRequest(cli.Id(), msg_content.data(), msg_content.size()), 100);
-            };
-            auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); };
+	auto PrintStatus = [&](int64_t cur) {
+		std::cout << "time: " << cur;
+		printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld\n",
+		       count.load(), count - last_count.exchange(count), init_avail - Avail());
+	};
+	auto onRecv = [&](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
+		++count;
+		auto cur = NowSec();
+		if (last_time.exchange(cur) < cur) {
+			PrintStatus(cur);
+		}
+	};
+	cli.Start(onRecv, 2);
 
-            if (!ReqRC()) {
-                printf("********** client send error.\n");
-                continue;
-            }
-            BHMsg msg;
-            if (!cli.Recv(msg, 1000)) {
-                printf("********** client recv error.\n");
-            } else {
-                ++count;
-                auto cur = Now();
-                if (last_time.exchange(cur) < cur) {
-                    std::cout << "time: " << cur;
-                    printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n",
-                           count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count());
-                }
+	auto Client = [&](int cli_id, int nmsg) {
+		for (int i = 0; i < nmsg; ++i) {
+			auto Req = [&]() {
+				MsgRequestTopic req_body;
+				req_body.set_topic("topic");
+				req_body.set_data(msg_content);
+				auto req_head(InitMsgHead(GetType(req_body), client_proc_id, cli.id()));
+				auto route = req_head.add_route();
+				route->set_mq_id(cli.id());
+				route->set_abs_addr(cli.AbsAddr());
+				return cli.Send({srv.id(), srv.AbsAddr()}, req_head, req_body);
+			};
 
-            }
-        }
-    };
+			Req();
+		}
+	};
+	auto onRequest = [&](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
+		if (head.type() == kMsgTypeRequestTopic) {
+			MQInfo src_mq = {head.route()[0].mq_id(), head.route()[0].abs_addr()};
 
-    std::atomic<bool> stop(false);
-    auto Server = [&](){
-        BHMsg req;
-        while (!stop) {
-            if (srv.Recv(req, 100) && req.type() == kMsgTypeRequest) {
-                auto &mqid = req.route()[0].mq_id();
-                MQId src_id;
-                memcpy(&src_id, mqid.data(), sizeof(src_id));
-                auto Reply = [&]() {
-                    return srv.Send(src_id, MakeReply(msg_content.data(), msg_content.size()), 100);
-                };
-                auto ReplyRC = [&](){ return srv.Send(src_id, reply_rc, 100); };
+			MsgRequestTopic reply_body;
+			reply_body.set_topic("topic");
+			reply_body.set_data(msg_content);
+			auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), head.msg_id()));
+			srv.Send(src_mq, reply_head, reply_body);
+		}
+	};
+	srv.Start(onRequest);
 
-                if (ReplyRC()) {
-                }
-            }
-        }
-    };
+	boost::timer::auto_cpu_timer timer;
+	DEFER1(printf("Request Reply Test:"););
 
-    boost::timer::auto_cpu_timer timer;
-    DEFER1(printf("Request Reply Test:"););
+	ThreadManager clients;
 
-    ThreadManager clients, servers;
-    for (int i = 0; i < qlen; ++i) { servers.Launch(Server); }
-    int ncli = 100*1;
-    uint64_t nmsg = 100*100*2;
-    printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
-    for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
-    clients.WaitAll();
-    printf("request ok: %ld\n", count.load());
-    stop = true;
-    servers.WaitAll();
-    BOOST_CHECK(request_rc.IsCounted());
-    BOOST_CHECK_EQUAL(request_rc.Count(), 1);
-    request_rc.Release(shm);
-    BOOST_CHECK(!request_rc.IsCounted());
-    // BOOST_CHECK_THROW(reply.Count(), int);
+	printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
+	for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
+	clients.WaitAll();
+	printf("request ok: %ld\n", count.load());
+	do {
+		std::this_thread::sleep_for(100ms);
+	} while (count.load() < ncli * nmsg);
+	PrintStatus(NowSec());
+	srv.Stop();
+	// BOOST_CHECK_THROW(reply.Count(), int);
 }

--
Gitblit v1.8.0