From f51636c193d032723c47343e39ff8296db350200 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期一, 29 三月 2021 18:04:00 +0800 Subject: [PATCH] change msg to use protobuf, add more msg type. --- utest/utest.cpp | 100 +++++++++++++++++-------------------------------- 1 files changed, 35 insertions(+), 65 deletions(-) diff --git a/utest/utest.cpp b/utest/utest.cpp index 23af0b8..6e41116 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -178,10 +178,8 @@ int ms = i * 100; printf("Timeout Test %4d: ", ms); boost::timer::auto_cpu_timer timer; - MQId id; - void *data; - size_t size; - bool r = q.Recv(id, data, size, ms); + BHMsg msg; + bool r = q.Recv(msg, ms); BOOST_CHECK(!r); } } @@ -192,10 +190,10 @@ ShmRemover auto_remove(shm_name); SharedMemory shm(shm_name, 1024*1024); - Msg m0(shm.Alloc(1000), shm.New<RefCount>()); + MsgI m0(shm.Alloc(1000), shm.New<RefCount>()); BOOST_CHECK(m0.IsCounted()); BOOST_CHECK_EQUAL(m0.Count(), 1); - Msg m1 = m0; + MsgI m1 = m0; BOOST_CHECK(m1.IsCounted()); BOOST_CHECK_EQUAL(m1.AddRef(), 2); BOOST_CHECK_EQUAL(m0.AddRef(), 3); @@ -205,25 +203,7 @@ BOOST_CHECK(!m1.IsCounted()); } -BOOST_AUTO_TEST_CASE(MsgHeaderTest) -{ - MsgMetaV1 head; - BOOST_CHECK_EQUAL(head.self_size_, sizeof(head)); - BOOST_CHECK_EQUAL(head.type_, kMsgTypeNormal); - BOOST_CHECK_EQUAL(head.tag_, kMsgMetaTag); - BOOST_CHECK_EQUAL(head.data_size_, 0); - BOOST_CHECK(head.src_id_ == boost::uuids::nil_uuid()); - head.data_size_ = 100; - head.src_id_ = boost::uuids::random_generator()(); - head.type_ = 123; - - char buf[100] = {0}; - head.Pack(buf); - MsgMetaV1 result; - result.Parse(buf); - BOOST_CHECK_EQUAL(memcmp(&head, &result, sizeof(head)), 0); -} BOOST_AUTO_TEST_CASE(SpeedTest) { @@ -238,10 +218,10 @@ SharedMemory shm(shm_name, mem_size); ShmMsgQueue mq(shm, 64); std::string str(data_size, 'a'); - Msg msg; + MsgI msg; DEFER1(msg.Release(shm);); - msg.Build(shm, mq.Id(), str.data(), str.size(), true); - for (int i = 0; i < n; ++i) { + msg.MakeRC(shm, MakeRequest(mq.Id(), str.data(), str.size())); + for (uint64_t i = 0; i < n; ++i) { // mq.Send(id, str.data(), str.size(), timeout); mq.Send(id, msg, timeout); } @@ -250,16 +230,9 @@ SharedMemory shm(shm_name, mem_size); ShmMsgQueue mq(id, shm, 1000); while(*run) { - Msg msg; + BHMsg msg; if (mq.Recv(msg, timeout)) { - MsgMetaV1 header; - if (!header.Parse(msg.get())) { - BOOST_CHECK(false); - } - if (header.data_size_ != data_size) { - BOOST_CHECK(false); - } - msg.Release(shm); + // ok } else if (isfork) { exit(0); // for forked quit after 1s. } @@ -332,8 +305,10 @@ ShmMsgQueue srv(shm, qlen); ShmMsgQueue cli(shm, qlen); - Msg ref_counted_msg; - ref_counted_msg.Build(shm, cli.Id(), msg_content.data(), msg_content.size(), true); + MsgI request_rc; + request_rc.MakeRC(shm, MakeRequest(cli.Id(), msg_content.data(), msg_content.size())); + MsgI reply_rc; + reply_rc.MakeRC(shm, MakeReply(msg_content.data(), msg_content.size())); std::atomic<uint64_t> count(0); @@ -342,29 +317,25 @@ auto Client = [&](int cli_id, int nmsg){ for (int i = 0; i < nmsg; ++i) { - auto Send = [&]() { return cli.Send(srv.Id(), msg_content.data(), msg_content.size(), 1000); }; - auto SendRefCounted = [&]() { return cli.Send(srv.Id(), ref_counted_msg, 1000); }; + auto Req = [&]() { + return cli.Send(srv.Id(), MakeRequest(cli.Id(), msg_content.data(), msg_content.size()), 100); + }; + auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); }; - if (!SendRefCounted()) { + if (!ReqRC()) { printf("********** client send error.\n"); continue; } - MQId id; - void *data = 0; - size_t size = 0; - if (!cli.Recv(id, data, size, 1000)) { + BHMsg msg; + if (!cli.Recv(msg, 1000)) { printf("********** client recv error.\n"); } else { - DEFER1(free(data)); - if(size != msg_length) { - BOOST_CHECK(false); - } ++count; auto cur = Now(); if (last_time.exchange(cur) != cur) { std::cout << "time: " << Now(); printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n", - count.load(), count - last_count.exchange(count), init_avail - Avail(), ref_counted_msg.Count()); + count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count()); last_time = cur; } @@ -374,19 +345,18 @@ std::atomic<bool> stop(false); auto Server = [&](){ - void *data = 0; - size_t size = 0; - MQId src_id; + BHMsg req; while (!stop) { - if (srv.Recv(src_id, data, size, 100)) { - DEFER1(free(data)); - auto Send = [&](){ return srv.Send(src_id, data, size, 100); }; - auto SendRefCounted = [&](){ return srv.Send(src_id, ref_counted_msg, 100); }; + if (srv.Recv(req, 100) && req.type() == kMsgTypeRequest) { + auto &mqid = req.route()[0].mq_id(); + MQId src_id; + memcpy(&src_id, mqid.data(), sizeof(src_id)); + auto Reply = [&]() { + return srv.Send(src_id, MakeReply(msg_content.data(), msg_content.size()), 100); + }; + auto ReplyRC = [&](){ return srv.Send(src_id, reply_rc, 100); }; - if (SendRefCounted()) { - if (size != msg_content.size()) { - BOOST_TEST(false, "server msg size error"); - } + if (ReplyRC()) { } } } @@ -405,10 +375,10 @@ printf("request ok: %ld\n", count.load()); stop = true; servers.WaitAll(); - BOOST_CHECK(ref_counted_msg.IsCounted()); - BOOST_CHECK_EQUAL(ref_counted_msg.Count(), 1); - ref_counted_msg.Release(shm); - BOOST_CHECK(!ref_counted_msg.IsCounted()); + BOOST_CHECK(request_rc.IsCounted()); + BOOST_CHECK_EQUAL(request_rc.Count(), 1); + request_rc.Release(shm); + BOOST_CHECK(!request_rc.IsCounted()); // BOOST_CHECK_THROW(reply.Count(), int); } -- Gitblit v1.8.0