From 3f8ae6cf4f03be83f16846af32d73dd89b937c40 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 30 三月 2021 15:45:54 +0800 Subject: [PATCH] add dot separated topic partial match; refactor. --- utest/utest.cpp | 82 +++++++++++++++++++++++++++------------- 1 files changed, 55 insertions(+), 27 deletions(-) diff --git a/utest/utest.cpp b/utest/utest.cpp index b5acc3b..f7571c8 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -13,24 +13,46 @@ template <class A> struct IsSameType<A,A> { static const bool value = true; }; -BOOST_AUTO_TEST_CASE(ShmTest) +BOOST_AUTO_TEST_CASE(Temp) { - const int size = 1024*1024; - const std::string shm_name("ShmRemoveTest"); - SharedMemory m0(shm_name, size); - int *p = m0.find_or_construct<int>("abc")(100); - if (p) { - printf("*p = %d\n", *p); - ++*p; - } - { - SharedMemory m1(shm_name, size); - m1.Remove(); - // m1.Remove(); - } - if (p) { - printf("again *p = %d\n", *p); - // ++*p; + std::string topics[] = { + "", + ".", + "a", + "sp", + "sport", + "sport.", + "sport.a", + "sport.a.b.c", + "sport.ab.c", + "sport.basketball", + "sport.football", + }; + const char sep = '.'; + auto Adjust = [&](const std::string &user_topic) { + if (user_topic.empty() || user_topic.back() == sep) { + return user_topic; + } else { + return user_topic + sep; + } + }; + + for (auto &t : topics) { + const std::string &a = Adjust(t); + printf("orig: %20s adjusted: %20s parts:[", ("'" + t + "'").c_str(), ('\'' + a + '\'').c_str()); + + size_t pos = 0; + while (true) { + auto &topic = t; + pos = topic.find(kTopicSep, pos); + if (pos == topic.npos || ++pos == topic.size()) { + // Find1(std::string()); // sub all. + break; + } else { + printf("'%s',", topic.substr(0, pos).c_str()); + } + } + printf("]\n"); } } @@ -50,7 +72,7 @@ std::atomic<ptime> last_time(Now() - seconds(1)); std::atomic<uint64_t> last_count(0); - const uint64_t nmsg = 1000 * 100; + const uint64_t nmsg = 100; const int timeout = 1000; auto Sub = [&](int id, const std::vector<std::string> &topics) { @@ -74,7 +96,6 @@ count.load(), count - last_count.exchange(count), init_avail - Avail()); } // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str()); - } else { printf("sub %2d recv timeout\n", id); } @@ -84,8 +105,14 @@ auto Pub = [&](const std::string &topic) { ShmMsgQueue provider(shm, 0); for (int i = 0; i < nmsg; ++i) { - std::string data = topic + std::to_string(i); - bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout); + std::string data = topic + std::to_string(i) + std::string(1000, '-'); + + MsgI msg; + msg.MakeRC(shm, MakePub(topic, data.data(), data.size())); + DEFER1(msg.Release(shm)); + bool r = provider.Send(kBHBusQueueId, msg, timeout); + + // bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout); if (!r) { printf("pub ret: %s\n", r ? "ok" : "fail"); } @@ -94,15 +121,13 @@ ThreadManager threads; typedef std::vector<std::string> Topics; Topics topics; - topics.push_back("000-xxx"); - topics.push_back("111-football"); - topics.push_back("222-sport"); - // topics.push_back("333-sport"); - // topics.push_back("444-sport"); + for (int i = 0; i < 100; ++i) { + topics.push_back("t" + std::to_string(i)); + } Topics part; for (int i = 0; i < topics.size(); ++i) { part.push_back(topics[i]); - threads.Launch(Sub, i, part); + threads.Launch(Sub, i, topics); } std::this_thread::sleep_for(100ms); for (auto &topic: topics) { @@ -111,6 +136,9 @@ threads.Launch(Pub, "some_else"); threads.WaitAll(); + std::cout << "end : " << Now(); + printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", + count.load(), count - last_count.exchange(count), init_avail - Avail()); bus.Stop(); } -- Gitblit v1.8.0