From 0bc72d004b08b6cac005931787f43c68dace7685 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 02 四月 2021 16:25:39 +0800
Subject: [PATCH] refactor pub/sub center.
---
src/pubsub_center.h | 5 -
src/pubsub_center.cpp | 126 +++++++++++++++++++++++------------------
2 files changed, 70 insertions(+), 61 deletions(-)
diff --git a/src/pubsub_center.cpp b/src/pubsub_center.cpp
index afd07bf..3ba5382 100644
--- a/src/pubsub_center.cpp
+++ b/src/pubsub_center.cpp
@@ -18,10 +18,70 @@
#include "pubsub_center.h"
#include "bh_util.h"
using namespace bhome_shm;
+namespace
+{
+class BusCenter
+{
+ typedef std::set<MQId> Clients;
+ std::unordered_map<Topic, Clients> records_;
+ // todo cache data if send fail.
+
+public:
+ template <class Iter>
+ void SubScribe(const MQId &client, Iter topic_begin, Iter topic_end)
+ {
+ for (auto it = topic_begin; it != topic_end; ++it) {
+ records_[*it].insert(client);
+ }
+ }
+ template <class Iter>
+ void UnsubScribe(const MQId &client, Iter topic_begin, Iter topic_end)
+ {
+ for (auto it = topic_begin; it != topic_end; ++it) {
+ auto pos = records_.find(*it);
+ if (pos != records_.end()) {
+ if (pos->second.erase(client) && pos->second.empty()) {
+ records_.erase(pos);
+ }
+ }
+ }
+ };
+ Clients FindClients(const std::string &topic)
+ {
+ Clients dests;
+ auto Find1 = [&](const std::string &t) {
+ auto pos = records_.find(topic);
+ if (pos != records_.end() && !pos->second.empty()) {
+ auto &clients = pos->second;
+ for (auto &cli : clients) {
+ dests.insert(cli);
+ }
+ }
+ };
+ Find1(topic);
+
+ //TODO check and adjust topic on client side sub/pub.
+ size_t pos = 0;
+ while (true) {
+ pos = topic.find(kTopicSep, pos);
+ if (pos == topic.npos || ++pos == topic.size()) {
+ // Find1(std::string()); // sub all.
+ break;
+ } else {
+ Find1(topic.substr(0, pos));
+ }
+ }
+ return dests;
+ }
+};
+
+} // namespace
bool PubSubCenter::Start(const int nworker)
{
- auto onRecv = [&](MsgI &imsg) {
+ auto bus_ptr = std::make_shared<Synced<BusCenter>>();
+
+ auto onRecv = [bus_ptr, this](MsgI &imsg) {
#ifndef NDEBUG
static std::atomic<time_t> last(0);
time_t now = 0;
@@ -30,6 +90,7 @@
printf("bus queue size: %ld\n", socket_.Pending());
}
#endif
+ auto &bus = *bus_ptr;
BHMsg msg;
if (!imsg.Unpack(msg)) {
@@ -42,86 +103,39 @@
assert(sizeof(MQId) == msg.route(0).mq_id().size());
MQId client;
memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
-
- std::lock_guard<std::mutex> guard(mutex_);
- auto &topics = sub.topics();
- for (auto &topic : topics) {
- try {
- update(topic, client);
- } catch (...) {
- //TODO log error
- }
- }
+ update(client, sub.topics());
}
};
- auto Sub1 = [this](const std::string &topic, const MQId &id) {
- records_[topic].insert(id);
- };
-
- auto Unsub1 = [this](const std::string &topic, const MQId &id) {
- auto pos = records_.find(topic);
- if (pos != records_.end()) {
- if (pos->second.erase(id) && pos->second.empty()) {
- records_.erase(pos);
- }
- }
- };
+ auto Sub = [&](const MQId &id, auto &topics) { bus->SubScribe(id, topics.begin(), topics.end()); };
+ auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); };
auto OnPublish = [&]() {
DataPub pub;
if (!pub.ParseFromString(msg.body())) {
return;
}
- auto FindClients = [&](const std::string &topic) {
- Clients dests;
- std::lock_guard<std::mutex> guard(mutex_);
- auto Find1 = [&](const std::string &t) {
- auto pos = records_.find(topic);
- if (pos != records_.end() && !pos->second.empty()) {
- auto &clients = pos->second;
- for (auto &cli : clients) {
- dests.insert(cli);
- }
- }
- };
- Find1(topic);
-
- //TODO check and adjust topic on client side sub/pub.
- size_t pos = 0;
- while (true) {
- pos = topic.find(kTopicSep, pos);
- if (pos == topic.npos || ++pos == topic.size()) {
- // Find1(std::string()); // sub all.
- break;
- } else {
- Find1(topic.substr(0, pos));
- }
- }
- return dests;
- };
-
auto Dispatch = [&](auto &&send1) {
- const Clients &clients(FindClients(pub.topic()));
+ const auto &clients(bus->FindClients(pub.topic()));
for (auto &cli : clients) {
send1(cli);
}
};
if (imsg.IsCounted()) {
- Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 100); });
+ Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 10); });
} else {
MsgI pubmsg;
if (!pubmsg.MakeRC(shm(), msg)) { return; }
DEFER1(pubmsg.Release(shm()));
- Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 100); });
+ Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 10); });
}
};
switch (msg.type()) {
- case kMsgTypeSubscribe: OnSubChange(Sub1); break;
- case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
+ case kMsgTypeSubscribe: OnSubChange(Sub); break;
+ case kMsgTypeUnsubscribe: OnSubChange(Unsub); break;
case kMsgTypePublish: OnPublish(); break;
default: break;
}
diff --git a/src/pubsub_center.h b/src/pubsub_center.h
index aa9db68..af3a2f4 100644
--- a/src/pubsub_center.h
+++ b/src/pubsub_center.h
@@ -37,11 +37,6 @@
SocketBus socket_;
ShmSocket::Shm &shm() { return socket_.shm(); }
- std::mutex mutex_;
- typedef std::set<MQId> Clients;
- std::unordered_map<Topic, Clients> records_;
- bool Find1(const Topic &topic);
-
public:
PubSubCenter(ShmSocket::Shm &shm) :
socket_(shm) {}
--
Gitblit v1.8.0