From b55ffe89f4b237be5f79232cfddfe22bfdb87c64 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 01 四月 2021 13:23:48 +0800
Subject: [PATCH] make req/rep,sub/pub sockets sub class;

---
 src/pubsub.cpp |  153 ++++++++++++++-------------------------------------
 1 files changed, 42 insertions(+), 111 deletions(-)

diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index eff54ab..cfc77ab 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -19,127 +19,58 @@
 #include "bh_util.h"
 #include "defs.h"
 
-namespace bhome_shm
-{
-
 using namespace std::chrono_literals;
-const int kMaxWorker = 16;
 using namespace bhome_msg;
 
-BusManager::BusManager(SharedMemory &shm) :
-    shm_(shm), socket_(ShmSocket::eSockBus, shm) {}
-BusManager::BusManager() :
-    BusManager(BHomeShm()) {}
-
-bool BusManager::Start(const int nworker)
+bool SocketPublish::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms)
 {
-	auto onRecv = [&](MsgI &imsg) {
-#ifndef NDEBUG
-		static std::atomic<time_t> last(0);
-		time_t now = 0;
-		time(&now);
-		if (last.exchange(now) < now) {
-			printf("bus queue size: %ld\n", socket_.Pending());
+	try {
+		MsgI imsg;
+		if (!imsg.MakeRC(shm(), MakePub(topic, data, size))) {
+			return false;
 		}
-#endif
+		DEFER1(imsg.Release(shm()));
+		return ShmMsgQueue::Send(shm(), kBHBusQueueId, imsg, timeout_ms);
+	} catch (...) {
+		return false;
+	}
+}
 
-		BHMsg msg;
-		if (!imsg.Unpack(msg)) {
-			return;
-		}
+bool SocketSubscribe::Subscribe(const std::vector<std::string> &topics, const int timeout_ms)
+{
+	try {
+		return mq().Send(kBHBusQueueId, MakeSub(mq().Id(), topics), timeout_ms);
+	} catch (...) {
+		return false;
+	}
+}
 
-		auto OnSubChange = [&](auto &&update) {
-			DataSub sub;
-			if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
-				assert(sizeof(MQId) == msg.route(0).mq_id().size());
-				MQId client;
-				memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
-
-				std::lock_guard<std::mutex> guard(mutex_);
-				auto &topics = sub.topics();
-				for (auto &topic : topics) {
-					try {
-						update(topic, client);
-					} catch (...) {
-						//TODO log error
-					}
-				}
+bool SocketSubscribe::StartRecv(const TopicDataCB &tdcb, int nworker)
+{
+	auto AsyncRecvProc = [this, tdcb](BHMsg &msg) {
+		if (msg.type() == kMsgTypePublish) {
+			DataPub d;
+			if (d.ParseFromString(msg.body())) {
+				tdcb(d.topic(), d.data());
 			}
-		};
-
-		auto Sub1 = [this](const std::string &topic, const MQId &id) {
-			records_[topic].insert(id);
-		};
-
-		auto Unsub1 = [this](const std::string &topic, const MQId &id) {
-			auto pos = records_.find(topic);
-			if (pos != records_.end()) {
-				if (pos->second.erase(id) && pos->second.empty()) {
-					records_.erase(pos);
-				}
-			}
-		};
-
-		auto OnPublish = [&]() {
-			DataPub pub;
-			if (!pub.ParseFromString(msg.body())) {
-				return;
-			}
-			auto FindClients = [&](const std::string &topic) {
-				Clients dests;
-				std::lock_guard<std::mutex> guard(mutex_);
-				auto Find1 = [&](const std::string &t) {
-					auto pos = records_.find(topic);
-					if (pos != records_.end() && !pos->second.empty()) {
-						auto &clients = pos->second;
-						for (auto &cli : clients) {
-							dests.insert(cli);
-						}
-					}
-				};
-				Find1(topic);
-
-				//TODO check and adjust topic on client side sub/pub.
-				size_t pos = 0;
-				while (true) {
-					pos = topic.find(kTopicSep, pos);
-					if (pos == topic.npos || ++pos == topic.size()) {
-						// Find1(std::string()); // sub all.
-						break;
-					} else {
-						Find1(topic.substr(0, pos));
-					}
-				}
-				return dests;
-			};
-
-			auto Dispatch = [&](auto &&send1) {
-				const Clients &clients(FindClients(pub.topic()));
-				for (auto &cli : clients) {
-					send1(cli);
-				}
-			};
-
-			if (imsg.IsCounted()) {
-				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, imsg, 100); });
-			} else {
-				MsgI pubmsg;
-				if (!pubmsg.MakeRC(shm_, msg)) { return; }
-				DEFER1(pubmsg.Release(shm_));
-
-				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, pubmsg, 100); });
-			}
-		};
-
-		switch (msg.type()) {
-		case kMsgTypeSubscribe: OnSubChange(Sub1); break;
-		case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
-		case kMsgTypePublish: OnPublish(); break;
-		default: break;
+		} else {
+			// ignored, or dropped
 		}
 	};
 
-	return socket_.StartRaw(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
+	return tdcb && Start(AsyncRecvProc, nworker);
 }
 
-} // namespace bhome_shm
+bool SocketSubscribe::RecvSub(std::string &topic, std::string &data, const int timeout_ms)
+{
+	BHMsg msg;
+	if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) {
+		DataPub d;
+		if (d.ParseFromString(msg.body())) {
+			d.mutable_topic()->swap(topic);
+			d.mutable_data()->swap(data);
+			return true;
+		}
+	}
+	return false;
+}
\ No newline at end of file

--
Gitblit v1.8.0