From 5d8aa35858eea622e0e8e4a1f111fd408c483a31 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 25 五月 2021 17:10:16 +0800 Subject: [PATCH] add some tcp code. --- utest/speed_test.cpp | 49 ++++++++++++++++++------------------------------- 1 files changed, 18 insertions(+), 31 deletions(-) diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp index 4dea623..f33f0db 100644 --- a/utest/speed_test.cpp +++ b/utest/speed_test.cpp @@ -24,7 +24,7 @@ { SharedMemory &shm = TestShm(); GlobalInit(shm); - MQId server_id = ShmMsgQueue::NewId(); + MQId server_id = NewSession(); ShmMsgQueue server(server_id, shm, 1000); const int timeout = 1000; @@ -35,10 +35,10 @@ std::string str(data_size, 'a'); auto Writer = [&](int writer_id, uint64_t n) { - MQId cli_id = ShmMsgQueue::NewId(); + MQId cli_id = NewSession(); ShmMsgQueue mq(cli_id, shm, 64); - MsgI msg; + MsgI msg(shm); MsgRequestTopic body; body.set_topic("topic"); body.set_data(str); @@ -58,7 +58,7 @@ auto now = []() { return steady_clock::now(); }; auto tm = now(); while (*run) { - MsgI msg; + MsgI msg(shm); BHMsgHead head; if (mq.TryRecv(msg)) { DEFER1(msg.Release()); @@ -149,8 +149,8 @@ auto Avail = [&]() { return shm.get_free_memory(); }; auto init_avail = Avail(); - ShmSocket srv(shm, ShmMsgQueue::NewId(), qlen); - ShmSocket cli(shm, ShmMsgQueue::NewId(), qlen); + ShmSocket srv(shm, NewSession(), qlen); + ShmSocket cli(shm, NewSession(), qlen); int ncli = 1; uint64_t nmsg = 1000 * 1000 * 1; @@ -189,36 +189,24 @@ Req(); } }; + auto onRequest = [&](ShmSocket &sock, MsgI &msg, BHMsgHead &head) { + if (head.type() == kMsgTypeRequestTopic) { + MQInfo src_mq = {head.route()[0].mq_id(), head.route()[0].abs_addr()}; - std::atomic<bool> stop(false); - auto Server = [&]() { - MsgI req; - BHMsgHead req_head; - - while (!stop) { - if (srv.SyncRecv(req, req_head, 10)) { - DEFER1(req.Release()); - - if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) { - MQInfo src_mq = {req_head.route()[0].mq_id(), req_head.route()[0].abs_addr()}; - auto Reply = [&]() { - MsgRequestTopic reply_body; - reply_body.set_topic("topic"); - reply_body.set_data(msg_content); - auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), req_head.msg_id())); - return srv.Send(src_mq, reply_head, reply_body); - }; - Reply(); - } - } + MsgRequestTopic reply_body; + reply_body.set_topic("topic"); + reply_body.set_data(msg_content); + auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), head.msg_id())); + srv.Send(src_mq, reply_head, reply_body); } }; + srv.Start(onRequest); boost::timer::auto_cpu_timer timer; DEFER1(printf("Request Reply Test:");); - ThreadManager clients, servers; - for (int i = 0; i < 2; ++i) { servers.Launch(Server); } + ThreadManager clients; + printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg); for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); } clients.WaitAll(); @@ -227,7 +215,6 @@ std::this_thread::sleep_for(100ms); } while (count.load() < ncli * nmsg); PrintStatus(NowSec()); - stop = true; - servers.WaitAll(); + srv.Stop(); // BOOST_CHECK_THROW(reply.Count(), int); } -- Gitblit v1.8.0