lichao
2021-03-25 d3e7f93e69cb24c766292d8780e745caf24d42a8
utest/utest.cpp
@@ -54,6 +54,21 @@
    ShmRemover auto_remove(shm_name);
    SharedMemory shm(shm_name, 1024*1024*10);
    offset_ptr<const void> p;
    BOOST_CHECK(!p);
    BOOST_CHECK(p.get() == 0);
    p = 0;
    BOOST_CHECK(!p);
    BOOST_CHECK(p.get() == 0);
    const char *str = "basic";
    p = str;
    BOOST_CHECK(p);
    BOOST_CHECK(p.get() == str);
    p = 0;
    BOOST_CHECK(!p);
    BOOST_CHECK(p.get() == 0);
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
@@ -77,8 +92,8 @@
            {
                auto old = Avail();
                void *p = shm.allocate(1024);
                shm.deallocate(p);
                void *p = shm.Alloc(1024);
                shm.Dealloc(p);
                BOOST_CHECK_EQUAL(old, Avail());
            }
@@ -129,22 +144,16 @@
    ShmRemover auto_remove(shm_name);
    SharedMemory shm(shm_name, 1024*1024);
    Msg m0(shm.allocate(1000), shm.New<RefCount>());
    BOOST_CHECK_EQUAL(m0.AddRef(), 1);
    Msg m0(shm.Alloc(1000), shm.New<RefCount>());
    BOOST_CHECK(m0.IsCounted());
    BOOST_CHECK_EQUAL(m0.Count(), 1);
    Msg m1 = m0;
    BOOST_CHECK(m1.IsCounted());
    BOOST_CHECK_EQUAL(m1.AddRef(), 2);
    BOOST_CHECK_EQUAL(m0.AddRef(), 3);
    BOOST_CHECK_EQUAL(m0.RemoveRef(), 2);
    BOOST_CHECK_EQUAL(m0.RemoveRef(), 1);
    BOOST_CHECK_EQUAL(m1.RemoveRef(), 0);
    {
        Msg::CountGuard guard(m0);
        BOOST_CHECK_EQUAL(m1.AddRef(), 2);
        {
            Msg::CountGuard guard(m0);
            BOOST_CHECK_EQUAL(m1.RemoveRef(), 2);
        }
    }
    BOOST_CHECK_EQUAL(m1.Count(), 0);
}
@@ -155,14 +164,11 @@
    BOOST_CHECK_EQUAL(head.type_, kMsgTypeNormal);
    BOOST_CHECK_EQUAL(head.tag_, kMsgMetaTag);
    BOOST_CHECK_EQUAL(head.data_size_, 0);
    BOOST_CHECK_EQUAL(head.src_id_[5], 0);
    BOOST_CHECK(head.src_id_ == boost::uuids::nil_uuid());
    head.data_size_ = 100;
    auto rand_id = boost::uuids::random_generator()();
    memcpy(head.src_id_, &rand_id, sizeof(rand_id));
    head.src_id_ = boost::uuids::random_generator()();
    head.type_ = 123;
    BOOST_CHECK_EQUAL(sizeof(head.src_id_), sizeof(rand_id));
    char buf[100] = {0};
    head.Pack(buf);
@@ -189,16 +195,23 @@
    const size_t msg_length = 1000;
    std::string msg_content(msg_length, 'a');
    msg_content[20] = '\0';
    Msg request;
    request.Build(shm, cli.Id(), msg_content.data(), msg_content.size(), true);
    Msg reply(request);
    std::atomic<bool> stop(false);
    std::atomic<uint64_t> count(0);
    using namespace boost::posix_time;
    auto Now = []() { return second_clock::universal_time(); };
    std::atomic<ptime> last_time(Now());
    std::atomic<ptime> last_time(Now() - seconds(1));
    std::atomic<uint64_t> last_count(0);
    auto Client = [&](int tid, int nmsg){
        for (int i = 0; i < nmsg; ++i) {
            if (!cli.Send(srv.Id(), msg_content.data(), msg_content.size(), 1000)) {
            auto Send = [&]() { return cli.Send(srv.Id(), msg_content.data(), msg_content.size(), 1000); };
            auto SendRefCounted = [&]() { return cli.Send(srv.Id(), request, 1000); };
            if (!Send()) {
                printf("********** client send error.\n");
                continue;
            }
@@ -216,7 +229,8 @@
                auto cur = Now();
                if (last_time.exchange(cur) != cur) {
                    std::cout << "time: " << Now();
                    printf(", total msg:%10ld, speed:%8ld/s, used mem:%8ld\n",  count.load(), count - last_count.exchange(count), init_avail - Avail());
                    printf(", total msg:%10ld, speed:%8ld/s, used mem:%8ld, refcount:%d\n",
                           count.load(), count - last_count.exchange(count), init_avail - Avail(), request.Count());
                    last_time = cur;
                }
@@ -231,7 +245,10 @@
        while (!stop) {
            if (srv.Recv(src_id, data, size, 100)) {
                DEFER1(free(data));
                if (srv.Send(src_id, data, size, 100)) {
                auto Send = [&](){ return srv.Send(src_id, data, size, 100); };
                auto SendRefCounted = [&](){ return srv.Send(src_id, reply, 100); };
                if (SendRefCounted()) {
                    if (size != msg_content.size()) {
                        BOOST_TEST(false, "server msg size error");
                    }
@@ -245,14 +262,22 @@
    ThreadManager clients, servers;
    for (int i = 0; i < qlen; ++i) { servers.Launch(Server); }
    int ncli = 100;
    uint64_t nmsg = 1000*10;
    int ncli = 100*1;
    uint64_t nmsg = 100*100;
    printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
    for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
    clients.WaitAll();
    printf("request ok: %ld\n", count.load());
    stop = true;
    servers.WaitAll();
    BOOST_CHECK(request.IsCounted());
    BOOST_CHECK_EQUAL(request.Count(), 1);
    BOOST_CHECK(reply.IsCounted());
    BOOST_CHECK_EQUAL(reply.Count(), 1);
    if (request.RemoveRef() == 0) {
        BOOST_CHECK_EQUAL(reply.Count(), 0);
        request.FreeFrom(shm);
    }
}
int test_main(int argc, char *argv[])