From 6aa7e4c37a70709e7348bd16407c5983a563ed76 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期一, 29 三月 2021 21:11:34 +0800 Subject: [PATCH] test pub/sub msg; fix update last_time; --- src/msg.h | 4 src/pubsub.h | 6 ++ src/pubsub.cpp | 72 ++++++++++++++++++++++- utest/utest.cpp | 87 ++++++++++++++++++++++++++++- src/msg.cpp | 8 +- 5 files changed, 163 insertions(+), 14 deletions(-) diff --git a/src/msg.cpp b/src/msg.cpp index ed8adba..78834a8 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -37,7 +37,6 @@ assert(data && size); BHMsg msg(InitMsg(kMsgTypeRequest)); msg.set_body(data, size); - BHAddress addr; msg.add_route()->set_mq_id(&src_id, sizeof(src_id)); return msg; } @@ -50,10 +49,11 @@ return msg; } -BHMsg MakeSubUnsub(const std::vector<std::string> &topics, const MsgType sub_unsub) +BHMsg MakeSubUnsub(const MQId &client, const std::vector<std::string> &topics, const MsgType sub_unsub) { assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe); BHMsg msg(InitMsg(sub_unsub)); + msg.add_route()->set_mq_id(&client, sizeof(client)); DataSub subs; for (auto &t : topics) { subs.add_topics(t); @@ -62,8 +62,8 @@ return msg; } -BHMsg MakeSub(const std::vector<std::string> &topics) { return MakeSubUnsub(topics, kMsgTypeSubscribe); } -BHMsg MakeUnsub(const std::vector<std::string> &topics) { return MakeSubUnsub(topics, kMsgTypeUnsubscribe); } +BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics) { return MakeSubUnsub(client, topics, kMsgTypeSubscribe); } +BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics) { return MakeSubUnsub(client, topics, kMsgTypeUnsubscribe); } BHMsg MakePub(const std::string &topic, const void *data, const size_t size) { diff --git a/src/msg.h b/src/msg.h index 5cb0ce4..f3fe726 100644 --- a/src/msg.h +++ b/src/msg.h @@ -47,8 +47,8 @@ BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size); BHMsg MakeReply(const void *data, const size_t size); -BHMsg MakeSub(const std::vector<std::string> &topics); -BHMsg MakeUnsub(const std::vector<std::string> &topics); +BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics); +BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics); BHMsg MakePub(const std::string &topic, const void *data, const size_t size); class MsgI { diff --git a/src/pubsub.cpp b/src/pubsub.cpp index ee2614a..686c7d5 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -17,14 +17,17 @@ */ #include "pubsub.h" #include <chrono> +#include "bh_util.h" namespace bhome_shm { using namespace std::chrono_literals; const MQId kBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff"); const int kMaxWorker = 16; +using namespace bhome_msg; BusManager::BusManager(SharedMemory &shm): +shm_(shm), busq_(kBusQueueId, shm, 1000), run_(false) { @@ -42,15 +45,12 @@ // start auto Worker = [&](){ while (this->run_) { - std::this_thread::sleep_for(100ms); BusManager &self = *this; BHMsg msg; const int timeout_ms = 100; - if (!self.busq_.Recv(msg, timeout_ms)) { - continue; + if (self.busq_.Recv(msg, timeout_ms)) { + self.OnMsg(msg); } - // handle msg; - // type: subscribe(topic), publish(topic, data) } }; @@ -81,5 +81,67 @@ return false; } +void BusManager::OnMsg(const BHMsg &msg) +{ + auto OnSubChange = [&](auto &&update) { + DataSub sub; + if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) { + assert(sizeof(MQId) == msg.route(0).mq_id().size()); + MQId client; + memcpy(&client, msg.route(0).mq_id().data(), sizeof(client)); + + std::lock_guard<std::mutex> guard(mutex_); + auto &topics = sub.topics(); + for (auto &topic : topics) { + try { + update(topic, client); + } catch(...) { + //TODO log error + } + } + } + }; + + auto Sub1 = [this](const std::string &topic, const MQId &id) { + records_[topic].insert(id); + }; + + auto Unsub1 = [this](const std::string &topic, const MQId &id) { + auto pos = records_.find(topic); + if (pos != records_.end()) { + if (pos->second.erase(id) && pos->second.empty()) { + records_.erase(pos); + } + } + }; + + auto OnPublish = [&]() { + DataPub pub; + MsgI pubmsg; + if (!pub.ParseFromString(msg.body()) || !pubmsg.MakeRC(shm_, msg)) { + return; + } + DEFER1(pubmsg.Release(shm_)); + + std::lock_guard<std::mutex> guard(mutex_); + auto pos = records_.find(pub.topic()); + if (pos != records_.end() && !pos->second.empty()) { + auto &clients = pos->second; + for (auto &cli : clients) { + busq_.Send(cli, pubmsg, 100); + } + } else { + // printf("invalid topic: %s\n", pub.topic().c_str()); + } + }; + + switch (msg.type()) { + case kMsgTypeSubscribe: OnSubChange(Sub1); break; + case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break; + case kMsgTypePublish : OnPublish(); break; + default: break; + } +} + } // namespace bhome_shm diff --git a/src/pubsub.h b/src/pubsub.h index 0628216..c1f98af 100644 --- a/src/pubsub.h +++ b/src/pubsub.h @@ -23,18 +23,24 @@ #include <atomic> #include <mutex> #include <vector> +#include <unordered_map> +#include <set> namespace bhome_shm { // publish/subcribe manager. class BusManager { + SharedMemory &shm_; ShmMsgQueue busq_; std::atomic<bool> run_; std::vector<std::thread> workers_; std::mutex mutex_; + typedef std::set<MQId> Clients; + std::unordered_map<std::string, Clients> records_; bool StopNoLock(); + void OnMsg(const BHMsg &msg); public: BusManager(SharedMemory &shm); ~BusManager(); diff --git a/utest/utest.cpp b/utest/utest.cpp index 6e41116..a074f41 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -15,6 +15,8 @@ #include "bh_util.h" #include <sys/types.h> #include <sys/wait.h> +#include "pubsub.h" +#include "defs.h" using namespace std::chrono_literals; using namespace bhome_msg; @@ -332,11 +334,10 @@ } else { ++count; auto cur = Now(); - if (last_time.exchange(cur) != cur) { - std::cout << "time: " << Now(); + if (last_time.exchange(cur) < cur) { + std::cout << "time: " << cur; printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n", count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count()); - last_time = cur; } } @@ -382,6 +383,86 @@ // BOOST_CHECK_THROW(reply.Count(), int); } +BOOST_AUTO_TEST_CASE(PubSubTest) +{ + const std::string shm_name("ShmPubSub"); + ShmRemover auto_remove(shm_name); + SharedMemory shm(shm_name, 1024*1024*50); + auto Avail = [&]() { return shm.get_free_memory(); }; + auto init_avail = Avail(); + + BusManager bus(shm); + bus.Start(1); + std::this_thread::sleep_for(100ms); + + std::atomic<uint64_t> count(0); + std::atomic<ptime> last_time(Now() - seconds(1)); + std::atomic<uint64_t> last_count(0); + + const uint64_t nmsg = 1000 * 100; + + const int timeout = 1000; + auto Sub = [&](int id, const std::vector<std::string> &topics) { + ShmMsgQueue client(shm, 8); + client.Send(kBusQueueId, MakeSub(client.Id(), topics), timeout); + for (int i = 0; i < nmsg * topics.size(); ++i) { + BHMsg msg; + if (client.Recv(msg, 1000)) { + if (msg.type() != kMsgTypePublish) { + BOOST_CHECK(false); + } + DataPub pub; + if (!pub.ParseFromString(msg.body())) { + BOOST_CHECK(false); + } + ++count; + auto cur = Now(); + if (last_time.exchange(cur) < cur) { + std::cout << "time: " << cur; + printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", + count.load(), count - last_count.exchange(count), init_avail - Avail()); + } + // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str()); + + } else { + printf("sub %2d recv timeout\n", id); + } + + } + }; + auto Pub = [&](const std::string &topic) { + ShmMsgQueue provider(shm, 0); + for (int i = 0; i < nmsg; ++i) { + std::string data = topic + std::to_string(i); + bool r = provider.Send(kBusQueueId, MakePub(topic, data.data(), data.size()), timeout); + if (!r) { + printf("pub ret: %s\n", r ? "ok" : "fail"); + } + } + }; + ThreadManager threads; + typedef std::vector<std::string> Topics; + Topics topics; + topics.push_back("000-xxx"); + topics.push_back("111-football"); + topics.push_back("222-sport"); + // topics.push_back("333-sport"); + // topics.push_back("444-sport"); + Topics part; + for (int i = 0; i < topics.size(); ++i) { + part.push_back(topics[i]); + threads.Launch(Sub, i, part); + } + std::this_thread::sleep_for(100ms); + for (auto &topic: topics) { + threads.Launch(Pub, topic); + } + threads.Launch(Pub, "some_else"); + + threads.WaitAll(); + + bus.Stop(); +} inline int MyMin(int a, int b) { printf("MyMin\n"); -- Gitblit v1.8.0