From 83085f2ce99cca05d40a19482151873a55e6393a Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 02 四月 2021 19:32:21 +0800
Subject: [PATCH] refactor center; add async request no cb.

---
 src/pubsub_center.cpp |   37 +++++++++++++++++++------------------
 1 files changed, 19 insertions(+), 18 deletions(-)

diff --git a/src/pubsub_center.cpp b/src/pubsub_center.cpp
index 3ba5382..b3af47d 100644
--- a/src/pubsub_center.cpp
+++ b/src/pubsub_center.cpp
@@ -77,25 +77,21 @@
 
 } // namespace
 
-bool PubSubCenter::Start(const int nworker)
+BHCenter::MsgHandler MakeBusCenter()
 {
 	auto bus_ptr = std::make_shared<Synced<BusCenter>>();
 
-	auto onRecv = [bus_ptr, this](MsgI &imsg) {
+	return [bus_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) {
 #ifndef NDEBUG
 		static std::atomic<time_t> last(0);
 		time_t now = 0;
 		time(&now);
 		if (last.exchange(now) < now) {
-			printf("bus queue size: %ld\n", socket_.Pending());
+			printf("bus queue size: %ld\n", socket.Pending());
 		}
 #endif
 		auto &bus = *bus_ptr;
-
-		BHMsg msg;
-		if (!imsg.Unpack(msg)) {
-			return;
-		}
+		auto &shm = socket.shm();
 
 		auto OnSubChange = [&](auto &&update) {
 			DataSub sub;
@@ -106,7 +102,6 @@
 				update(client, sub.topics());
 			}
 		};
-
 		auto Sub = [&](const MQId &id, auto &topics) { bus->SubScribe(id, topics.begin(), topics.end()); };
 		auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); };
 
@@ -123,24 +118,30 @@
 			};
 
 			if (imsg.IsCounted()) {
-				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 10); });
+				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, imsg, 10); });
 			} else {
 				MsgI pubmsg;
-				if (!pubmsg.MakeRC(shm(), msg)) { return; }
-				DEFER1(pubmsg.Release(shm()));
+				if (!pubmsg.MakeRC(shm, msg)) { return; }
+				DEFER1(pubmsg.Release(shm));
 
-				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 10); });
+				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, pubmsg, 10); });
 			}
 		};
 
 		switch (msg.type()) {
-		case kMsgTypeSubscribe: OnSubChange(Sub); break;
-		case kMsgTypeUnsubscribe: OnSubChange(Unsub); break;
-		case kMsgTypePublish: OnPublish(); break;
-		default: break;
+		case kMsgTypeSubscribe: OnSubChange(Sub); return true;
+		case kMsgTypeUnsubscribe: OnSubChange(Unsub); return true;
+		case kMsgTypePublish: OnPublish(); return true;
+		default: return false;
 		}
 	};
+}
+
+bool PubSubCenter::Start(const int nworker)
+{
+	auto handler = MakeBusCenter();
+	printf("sizeof(pub/sub handler) = %ld\n", sizeof(handler));
 
 	const int kMaxWorker = 16;
-	return socket_.StartRaw(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
+	return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
 }
\ No newline at end of file

--
Gitblit v1.8.0