From c338820e4db43ad32c20ff429a038b06bcb980f8 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 08 四月 2021 18:13:25 +0800
Subject: [PATCH] BIG change, join center,bus; now msg is head+body.

---
 src/pubsub.cpp |  205 ++++++++++++++-------------------------------------
 1 files changed, 56 insertions(+), 149 deletions(-)

diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index d5c7dd2..471c63c 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -18,168 +18,75 @@
 #include "pubsub.h"
 #include "bh_util.h"
 #include "defs.h"
-#include <chrono>
-
-namespace bhome_shm
-{
 
 using namespace std::chrono_literals;
-const int kMaxWorker = 16;
 using namespace bhome_msg;
 
-BusManager::BusManager(SharedMemory &shm) :
-    shm_(shm),
-    busq_(kBHBusQueueId, shm, 16),
-    run_(false)
+bool SocketPublish::Publish(const std::string &proc_id, const Topic &topic, const void *data, const size_t size, const int timeout_ms)
 {
-}
-
-BusManager::~BusManager()
-{
-	Stop();
-}
-
-bool BusManager::Start(const int nworker)
-{
-	std::lock_guard<std::mutex> guard(mutex_);
-	StopNoLock();
-	// start
-	auto Worker = [&]() {
-		while (this->run_) {
-			BusManager &self = *this;
-			MsgI msg;
-			const int timeout_ms = 100;
-			if (self.busq_.Recv(msg, timeout_ms)) {
-				self.OnMsg(msg);
-			}
+	try {
+		MsgPublish pub;
+		pub.set_topic(topic);
+		pub.set_data(data, size);
+		BHMsgHead head(InitMsgHead(GetType(pub), proc_id));
+		MsgI imsg;
+		if (imsg.MakeRC(shm(), head, pub)) {
+			DEFER1(imsg.Release(shm()));
+			return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms);
 		}
-	};
-
-	run_.store(true);
-	const int n = std::min(nworker, kMaxWorker);
-	for (int i = 0; i < n; ++i) {
-		workers_.emplace_back(Worker);
-	}
-	return true;
-}
-
-bool BusManager::Stop()
-{
-	std::lock_guard<std::mutex> guard(mutex_);
-	return StopNoLock();
-}
-
-bool BusManager::StopNoLock()
-{
-	if (run_.exchange(false)) {
-		for (auto &w : workers_) {
-			if (w.joinable()) {
-				w.join();
-			}
-		}
-		return true;
+	} catch (...) {
 	}
 	return false;
 }
-
-void BusManager::OnMsg(MsgI &imsg)
+namespace
 {
-	DEFER1(imsg.Release(shm_));
+inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
 
-	BHMsg msg;
-	if (!imsg.Unpack(msg)) {
-		return;
-	}
-
-	auto OnSubChange = [&](auto &&update) {
-		DataSub sub;
-		if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
-			assert(sizeof(MQId) == msg.route(0).mq_id().size());
-			MQId client;
-			memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
-
-			std::lock_guard<std::mutex> guard(mutex_);
-			auto &topics = sub.topics();
-			for (auto &topic : topics) {
-				try {
-					update(topic, client);
-				} catch (...) {
-					//TODO log error
-				}
-			}
+} // namespace
+bool SocketSubscribe::Subscribe(const std::string &proc_id, const std::vector<Topic> &topics, const int timeout_ms)
+{
+	try {
+		MsgSubscribe sub;
+		for (auto &topic : topics) {
+			sub.add_topics(topic);
 		}
-	};
+		BHMsgHead head(InitMsgHead(GetType(sub), proc_id));
+		AddRoute(head, mq().Id());
 
-	auto Sub1 = [this](const std::string &topic, const MQId &id) {
-		records_[topic].insert(id);
-	};
-
-	auto Unsub1 = [this](const std::string &topic, const MQId &id) {
-		auto pos = records_.find(topic);
-		if (pos != records_.end()) {
-			if (pos->second.erase(id) && pos->second.empty()) {
-				records_.erase(pos);
-			}
-		}
-	};
-
-	auto OnPublish = [&]() {
-		DataPub pub;
-		if (!pub.ParseFromString(msg.body())) {
-			return;
-		}
-		auto FindClients = [&](const std::string &topic) {
-			Clients dests;
-			std::lock_guard<std::mutex> guard(mutex_);
-			auto Find1 = [&](const std::string &t) {
-				auto pos = records_.find(topic);
-				if (pos != records_.end() && !pos->second.empty()) {
-					auto &clients = pos->second;
-					for (auto &cli : clients) {
-						dests.insert(cli);
-					}
-				}
-			};
-			Find1(topic);
-
-			//TODO check and adjust topic on client side sub/pub.
-			size_t pos = 0;
-			while (true) {
-				pos = topic.find(kTopicSep, pos);
-				if (pos == topic.npos || ++pos == topic.size()) {
-					// Find1(std::string()); // sub all.
-					break;
-				} else {
-					Find1(topic.substr(0, pos));
-				}
-			}
-			return dests;
-		};
-
-		auto Dispatch = [&](auto &&send1) {
-			const Clients &clients(FindClients(pub.topic()));
-			for (auto &cli : clients) {
-				send1(cli);
-			}
-		};
-
-		if (imsg.IsCounted()) {
-			Dispatch([&](const MQId &cli) { busq_.Send(cli, imsg, 100); });
-		} else {
-			MsgI pubmsg;
-			if (!pubmsg.MakeRC(shm_, msg)) { return; }
-			DEFER1(pubmsg.Release(shm_));
-
-			Dispatch([&](const MQId &cli) { busq_.Send(cli, pubmsg, 100); });
-		}
-	};
-
-	switch (msg.type()) {
-	case kMsgTypeSubscribe: OnSubChange(Sub1); break;
-	case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
-	case kMsgTypePublish: OnPublish(); break;
-	default: break;
+		return Send(&BHTopicBusAddress(), head, sub, timeout_ms);
+	} catch (...) {
+		return false;
 	}
 }
 
-} // namespace bhome_shm
+bool SocketSubscribe::StartRecv(const TopicDataCB &tdcb, int nworker)
+{
+	auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
+		if (head.type() == kMsgTypePublish) {
+			MsgPublish pub;
+			if (imsg.ParseBody(pub)) {
+				tdcb(head.proc_id(), pub.topic(), pub.data());
+			}
+		} else {
+			// ignored, or dropped
+		}
+	};
+
+	return tdcb && Start(AsyncRecvProc, nworker);
+}
+
+bool SocketSubscribe::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms)
+{
+	MsgI msg;
+	BHMsgHead head;
+	if (SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) {
+		MsgPublish pub;
+		if (msg.ParseBody(pub)) {
+			head.mutable_proc_id()->swap(proc_id);
+			pub.mutable_topic()->swap(topic);
+			pub.mutable_data()->swap(data);
+			return true;
+		}
+	}
+	return false;
+}
\ No newline at end of file

--
Gitblit v1.8.0