From 0bc72d004b08b6cac005931787f43c68dace7685 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 02 四月 2021 16:25:39 +0800
Subject: [PATCH] refactor pub/sub center.

---
 src/pubsub_center.cpp |  126 +++++++++++++++++++++++------------------
 1 files changed, 70 insertions(+), 56 deletions(-)

diff --git a/src/pubsub_center.cpp b/src/pubsub_center.cpp
index afd07bf..3ba5382 100644
--- a/src/pubsub_center.cpp
+++ b/src/pubsub_center.cpp
@@ -18,10 +18,70 @@
 #include "pubsub_center.h"
 #include "bh_util.h"
 using namespace bhome_shm;
+namespace
+{
+class BusCenter
+{
+	typedef std::set<MQId> Clients;
+	std::unordered_map<Topic, Clients> records_;
+	// todo cache data if send fail.
+
+public:
+	template <class Iter>
+	void SubScribe(const MQId &client, Iter topic_begin, Iter topic_end)
+	{
+		for (auto it = topic_begin; it != topic_end; ++it) {
+			records_[*it].insert(client);
+		}
+	}
+	template <class Iter>
+	void UnsubScribe(const MQId &client, Iter topic_begin, Iter topic_end)
+	{
+		for (auto it = topic_begin; it != topic_end; ++it) {
+			auto pos = records_.find(*it);
+			if (pos != records_.end()) {
+				if (pos->second.erase(client) && pos->second.empty()) {
+					records_.erase(pos);
+				}
+			}
+		}
+	};
+	Clients FindClients(const std::string &topic)
+	{
+		Clients dests;
+		auto Find1 = [&](const std::string &t) {
+			auto pos = records_.find(topic);
+			if (pos != records_.end() && !pos->second.empty()) {
+				auto &clients = pos->second;
+				for (auto &cli : clients) {
+					dests.insert(cli);
+				}
+			}
+		};
+		Find1(topic);
+
+		//TODO check and adjust topic on client side sub/pub.
+		size_t pos = 0;
+		while (true) {
+			pos = topic.find(kTopicSep, pos);
+			if (pos == topic.npos || ++pos == topic.size()) {
+				// Find1(std::string()); // sub all.
+				break;
+			} else {
+				Find1(topic.substr(0, pos));
+			}
+		}
+		return dests;
+	}
+};
+
+} // namespace
 
 bool PubSubCenter::Start(const int nworker)
 {
-	auto onRecv = [&](MsgI &imsg) {
+	auto bus_ptr = std::make_shared<Synced<BusCenter>>();
+
+	auto onRecv = [bus_ptr, this](MsgI &imsg) {
 #ifndef NDEBUG
 		static std::atomic<time_t> last(0);
 		time_t now = 0;
@@ -30,6 +90,7 @@
 			printf("bus queue size: %ld\n", socket_.Pending());
 		}
 #endif
+		auto &bus = *bus_ptr;
 
 		BHMsg msg;
 		if (!imsg.Unpack(msg)) {
@@ -42,86 +103,39 @@
 				assert(sizeof(MQId) == msg.route(0).mq_id().size());
 				MQId client;
 				memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
-
-				std::lock_guard<std::mutex> guard(mutex_);
-				auto &topics = sub.topics();
-				for (auto &topic : topics) {
-					try {
-						update(topic, client);
-					} catch (...) {
-						//TODO log error
-					}
-				}
+				update(client, sub.topics());
 			}
 		};
 
-		auto Sub1 = [this](const std::string &topic, const MQId &id) {
-			records_[topic].insert(id);
-		};
-
-		auto Unsub1 = [this](const std::string &topic, const MQId &id) {
-			auto pos = records_.find(topic);
-			if (pos != records_.end()) {
-				if (pos->second.erase(id) && pos->second.empty()) {
-					records_.erase(pos);
-				}
-			}
-		};
+		auto Sub = [&](const MQId &id, auto &topics) { bus->SubScribe(id, topics.begin(), topics.end()); };
+		auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); };
 
 		auto OnPublish = [&]() {
 			DataPub pub;
 			if (!pub.ParseFromString(msg.body())) {
 				return;
 			}
-			auto FindClients = [&](const std::string &topic) {
-				Clients dests;
-				std::lock_guard<std::mutex> guard(mutex_);
-				auto Find1 = [&](const std::string &t) {
-					auto pos = records_.find(topic);
-					if (pos != records_.end() && !pos->second.empty()) {
-						auto &clients = pos->second;
-						for (auto &cli : clients) {
-							dests.insert(cli);
-						}
-					}
-				};
-				Find1(topic);
-
-				//TODO check and adjust topic on client side sub/pub.
-				size_t pos = 0;
-				while (true) {
-					pos = topic.find(kTopicSep, pos);
-					if (pos == topic.npos || ++pos == topic.size()) {
-						// Find1(std::string()); // sub all.
-						break;
-					} else {
-						Find1(topic.substr(0, pos));
-					}
-				}
-				return dests;
-			};
-
 			auto Dispatch = [&](auto &&send1) {
-				const Clients &clients(FindClients(pub.topic()));
+				const auto &clients(bus->FindClients(pub.topic()));
 				for (auto &cli : clients) {
 					send1(cli);
 				}
 			};
 
 			if (imsg.IsCounted()) {
-				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 100); });
+				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 10); });
 			} else {
 				MsgI pubmsg;
 				if (!pubmsg.MakeRC(shm(), msg)) { return; }
 				DEFER1(pubmsg.Release(shm()));
 
-				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 100); });
+				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 10); });
 			}
 		};
 
 		switch (msg.type()) {
-		case kMsgTypeSubscribe: OnSubChange(Sub1); break;
-		case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
+		case kMsgTypeSubscribe: OnSubChange(Sub); break;
+		case kMsgTypeUnsubscribe: OnSubChange(Unsub); break;
 		case kMsgTypePublish: OnPublish(); break;
 		default: break;
 		}

--
Gitblit v1.8.0