lichao
2021-03-29 f51636c193d032723c47343e39ff8296db350200
utest/utest.cpp
@@ -178,10 +178,8 @@
        int ms = i * 100;
        printf("Timeout Test %4d: ", ms);
        boost::timer::auto_cpu_timer timer;
        MQId id;
        void *data;
        size_t size;
        bool r = q.Recv(id, data, size, ms);
        BHMsg msg;
        bool r = q.Recv(msg, ms);
        BOOST_CHECK(!r);
    }
}
@@ -192,10 +190,10 @@
    ShmRemover auto_remove(shm_name);
    SharedMemory shm(shm_name, 1024*1024);
    Msg m0(shm.Alloc(1000), shm.New<RefCount>());
    MsgI m0(shm.Alloc(1000), shm.New<RefCount>());
    BOOST_CHECK(m0.IsCounted());
    BOOST_CHECK_EQUAL(m0.Count(), 1);
    Msg m1 = m0;
    MsgI m1 = m0;
    BOOST_CHECK(m1.IsCounted());
    BOOST_CHECK_EQUAL(m1.AddRef(), 2);
    BOOST_CHECK_EQUAL(m0.AddRef(), 3);
@@ -205,25 +203,7 @@
    BOOST_CHECK(!m1.IsCounted());
}
BOOST_AUTO_TEST_CASE(MsgHeaderTest)
{
    MsgMetaV1 head;
    BOOST_CHECK_EQUAL(head.self_size_, sizeof(head));
    BOOST_CHECK_EQUAL(head.type_, kMsgTypeNormal);
    BOOST_CHECK_EQUAL(head.tag_, kMsgMetaTag);
    BOOST_CHECK_EQUAL(head.data_size_, 0);
    BOOST_CHECK(head.src_id_ == boost::uuids::nil_uuid());
    head.data_size_ = 100;
    head.src_id_ = boost::uuids::random_generator()();
    head.type_ = 123;
    char buf[100] = {0};
    head.Pack(buf);
    MsgMetaV1 result;
    result.Parse(buf);
    BOOST_CHECK_EQUAL(memcmp(&head, &result, sizeof(head)), 0);
}
BOOST_AUTO_TEST_CASE(SpeedTest)
{
@@ -238,10 +218,10 @@
        SharedMemory shm(shm_name, mem_size);
        ShmMsgQueue mq(shm, 64);
        std::string str(data_size, 'a');
        Msg msg;
        MsgI msg;
        DEFER1(msg.Release(shm););
        msg.Build(shm, mq.Id(), str.data(), str.size(), true);
        for (int i = 0; i < n; ++i) {
        msg.MakeRC(shm, MakeRequest(mq.Id(), str.data(), str.size()));
        for (uint64_t i = 0; i < n; ++i) {
            // mq.Send(id, str.data(), str.size(), timeout);
            mq.Send(id, msg, timeout);
        }
@@ -250,16 +230,9 @@
        SharedMemory shm(shm_name, mem_size);
        ShmMsgQueue mq(id, shm, 1000);
        while(*run) {
            Msg msg;
            BHMsg msg;
            if (mq.Recv(msg, timeout)) {
                MsgMetaV1 header;
                if (!header.Parse(msg.get())) {
                    BOOST_CHECK(false);
                }
                if (header.data_size_ != data_size) {
                    BOOST_CHECK(false);
                }
                msg.Release(shm);
                // ok
            } else if (isfork) {
                exit(0); // for forked quit after 1s.
            }
@@ -332,8 +305,10 @@
    ShmMsgQueue srv(shm, qlen);
    ShmMsgQueue cli(shm, qlen);
    Msg ref_counted_msg;
    ref_counted_msg.Build(shm, cli.Id(), msg_content.data(), msg_content.size(), true);
    MsgI request_rc;
    request_rc.MakeRC(shm, MakeRequest(cli.Id(), msg_content.data(), msg_content.size()));
    MsgI reply_rc;
    reply_rc.MakeRC(shm, MakeReply(msg_content.data(), msg_content.size()));
    std::atomic<uint64_t> count(0);
@@ -342,29 +317,25 @@
    auto Client = [&](int cli_id, int nmsg){
        for (int i = 0; i < nmsg; ++i) {
            auto Send = [&]() { return cli.Send(srv.Id(), msg_content.data(), msg_content.size(), 1000); };
            auto SendRefCounted = [&]() { return cli.Send(srv.Id(), ref_counted_msg, 1000); };
            auto Req = [&]() {
                return cli.Send(srv.Id(), MakeRequest(cli.Id(), msg_content.data(), msg_content.size()), 100);
            };
            auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); };
            if (!SendRefCounted()) {
            if (!ReqRC()) {
                printf("********** client send error.\n");
                continue;
            }
            MQId id;
            void *data = 0;
            size_t size = 0;
            if (!cli.Recv(id, data, size, 1000)) {
            BHMsg msg;
            if (!cli.Recv(msg, 1000)) {
                printf("********** client recv error.\n");
            } else {
                DEFER1(free(data));
                if(size != msg_length) {
                    BOOST_CHECK(false);
                }
                ++count;
                auto cur = Now();
                if (last_time.exchange(cur) != cur) {
                    std::cout << "time: " << Now();
                    printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n",
                           count.load(), count - last_count.exchange(count), init_avail - Avail(), ref_counted_msg.Count());
                           count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count());
                    last_time = cur;
                }
@@ -374,19 +345,18 @@
    std::atomic<bool> stop(false);
    auto Server = [&](){
        void *data = 0;
        size_t size = 0;
        MQId src_id;
        BHMsg req;
        while (!stop) {
            if (srv.Recv(src_id, data, size, 100)) {
                DEFER1(free(data));
                auto Send = [&](){ return srv.Send(src_id, data, size, 100); };
                auto SendRefCounted = [&](){ return srv.Send(src_id, ref_counted_msg, 100); };
            if (srv.Recv(req, 100) && req.type() == kMsgTypeRequest) {
                auto &mqid = req.route()[0].mq_id();
                MQId src_id;
                memcpy(&src_id, mqid.data(), sizeof(src_id));
                auto Reply = [&]() {
                    return srv.Send(src_id, MakeReply(msg_content.data(), msg_content.size()), 100);
                };
                auto ReplyRC = [&](){ return srv.Send(src_id, reply_rc, 100); };
                if (SendRefCounted()) {
                    if (size != msg_content.size()) {
                        BOOST_TEST(false, "server msg size error");
                    }
                if (ReplyRC()) {
                }
            }
        }
@@ -405,10 +375,10 @@
    printf("request ok: %ld\n", count.load());
    stop = true;
    servers.WaitAll();
    BOOST_CHECK(ref_counted_msg.IsCounted());
    BOOST_CHECK_EQUAL(ref_counted_msg.Count(), 1);
    ref_counted_msg.Release(shm);
    BOOST_CHECK(!ref_counted_msg.IsCounted());
    BOOST_CHECK(request_rc.IsCounted());
    BOOST_CHECK_EQUAL(request_rc.Count(), 1);
    request_rc.Release(shm);
    BOOST_CHECK(!request_rc.IsCounted());
    // BOOST_CHECK_THROW(reply.Count(), int);
}