From d17450bd7bc9fd5e98e8e2f00999caffe2e301a6 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 26 三月 2021 17:43:09 +0800 Subject: [PATCH] test thread/fork speed; reset msg after release. --- utest/utest.cpp | 217 +++++++++++++++++++++++++++++++++++++++++++----------- 1 files changed, 172 insertions(+), 45 deletions(-) diff --git a/utest/utest.cpp b/utest/utest.cpp index 2462ecc..89d1ea3 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -13,9 +13,14 @@ #include <boost/uuid/uuid_io.hpp> #include "shm_queue.h" #include "bh_util.h" +#include <sys/types.h> +#include <sys/wait.h> using namespace std::chrono_literals; using namespace bhome_shm; + +using namespace boost::posix_time; +auto Now = []() { return second_clock::universal_time(); }; struct s1000 { char a[1000]; }; @@ -41,7 +46,28 @@ } }; class ProcessManager { - + std::vector<pid_t> procs_; +public: + ~ProcessManager() { WaitAll(); } + template <class T, class ...P> + void Launch(T t, P...p) { + auto pid = fork(); + if (pid == 0) { + // child + t(p...); + exit(0); + } else if (pid != -1) { // Ok + procs_.push_back(pid); + } + }; + void WaitAll() { + for (auto &pid: procs_) { + int status = 0; + int options = WUNTRACED | WCONTINUED; + waitpid(pid, &status, options); + } + procs_.clear(); + } }; struct ShmRemover { std::string name_; @@ -49,11 +75,15 @@ ~ShmRemover() { SharedMemory::Remove(name_); } }; -BOOST_AUTO_TEST_CASE(ShmBasicTest) +template <class A, class B> struct IsSameType { static const bool value = false; }; +template <class A> struct IsSameType<A,A> { static const bool value = true; }; + +BOOST_AUTO_TEST_CASE(BasicTest) { const std::string shm_name("basic"); ShmRemover auto_remove(shm_name); SharedMemory shm(shm_name, 1024*1024*10); + auto Avail = [&]() { return shm.get_free_memory(); }; offset_ptr<const void> p; BOOST_CHECK(!p); @@ -70,7 +100,6 @@ BOOST_CHECK(p.get() == 0); - auto Avail = [&]() { return shm.get_free_memory(); }; auto init_avail = Avail(); auto BasicTest = [&](int tid, int nloop) { @@ -101,15 +130,12 @@ bool r = shared_memory_object::remove(shm_name.c_str()); BOOST_CHECK(r); }; - boost::timer::auto_cpu_timer timer; - for (int i = 0; i < nloop; ++i) - { + for (int i = 0; i < nloop; ++i) { Code(i + tid*nloop); } }; - boost::timer::auto_cpu_timer timer; - DEFER1(printf("Basic Test:");); + // boost::timer::auto_cpu_timer timer; ThreadManager threads; int nthread = 1; int nloop = 1; @@ -117,8 +143,24 @@ { threads.Launch(BasicTest, i, nloop); } - printf("end\n"); BOOST_CHECK_EQUAL(init_avail, Avail()); +} + +BOOST_AUTO_TEST_CASE(ForkTest) +{ + ProcessManager procs; + const int nproc = 10; + + printf("Testing fork:\n"); + + auto child = [&](int id) { + std::this_thread::sleep_for(100ms *id); + printf("child id: %3d/%d ends\r", id, nproc); + }; + + for (int i = 0; i < nproc; ++i) { + procs.Launch(child, i+1); + } } BOOST_AUTO_TEST_CASE(TimedWaitTest) @@ -127,9 +169,9 @@ ShmRemover auto_remove(shm_name); SharedMemory shm(shm_name, 1024*1024); ShmMsgQueue q(shm, 64); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2; ++i) { int ms = i * 100; - printf("Timeout Test %d: ", ms); + printf("Timeout Test %4d: ", ms); boost::timer::auto_cpu_timer timer; MQId id; void *data; @@ -155,7 +197,7 @@ BOOST_CHECK_EQUAL(m0.Release(shm), 2); BOOST_CHECK_EQUAL(m0.Release(shm), 1); BOOST_CHECK_EQUAL(m1.Release(shm), 0); - BOOST_CHECK_THROW(m1.Count(), std::exception); + BOOST_CHECK(!m1.IsCounted()); } BOOST_AUTO_TEST_CASE(MsgHeaderTest) @@ -176,43 +218,129 @@ MsgMetaV1 result; result.Parse(buf); BOOST_CHECK_EQUAL(memcmp(&head, &result, sizeof(head)), 0); - } -BOOST_AUTO_TEST_CASE(RequestReplyTest) + +BOOST_AUTO_TEST_CASE(SpeedTest) +{ + const std::string shm_name("ShmSpeed"); + ShmRemover auto_remove(shm_name); + const int mem_size = 1024*1024*50; + MQId id = boost::uuids::random_generator()(); + const int timeout = 100; + const uint32_t data_size = 4000; + + auto Writer = [&](int writer_id, uint64_t n) { + SharedMemory shm(shm_name, mem_size); + ShmMsgQueue mq(shm, 64); + std::string str(data_size, 'a'); + Msg msg; + DEFER1(msg.Release(shm);); + msg.Build(shm, mq.Id(), str.data(), str.size(), true); + for (int i = 0; i < n; ++i) { + // mq.Send(id, str.data(), str.size(), timeout); + mq.Send(id, msg, timeout); + } + }; + auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork){ + SharedMemory shm(shm_name, mem_size); + ShmMsgQueue mq(id, shm, 1000); + while(*run) { + Msg msg; + if (mq.Recv(msg, timeout)) { + MsgMetaV1 header; + if (!header.Parse(msg.get())) { + BOOST_CHECK(false); + } + if (header.data_size_ != data_size) { + BOOST_CHECK(false); + } + msg.Release(shm); + } else if (isfork) { + exit(0); // for forked quit after 1s. + } + } + }; + auto State = [&](std::atomic<bool> *run){ + SharedMemory shm(shm_name, mem_size); + auto init = shm.get_free_memory(); + printf("shm init : %ld\n", init); + while (*run) { + auto cur = shm.get_free_memory(); + printf("shm used : %8ld/%ld\n", init - cur, init); + std::this_thread::sleep_for(1s); + } + }; + + int nwriters[] = {1,2,4}; + int nreaders[] = {1,2}; + + auto Test = [&](auto &www, auto &rrr, bool isfork) { + for (auto nreader : nreaders) { + for (auto nwriter : nwriters) { + const uint64_t nmsg = 1000 * 1000 * 10 / nwriter; + const uint64_t total_msg = nmsg * nwriter; + std::atomic<bool> run(true); + std::this_thread::sleep_for(10ms); + boost::timer::auto_cpu_timer timer; + for (int i = 0; i < nreader; ++i) { + rrr.Launch(Reader, i, &run, isfork); + } + for (int i = 0; i < nwriter; ++i) { + www.Launch(Writer, i, nmsg); + } + www.WaitAll(); + run.store(false); + rrr.WaitAll(); + printf("%3d Write %ld msg R(%3d) W(%3d), : ", getpid(), total_msg, nreader, nwriter); + } + } + }; + + std::atomic<bool> run(true); + ThreadManager state; + state.Launch(State, &run); + // typedef ProcessManager Manager; + // typedef ThreadManager Manager; + // const bool isfork = IsSameType<Manager, ProcessManager>::value; + ProcessManager pw, pr; + printf("================ Testing process io: =======================================================\n"); + Test(pw, pr, true); + ThreadManager tw, tr; + printf("---------------- Testing thread io: -------------------------------------------------------\n"); + Test(tw, tr, false); + run.store(false); +} + +// Request Reply Test +BOOST_AUTO_TEST_CASE(RRTest) { const std::string shm_name("ShmReqRep"); ShmRemover auto_remove(shm_name); - SharedMemory shm(shm_name, 1024*1024*50); - auto Avail = [&]() { return shm.get_free_memory(); }; - auto init_avail = Avail(); - // DEFER1(BOOST_CHECK_EQUAL(init_avail, Avail()); printf("Request Reply Test shm No Leak.\n");); - - auto f0 = init_avail; const int qlen = 64; - ShmMsgQueue srv(shm, qlen); - ShmMsgQueue cli(shm, qlen); - auto f1= shm.get_free_memory(); - const size_t msg_length = 1000; std::string msg_content(msg_length, 'a'); msg_content[20] = '\0'; - Msg request; - request.Build(shm, cli.Id(), msg_content.data(), msg_content.size(), true); - Msg reply(request); - std::atomic<bool> stop(false); + SharedMemory shm(shm_name, 1024*1024*50); + auto Avail = [&]() { return shm.get_free_memory(); }; + auto init_avail = Avail(); + ShmMsgQueue srv(shm, qlen); + ShmMsgQueue cli(shm, qlen); + + Msg ref_counted_msg; + ref_counted_msg.Build(shm, cli.Id(), msg_content.data(), msg_content.size(), true); + std::atomic<uint64_t> count(0); - using namespace boost::posix_time; - auto Now = []() { return second_clock::universal_time(); }; + std::atomic<ptime> last_time(Now() - seconds(1)); std::atomic<uint64_t> last_count(0); - auto Client = [&](int tid, int nmsg){ + auto Client = [&](int cli_id, int nmsg){ for (int i = 0; i < nmsg; ++i) { auto Send = [&]() { return cli.Send(srv.Id(), msg_content.data(), msg_content.size(), 1000); }; - // auto SendRefCounted = [&]() { return cli.Send(srv.Id(), request, 1000); }; + auto SendRefCounted = [&]() { return cli.Send(srv.Id(), ref_counted_msg, 1000); }; - if (!Send()) { + if (!SendRefCounted()) { printf("********** client send error.\n"); continue; } @@ -230,8 +358,8 @@ auto cur = Now(); if (last_time.exchange(cur) != cur) { std::cout << "time: " << Now(); - printf(", total msg:%10ld, speed:%8ld/s, used mem:%8ld, refcount:%d\n", - count.load(), count - last_count.exchange(count), init_avail - Avail(), request.Count()); + printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n", + count.load(), count - last_count.exchange(count), init_avail - Avail(), ref_counted_msg.Count()); last_time = cur; } @@ -239,6 +367,7 @@ } }; + std::atomic<bool> stop(false); auto Server = [&](){ void *data = 0; size_t size = 0; @@ -247,9 +376,9 @@ if (srv.Recv(src_id, data, size, 100)) { DEFER1(free(data)); auto Send = [&](){ return srv.Send(src_id, data, size, 100); }; - // auto SendRefCounted = [&](){ return srv.Send(src_id, reply, 100); }; + auto SendRefCounted = [&](){ return srv.Send(src_id, ref_counted_msg, 100); }; - if (Send()) { + if (SendRefCounted()) { if (size != msg_content.size()) { BOOST_TEST(false, "server msg size error"); } @@ -264,23 +393,21 @@ ThreadManager clients, servers; for (int i = 0; i < qlen; ++i) { servers.Launch(Server); } int ncli = 100*1; - uint64_t nmsg = 100*100*10; + uint64_t nmsg = 100*100*2; printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg); for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); } clients.WaitAll(); printf("request ok: %ld\n", count.load()); stop = true; servers.WaitAll(); - BOOST_CHECK(request.IsCounted()); - BOOST_CHECK_EQUAL(request.Count(), 1); - BOOST_CHECK(reply.IsCounted()); - BOOST_CHECK_EQUAL(reply.Count(), 1); - request.Release(shm); - BOOST_CHECK_THROW(request.Count(), std::exception); - BOOST_CHECK_THROW(reply.Count(), std::exception); + BOOST_CHECK(ref_counted_msg.IsCounted()); + BOOST_CHECK_EQUAL(ref_counted_msg.Count(), 1); + ref_counted_msg.Release(shm); + BOOST_CHECK(!ref_counted_msg.IsCounted()); // BOOST_CHECK_THROW(reply.Count(), int); } + inline int MyMin(int a, int b) { printf("MyMin\n"); return a < b ? a : b; -- Gitblit v1.8.0