From d17450bd7bc9fd5e98e8e2f00999caffe2e301a6 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 26 三月 2021 17:43:09 +0800
Subject: [PATCH] test thread/fork speed; reset msg after release.

---
 utest/utest.cpp |  217 +++++++++++++++++++++++++++++++++++++++++++-----------
 1 files changed, 172 insertions(+), 45 deletions(-)

diff --git a/utest/utest.cpp b/utest/utest.cpp
index 2462ecc..89d1ea3 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -13,9 +13,14 @@
 #include <boost/uuid/uuid_io.hpp>
 #include "shm_queue.h"
 #include "bh_util.h"
+#include <sys/types.h>
+#include <sys/wait.h>
 
 using namespace std::chrono_literals;
 using namespace bhome_shm;
+
+using namespace boost::posix_time;
+auto Now = []() { return second_clock::universal_time(); };
 
 struct s1000 { char a[1000]; };
 
@@ -41,7 +46,28 @@
     }
 };
 class ProcessManager {
-
+    std::vector<pid_t> procs_;
+public:
+    ~ProcessManager() { WaitAll(); }
+    template <class T, class ...P>
+    void Launch(T t, P...p) {
+        auto pid = fork();
+        if (pid == 0) {
+            // child
+            t(p...);
+            exit(0);
+        } else if (pid != -1) { // Ok
+            procs_.push_back(pid);
+        }
+    };
+    void WaitAll() {
+        for (auto &pid: procs_) {
+            int status = 0;
+            int options = WUNTRACED | WCONTINUED;
+            waitpid(pid, &status, options);
+        }
+        procs_.clear();
+    }
 };
 struct ShmRemover {
     std::string name_;
@@ -49,11 +75,15 @@
     ~ShmRemover() { SharedMemory::Remove(name_); }
 };
 
-BOOST_AUTO_TEST_CASE(ShmBasicTest)
+template <class A, class B> struct IsSameType { static const bool value = false; };
+template <class A> struct IsSameType<A,A> { static const bool value = true; };
+
+BOOST_AUTO_TEST_CASE(BasicTest)
 {
     const std::string shm_name("basic");
     ShmRemover auto_remove(shm_name);
     SharedMemory shm(shm_name, 1024*1024*10);
+    auto Avail = [&]() { return shm.get_free_memory(); };
 
     offset_ptr<const void> p;
     BOOST_CHECK(!p);
@@ -70,7 +100,6 @@
     BOOST_CHECK(p.get() == 0);
 
 
-    auto Avail = [&]() { return shm.get_free_memory(); };
     auto init_avail = Avail();
 
     auto BasicTest = [&](int tid, int nloop) {
@@ -101,15 +130,12 @@
             bool r = shared_memory_object::remove(shm_name.c_str());
             BOOST_CHECK(r);
         };
-        boost::timer::auto_cpu_timer timer;
-        for (int i = 0; i < nloop; ++i)
-        {
+        for (int i = 0; i < nloop; ++i) {
             Code(i + tid*nloop);
         }
     };
     
-    boost::timer::auto_cpu_timer timer;
-    DEFER1(printf("Basic Test:"););
+    // boost::timer::auto_cpu_timer timer;
     ThreadManager threads;
     int nthread = 1;
     int nloop = 1;
@@ -117,8 +143,24 @@
     {
         threads.Launch(BasicTest, i, nloop);
     }
-    printf("end\n");
     BOOST_CHECK_EQUAL(init_avail, Avail());
+}
+
+BOOST_AUTO_TEST_CASE(ForkTest)
+{
+    ProcessManager procs;
+    const int nproc = 10;
+
+    printf("Testing fork:\n");
+
+    auto child = [&](int id) {
+        std::this_thread::sleep_for(100ms *id);
+        printf("child id: %3d/%d ends\r", id, nproc);
+    };
+
+    for (int i = 0; i < nproc; ++i) {
+        procs.Launch(child, i+1);
+    }
 }
 
 BOOST_AUTO_TEST_CASE(TimedWaitTest)
@@ -127,9 +169,9 @@
     ShmRemover auto_remove(shm_name);
     SharedMemory shm(shm_name, 1024*1024);
     ShmMsgQueue q(shm, 64);
