From 3f8ae6cf4f03be83f16846af32d73dd89b937c40 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 30 三月 2021 15:45:54 +0800 Subject: [PATCH] add dot separated topic partial match; refactor. --- utest/speed_test.cpp | 2 src/pubsub.h | 2 src/shm_queue.h | 7 +- src/defs.h | 1 src/pubsub.cpp | 64 +++++++++++++++++---- utest/utest.cpp | 82 ++++++++++++++++++--------- 6 files changed, 113 insertions(+), 45 deletions(-) diff --git a/src/defs.h b/src/defs.h index 40421e0..56c6c9c 100644 --- a/src/defs.h +++ b/src/defs.h @@ -26,6 +26,7 @@ const MQId kBHBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff"); const int kBHCenterPort = 24287; +const char kTopicSep = '.'; //TODO center can check shm for previous crash. #endif // end of include guard: DEFS_KP8LKGD0 diff --git a/src/pubsub.cpp b/src/pubsub.cpp index add968c..a0dc4e9 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -28,7 +28,7 @@ BusManager::BusManager(SharedMemory &shm): shm_(shm), -busq_(kBHBusQueueId, shm, 1000), +busq_(kBHBusQueueId, shm, 16), run_(false) { } @@ -46,7 +46,7 @@ auto Worker = [&](){ while (this->run_) { BusManager &self = *this; - BHMsg msg; + MsgI msg; const int timeout_ms = 100; if (self.busq_.Recv(msg, timeout_ms)) { self.OnMsg(msg); @@ -81,8 +81,15 @@ return false; } -void BusManager::OnMsg(const BHMsg &msg) +void BusManager::OnMsg(MsgI &imsg) { + DEFER1(imsg.Release(shm_)); + + BHMsg msg; + if (!imsg.Unpack(msg)) { + return; + } + auto OnSubChange = [&](auto &&update) { DataSub sub; if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) { @@ -117,21 +124,52 @@ auto OnPublish = [&]() { DataPub pub; - MsgI pubmsg; - if (!pub.ParseFromString(msg.body()) || !pubmsg.MakeRC(shm_, msg)) { + if (!pub.ParseFromString(msg.body())) { return; } - DEFER1(pubmsg.Release(shm_)); + auto FindClients = [&](const std::string &topic){ + Clients dests; + std::lock_guard<std::mutex> guard(mutex_); + auto Find1 = [&](const std::string &t) { + auto pos = records_.find(topic); + if (pos != records_.end() && !pos->second.empty()) { + auto &clients = pos->second; + for (auto &cli : clients) { + dests.insert(cli); + } + } + }; + Find1(topic); - std::lock_guard<std::mutex> guard(mutex_); - auto pos = records_.find(pub.topic()); - if (pos != records_.end() && !pos->second.empty()) { - auto &clients = pos->second; - for (auto &cli : clients) { - busq_.Send(cli, pubmsg, 100); + //TODO check and adjust topic on client side sub/pub. + size_t pos = 0; + while (true) { + pos = topic.find(kTopicSep, pos); + if (pos == topic.npos || ++pos == topic.size()) { + // Find1(std::string()); // sub all. + break; + } else { + Find1(topic.substr(0, pos)); + } } + return dests; + }; + + auto Dispatch = [&](auto &&send1) { + const Clients &clients(FindClients(pub.topic())); + for (auto &cli : clients) { + send1(cli); + } + }; + + if (imsg.IsCounted()) { + Dispatch([&](const MQId &cli) { busq_.Send(cli, imsg, 100); }); } else { - // printf("invalid topic: %s\n", pub.topic().c_str()); + MsgI pubmsg; + if (!pubmsg.MakeRC(shm_, msg)) { return; } + DEFER1(pubmsg.Release(shm_)); + + Dispatch([&](const MQId &cli) { busq_.Send(cli, pubmsg, 100); }); } }; diff --git a/src/pubsub.h b/src/pubsub.h index c1f98af..11fa4e4 100644 --- a/src/pubsub.h +++ b/src/pubsub.h @@ -40,7 +40,7 @@ std::unordered_map<std::string, Clients> records_; bool StopNoLock(); - void OnMsg(const BHMsg &msg); + void OnMsg(MsgI &msg); public: BusManager(SharedMemory &shm); ~BusManager(); diff --git a/src/shm_queue.h b/src/shm_queue.h index 9d08016..a536553 100644 --- a/src/shm_queue.h +++ b/src/shm_queue.h @@ -115,10 +115,11 @@ ShmMsgQueue(const MQId &id, ShmType &segment, const int len); ShmMsgQueue(ShmType &segment, const int len); ~ShmMsgQueue(); - // bool Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms); // request - bool Recv(BHMsg &msg, const int timeout_ms); - bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms); const MQId &Id() const { return id_; } + + bool Recv(BHMsg &msg, const int timeout_ms); + bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); } + bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms); bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms); }; diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp index c7f8d4f..b1cba46 100644 --- a/utest/speed_test.cpp +++ b/utest/speed_test.cpp @@ -84,7 +84,7 @@ www.WaitAll(); run.store(false); rrr.WaitAll(); - printf("%3d Write %ld msg R(%3d) W(%3d), : ", getpid(), total_msg, nreader, nwriter); + printf("Write %ld msg R(%3d) W(%3d), : ", total_msg, nreader, nwriter); } } }; diff --git a/utest/utest.cpp b/utest/utest.cpp index b5acc3b..f7571c8 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -13,24 +13,46 @@ template <class A> struct IsSameType<A,A> { static const bool value = true; }; -BOOST_AUTO_TEST_CASE(ShmTest) +BOOST_AUTO_TEST_CASE(Temp) { - const int size = 1024*1024; - const std::string shm_name("ShmRemoveTest"); - SharedMemory m0(shm_name, size); - int *p = m0.find_or_construct<int>("abc")(100); - if (p) { - printf("*p = %d\n", *p); - ++*p; - } - { - SharedMemory m1(shm_name, size); - m1.Remove(); - // m1.Remove(); - } - if (p) { - printf("again *p = %d\n", *p); - // ++*p; + std::string topics[] = { + "", + ".", + "a", + "sp", + "sport", + "sport.", + "sport.a", + "sport.a.b.c", + "sport.ab.c", + "sport.basketball", + "sport.football", + }; + const char sep = '.'; + auto Adjust = [&](const std::string &user_topic) { + if (user_topic.empty() || user_topic.back() == sep) { + return user_topic; + } else { + return user_topic + sep; + } + }; + + for (auto &t : topics) { + const std::string &a = Adjust(t); + printf("orig: %20s adjusted: %20s parts:[", ("'" + t + "'").c_str(), ('\'' + a + '\'').c_str()); + + size_t pos = 0; + while (true) { + auto &topic = t; + pos = topic.find(kTopicSep, pos); + if (pos == topic.npos || ++pos == topic.size()) { + // Find1(std::string()); // sub all. + break; + } else { + printf("'%s',", topic.substr(0, pos).c_str()); + } + } + printf("]\n"); } } @@ -50,7 +72,7 @@ std::atomic<ptime> last_time(Now() - seconds(1)); std::atomic<uint64_t> last_count(0); - const uint64_t nmsg = 1000 * 100; + const uint64_t nmsg = 100; const int timeout = 1000; auto Sub = [&](int id, const std::vector<std::string> &topics) { @@ -74,7 +96,6 @@ count.load(), count - last_count.exchange(count), init_avail - Avail()); } // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str()); - } else { printf("sub %2d recv timeout\n", id); } @@ -84,8 +105,14 @@ auto Pub = [&](const std::string &topic) { ShmMsgQueue provider(shm, 0); for (int i = 0; i < nmsg; ++i) { - std::string data = topic + std::to_string(i); - bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout); + std::string data = topic + std::to_string(i) + std::string(1000, '-'); + + MsgI msg; + msg.MakeRC(shm, MakePub(topic, data.data(), data.size())); + DEFER1(msg.Release(shm)); + bool r = provider.Send(kBHBusQueueId, msg, timeout); + + // bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout); if (!r) { printf("pub ret: %s\n", r ? "ok" : "fail"); } @@ -94,15 +121,13 @@ ThreadManager threads; typedef std::vector<std::string> Topics; Topics topics; - topics.push_back("000-xxx"); - topics.push_back("111-football"); - topics.push_back("222-sport"); - // topics.push_back("333-sport"); - // topics.push_back("444-sport"); + for (int i = 0; i < 100; ++i) { + topics.push_back("t" + std::to_string(i)); + } Topics part; for (int i = 0; i < topics.size(); ++i) { part.push_back(topics[i]); - threads.Launch(Sub, i, part); + threads.Launch(Sub, i, topics); } std::this_thread::sleep_for(100ms); for (auto &topic: topics) { @@ -111,6 +136,9 @@ threads.Launch(Pub, "some_else"); threads.WaitAll(); + std::cout << "end : " << Now(); + printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", + count.load(), count - last_count.exchange(count), init_avail - Avail()); bus.Stop(); } -- Gitblit v1.8.0