lichao
2021-03-29 6aa7e4c37a70709e7348bd16407c5983a563ed76
utest/utest.cpp
@@ -15,6 +15,8 @@
#include "bh_util.h"
#include <sys/types.h>
#include <sys/wait.h>
#include "pubsub.h"
#include "defs.h"
using namespace std::chrono_literals;
using namespace bhome_msg;
@@ -332,11 +334,10 @@
            } else {
                ++count;
                auto cur = Now();
                if (last_time.exchange(cur) != cur) {
                    std::cout << "time: " << Now();
                if (last_time.exchange(cur) < cur) {
                    std::cout << "time: " << cur;
                    printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n",
                           count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count());
                    last_time = cur;
                }
            }
@@ -382,6 +383,86 @@
    // BOOST_CHECK_THROW(reply.Count(), int);
}
BOOST_AUTO_TEST_CASE(PubSubTest)
{
    const std::string shm_name("ShmPubSub");
    ShmRemover auto_remove(shm_name);
    SharedMemory shm(shm_name, 1024*1024*50);
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
    BusManager bus(shm);
    bus.Start(1);
    std::this_thread::sleep_for(100ms);
    std::atomic<uint64_t> count(0);
    std::atomic<ptime> last_time(Now() - seconds(1));
    std::atomic<uint64_t> last_count(0);
    const uint64_t nmsg = 1000 * 100;
    const int timeout = 1000;
    auto Sub = [&](int id, const std::vector<std::string> &topics) {
        ShmMsgQueue client(shm, 8);
        client.Send(kBusQueueId, MakeSub(client.Id(), topics), timeout);
        for (int i = 0; i < nmsg * topics.size(); ++i) {
            BHMsg msg;
            if (client.Recv(msg, 1000)) {
                if (msg.type() != kMsgTypePublish) {
                    BOOST_CHECK(false);
                }
                DataPub pub;
                if (!pub.ParseFromString(msg.body())) {
                    BOOST_CHECK(false);
                }
                ++count;
                auto cur = Now();
                if (last_time.exchange(cur) < cur) {
                    std::cout << "time: " << cur;
                    printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
                           count.load(), count - last_count.exchange(count), init_avail - Avail());
                }
                // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
            } else {
                printf("sub %2d recv timeout\n", id);
            }
        }
    };
    auto Pub = [&](const std::string &topic) {
        ShmMsgQueue provider(shm, 0);
        for (int i = 0; i < nmsg; ++i) {
            std::string data = topic + std::to_string(i);
            bool r = provider.Send(kBusQueueId, MakePub(topic, data.data(), data.size()), timeout);
            if (!r) {
                printf("pub ret: %s\n", r ? "ok" : "fail");
            }
        }
    };
    ThreadManager threads;
    typedef std::vector<std::string> Topics;
    Topics topics;
    topics.push_back("000-xxx");
    topics.push_back("111-football");
    topics.push_back("222-sport");
    // topics.push_back("333-sport");
    // topics.push_back("444-sport");
    Topics part;
    for (int i = 0; i < topics.size(); ++i) {
        part.push_back(topics[i]);
        threads.Launch(Sub, i, part);
    }
    std::this_thread::sleep_for(100ms);
    for (auto &topic: topics) {
        threads.Launch(Pub, topic);
    }
    threads.Launch(Pub, "some_else");
    threads.WaitAll();
    bus.Stop();
}
inline int MyMin(int a, int b) {
    printf("MyMin\n");