-    for (int i = 0; i < 5; ++i) {
+    for (int i = 0; i < 2; ++i) {
         int ms = i * 100;
-        printf("Timeout Test %d: ", ms);
+        printf("Timeout Test %4d: ", ms);
         boost::timer::auto_cpu_timer timer;
         MQId id;
         void *data;
@@ -155,7 +197,7 @@
     BOOST_CHECK_EQUAL(m0.Release(shm), 2);
     BOOST_CHECK_EQUAL(m0.Release(shm), 1);
     BOOST_CHECK_EQUAL(m1.Release(shm), 0);
-    BOOST_CHECK_THROW(m1.Count(), std::exception);
+    BOOST_CHECK(!m1.IsCounted());
 }
 
 BOOST_AUTO_TEST_CASE(MsgHeaderTest)
@@ -176,43 +218,129 @@
     MsgMetaV1 result;
     result.Parse(buf);
     BOOST_CHECK_EQUAL(memcmp(&head, &result, sizeof(head)), 0);
-
 }
-BOOST_AUTO_TEST_CASE(RequestReplyTest)
+
+BOOST_AUTO_TEST_CASE(SpeedTest)
+{
+    const std::string shm_name("ShmSpeed");
+    ShmRemover auto_remove(shm_name);
+    const int mem_size = 1024*1024*50;
+    MQId id = boost::uuids::random_generator()();
+    const int timeout = 100;
+    const uint32_t data_size = 4000;
+
+    auto Writer = [&](int writer_id, uint64_t n) {
+        SharedMemory shm(shm_name, mem_size);
+        ShmMsgQueue mq(shm, 64);
+        std::string str(data_size, 'a');
+        Msg msg;
+        DEFER1(msg.Release(shm););
+        msg.Build(shm, mq.Id(), str.data(), str.size(), true);
+        for (int i = 0; i < n; ++i) {
+            // mq.Send(id, str.data(), str.size(), timeout);
+            mq.Send(id, msg, timeout);
+        }
+    };
+    auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork){
+        SharedMemory shm(shm_name, mem_size);
+        ShmMsgQueue mq(id, shm, 1000);
+        while(*run) {
+            Msg msg;
+            if (mq.Recv(msg, timeout)) {
+                MsgMetaV1 header;
+                if (!header.Parse(msg.get())) {
+                    BOOST_CHECK(false);
+                }
+                if (header.data_size_ != data_size) {
+                    BOOST_CHECK(false);
+                }
+                msg.Release(shm);
+            } else if (isfork) {
+                exit(0); // for forked quit after 1s.
+            }
+        }
+    };
+    auto State = [&](std::atomic<bool> *run){
+        SharedMemory shm(shm_name, mem_size);
+        auto init = shm.get_free_memory();
+        printf("shm init : %ld\n", init);
+        while (*run) {
+            auto cur = shm.get_free_memory();
+            printf("shm used : %8ld/%ld\n", init - cur, init);
+            std::this_thread::sleep_for(1s);
+        }
+    };
+
+    int nwriters[] = {1,2,4};
+    int nreaders[] = {1,2};
+
+    auto Test = [&](auto &www, auto &rrr, bool isfork) {
+        for (auto nreader : nreaders) {
+            for (auto nwriter : nwriters) {
+                const uint64_t nmsg = 1000 * 1000 * 10 / nwriter;
+                const uint64_t total_msg = nmsg * nwriter;
+                std::atomic<bool> run(true);
+                std::this_thread::sleep_for(10ms);
+                boost::timer::auto_cpu_timer timer;
+                for (int i = 0; i < nreader; ++i) {
+                    rrr.Launch(Reader, i, &run, isfork);
+                }
+                for (int i = 0; i < nwriter; ++i) {
+                    www.Launch(Writer, i, nmsg);
+                }
+                www.WaitAll();
+                run.store(false);
+                rrr.WaitAll();
+                printf("%3d Write %ld msg  R(%3d) W(%3d), : ", getpid(), total_msg, nreader, nwriter);
+            }
+        }
+    };
+
+    std::atomic<bool> run(true);
+    ThreadManager state;
+    state.Launch(State, &run);
+    // typedef ProcessManager Manager;
+    // typedef ThreadManager Manager;
+    // const bool isfork = IsSameType<Manager, ProcessManager>::value;
+    ProcessManager pw, pr;
+    printf("================ Testing process io: =======================================================\n");
+    Test(pw, pr, true);
+    ThreadManager tw, tr;
+    printf("---------------- Testing thread io:  -------------------------------------------------------\n");
+    Test(tw, tr, false);
+    run.store(false);
+}
+
+// Request Reply Test
+BOOST_AUTO_TEST_CASE(RRTest)
 {
     const std::string shm_name("ShmReqRep");
     ShmRemover auto_remove(shm_name);
-    SharedMemory shm(shm_name, 1024*1024*50);
-    auto Avail = [&]() { return shm.get_free_memory(); };
-    auto init_avail = Avail();
-    // DEFER1(BOOST_CHECK_EQUAL(init_avail, Avail()); printf("Request Reply Test shm No Leak.\n"););
-
-    auto f0 = init_avail;
     const int qlen = 64;
-    ShmMsgQueue srv(shm, qlen);
-    ShmMsgQueue cli(shm, qlen);
-    auto f1= shm.get_free_memory();
-
     const size_t msg_length = 1000;
     std::string msg_content(msg_length, 'a');
     msg_content[20] = '\0';
-    Msg request;
-    request.Build(shm, cli.Id(), msg_content.data(), msg_content.size(), true);
-    Msg reply(request);
 
-    std::atomic<bool> stop(false);
+    SharedMemory shm(shm_name, 1024*1024*50);
+    auto Avail = [&]() { return shm.get_free_memory(); };
+    auto init_avail = Avail();
+    ShmMsgQueue srv(shm, qlen);
+    ShmMsgQueue cli(shm, qlen);
+
+    Msg ref_counted_msg;
+    ref_counted_msg.Build(shm, cli.Id(), msg_content.data(), msg_content.size(), true);
+
     std::atomic<uint64_t> count(0);
-    using namespace boost::posix_time;
-    auto Now = []() { return second_clock::universal_time(); };
+
     std::atomic<ptime> last_time(Now() - seconds(1));
     std::atomic<uint64_t> last_count(0);
 
-    auto Client = [&](int tid, int nmsg){
+    auto Client = [&](int cli_id, int nmsg){
         for (int i = 0; i < nmsg; ++i) {
             auto Send = [&]() { return cli.Send(srv.Id(), msg_content.data(), msg_content.size(), 1000); };
-            // auto SendRefCounted = [&]() { return cli.Send(srv.Id(), request, 1000); };
+            auto SendRefCounted = [&]() { return cli.Send(srv.Id(), ref_counted_msg, 1000); };
 
-            if (!Send()) {
+            if (!SendRefCounted()) {
                 printf("********** client send error.\n");
                 continue;
             }
@@ -230,8 +358,8 @@
                 auto cur = Now();
                 if (last_time.exchange(cur) != cur) {
                     std::cout << "time: " << Now();
-                    printf(", total msg:%10ld, speed:%8ld/s, used mem:%8ld, refcount:%d\n",
-                           count.load(), count - last_count.exchange(count), init_avail - Avail(), request.Count());
+                    printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n",
+                           count.load(), count - last_count.exchange(count), init_avail - Avail(), ref_counted_msg.Count());
                     last_time = cur;
                 }
 
@@ -239,6 +367,7 @@
         }
     };
 
+    std::atomic<bool> stop(false);
     auto Server = [&](){
         void *data = 0;
         size_t size = 0;
@@ -247,9 +376,9 @@
             if (srv.Recv(src_id, data, size, 100)) {
                 DEFER1(free(data));
                 auto Send = [&](){ return srv.Send(src_id, data, size, 100); };
-                // auto SendRefCounted = [&](){ return srv.Send(src_id, reply, 100); };
+                auto SendRefCounted = [&](){ return srv.Send(src_id, ref_counted_msg, 100); };
 
-                if (Send()) {
+                if (SendRefCounted()) {
                     if (size != msg_content.size()) {
                         BOOST_TEST(false, "server msg size error");
                     }
@@ -264,23 +393,21 @@
     ThreadManager clients, servers;
     for (int i = 0; i < qlen; ++i) { servers.Launch(Server); }
     int ncli = 100*1;
-    uint64_t nmsg = 100*100*10;
+    uint64_t nmsg = 100*100*2;
     printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
     for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
     clients.WaitAll();
     printf("request ok: %ld\n", count.load());
     stop = true;
     servers.WaitAll();
-    BOOST_CHECK(request.IsCounted());
-    BOOST_CHECK_EQUAL(request.Count(), 1);
-    BOOST_CHECK(reply.IsCounted());
-    BOOST_CHECK_EQUAL(reply.Count(), 1);
-    request.Release(shm);
-    BOOST_CHECK_THROW(request.Count(), std::exception);
-    BOOST_CHECK_THROW(reply.Count(), std::exception);
+    BOOST_CHECK(ref_counted_msg.IsCounted());
+    BOOST_CHECK_EQUAL(ref_counted_msg.Count(), 1);
+    ref_counted_msg.Release(shm);
+    BOOST_CHECK(!ref_counted_msg.IsCounted());
     // BOOST_CHECK_THROW(reply.Count(), int);
 }
 
+
 inline int MyMin(int a, int b) {
     printf("MyMin\n");
     return a < b ? a : b;

--
Gitblit v1.8.0