From 3c2b6739208d961cf8b86460d7f05516d044960c Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 31 三月 2021 19:13:42 +0800
Subject: [PATCH] add async recv suport; sync by waiting for async.

---
 src/pubsub.cpp |  127 +++++++++++++++++++++++++++++++++++++++++-
 1 files changed, 124 insertions(+), 3 deletions(-)

diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index b592113..eff54ab 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -16,9 +16,130 @@
  * =====================================================================================
  */
 #include "pubsub.h"
+#include "bh_util.h"
+#include "defs.h"
 
-namespace bhome_shm {
+namespace bhome_shm
+{
 
-	
+using namespace std::chrono_literals;
+const int kMaxWorker = 16;
+using namespace bhome_msg;
+
+BusManager::BusManager(SharedMemory &shm) :
+    shm_(shm), socket_(ShmSocket::eSockBus, shm) {}
+BusManager::BusManager() :
+    BusManager(BHomeShm()) {}
+
+bool BusManager::Start(const int nworker)
+{
+	auto onRecv = [&](MsgI &imsg) {
+#ifndef NDEBUG
+		static std::atomic<time_t> last(0);
+		time_t now = 0;
+		time(&now);
+		if (last.exchange(now) < now) {
+			printf("bus queue size: %ld\n", socket_.Pending());
+		}
+#endif
+
+		BHMsg msg;
+		if (!imsg.Unpack(msg)) {
+			return;
+		}
+
+		auto OnSubChange = [&](auto &&update) {
+			DataSub sub;
+			if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
+				assert(sizeof(MQId) == msg.route(0).mq_id().size());
+				MQId client;
+				memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
+
+				std::lock_guard<std::mutex> guard(mutex_);
+				auto &topics = sub.topics();
+				for (auto &topic : topics) {
+					try {
+						update(topic, client);
+					} catch (...) {
+						//TODO log error
+					}
+				}
+			}
+		};
+
+		auto Sub1 = [this](const std::string &topic, const MQId &id) {
+			records_[topic].insert(id);
+		};
+
+		auto Unsub1 = [this](const std::string &topic, const MQId &id) {
+			auto pos = records_.find(topic);
+			if (pos != records_.end()) {
+				if (pos->second.erase(id) && pos->second.empty()) {
+					records_.erase(pos);
+				}
+			}
+		};
+
+		auto OnPublish = [&]() {
+			DataPub pub;
+			if (!pub.ParseFromString(msg.body())) {
+				return;
+			}
+			auto FindClients = [&](const std::string &topic) {
+				Clients dests;
+				std::lock_guard<std::mutex> guard(mutex_);
+				auto Find1 = [&](const std::string &t) {
+					auto pos = records_.find(topic);
+					if (pos != records_.end() && !pos->second.empty()) {
+						auto &clients = pos->second;
+						for (auto &cli : clients) {
+							dests.insert(cli);
+						}
+					}
+				};
+				Find1(topic);
+
+				//TODO check and adjust topic on client side sub/pub.
+				size_t pos = 0;
+				while (true) {
+					pos = topic.find(kTopicSep, pos);
+					if (pos == topic.npos || ++pos == topic.size()) {
+						// Find1(std::string()); // sub all.
+						break;
+					} else {
+						Find1(topic.substr(0, pos));
+					}
+				}
+				return dests;
+			};
+
+			auto Dispatch = [&](auto &&send1) {
+				const Clients &clients(FindClients(pub.topic()));
+				for (auto &cli : clients) {
+					send1(cli);
+				}
+			};
+
+			if (imsg.IsCounted()) {
+				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, imsg, 100); });
+			} else {
+				MsgI pubmsg;
+				if (!pubmsg.MakeRC(shm_, msg)) { return; }
+				DEFER1(pubmsg.Release(shm_));
+
+				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, pubmsg, 100); });
+			}
+		};
+
+		switch (msg.type()) {
+		case kMsgTypeSubscribe: OnSubChange(Sub1); break;
+		case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
+		case kMsgTypePublish: OnPublish(); break;
+		default: break;
+		}
+	};
+
+	return socket_.StartRaw(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
+}
+
 } // namespace bhome_shm
-

--
Gitblit v1.8.0