From 6aa7e4c37a70709e7348bd16407c5983a563ed76 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期一, 29 三月 2021 21:11:34 +0800 Subject: [PATCH] test pub/sub msg; fix update last_time; --- utest/utest.cpp | 87 ++++++++++++++++++++++++++++++++++++++++++- 1 files changed, 84 insertions(+), 3 deletions(-) diff --git a/utest/utest.cpp b/utest/utest.cpp index 6e41116..a074f41 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -15,6 +15,8 @@ #include "bh_util.h" #include <sys/types.h> #include <sys/wait.h> +#include "pubsub.h" +#include "defs.h" using namespace std::chrono_literals; using namespace bhome_msg; @@ -332,11 +334,10 @@ } else { ++count; auto cur = Now(); - if (last_time.exchange(cur) != cur) { - std::cout << "time: " << Now(); + if (last_time.exchange(cur) < cur) { + std::cout << "time: " << cur; printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n", count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count()); - last_time = cur; } } @@ -382,6 +383,86 @@ // BOOST_CHECK_THROW(reply.Count(), int); } +BOOST_AUTO_TEST_CASE(PubSubTest) +{ + const std::string shm_name("ShmPubSub"); + ShmRemover auto_remove(shm_name); + SharedMemory shm(shm_name, 1024*1024*50); + auto Avail = [&]() { return shm.get_free_memory(); }; + auto init_avail = Avail(); + + BusManager bus(shm); + bus.Start(1); + std::this_thread::sleep_for(100ms); + + std::atomic<uint64_t> count(0); + std::atomic<ptime> last_time(Now() - seconds(1)); + std::atomic<uint64_t> last_count(0); + + const uint64_t nmsg = 1000 * 100; + + const int timeout = 1000; + auto Sub = [&](int id, const std::vector<std::string> &topics) { + ShmMsgQueue client(shm, 8); + client.Send(kBusQueueId, MakeSub(client.Id(), topics), timeout); + for (int i = 0; i < nmsg * topics.size(); ++i) { + BHMsg msg; + if (client.Recv(msg, 1000)) { + if (msg.type() != kMsgTypePublish) { + BOOST_CHECK(false); + } + DataPub pub; + if (!pub.ParseFromString(msg.body())) { + BOOST_CHECK(false); + } + ++count; + auto cur = Now(); + if (last_time.exchange(cur) < cur) { + std::cout << "time: " << cur; + printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", + count.load(), count - last_count.exchange(count), init_avail - Avail()); + } + // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str()); + + } else { + printf("sub %2d recv timeout\n", id); + } + + } + }; + auto Pub = [&](const std::string &topic) { + ShmMsgQueue provider(shm, 0); + for (int i = 0; i < nmsg; ++i) { + std::string data = topic + std::to_string(i); + bool r = provider.Send(kBusQueueId, MakePub(topic, data.data(), data.size()), timeout); + if (!r) { + printf("pub ret: %s\n", r ? "ok" : "fail"); + } + } + }; + ThreadManager threads; + typedef std::vector<std::string> Topics; + Topics topics; + topics.push_back("000-xxx"); + topics.push_back("111-football"); + topics.push_back("222-sport"); + // topics.push_back("333-sport"); + // topics.push_back("444-sport"); + Topics part; + for (int i = 0; i < topics.size(); ++i) { + part.push_back(topics[i]); + threads.Launch(Sub, i, part); + } + std::this_thread::sleep_for(100ms); + for (auto &topic: topics) { + threads.Launch(Pub, topic); + } + threads.Launch(Pub, "some_else"); + + threads.WaitAll(); + + bus.Stop(); +} inline int MyMin(int a, int b) { printf("MyMin\n"); -- Gitblit v1.8.0