From 83085f2ce99cca05d40a19482151873a55e6393a Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 02 四月 2021 19:32:21 +0800
Subject: [PATCH] refactor center; add async request no cb.

---
 src/reqrep_center.cpp |   54 ++++++++++++++++++++++++++++++++++++------------------
 1 files changed, 36 insertions(+), 18 deletions(-)

diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp
index 2356ebc..e52b0fd 100644
--- a/src/reqrep_center.cpp
+++ b/src/reqrep_center.cpp
@@ -99,63 +99,81 @@
 	std::unordered_map<Topic, WeakNode> topic_map_;
 	std::unordered_map<ProcId, Node> nodes_;
 };
+
+Synced<NodeCenter> &Center()
+{
+	static Synced<NodeCenter> s;
+	return s;
+}
+
 } // namespace
 
-bool ReqRepCenter::Start(const int nworker)
+BHCenter::MsgHandler MakeReqRepCenter()
 {
 	auto center_ptr = std::make_shared<Synced<NodeCenter>>();
-	auto onRecv = [center_ptr, this](BHMsg &msg) {
+	return [center_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) {
 		auto &center = *center_ptr;
+		auto &shm = socket.shm();
 
 #ifndef NDEBUG
 		static std::atomic<time_t> last(0);
 		time_t now = 0;
 		time(&now);
 		if (last.exchange(now) < now) {
-			printf("bus queue size: %ld\n", socket_.Pending());
+			printf("bus queue size: %ld\n", socket.Pending());
 		}
 #endif
-		if (msg.route_size() == 0) {
-			return;
-		}
-		auto &src_mq = msg.route(0).mq_id();
+		auto SrcMQ = [&]() { return msg.route(0).mq_id(); };
 
 		auto OnRegister = [&]() {
+			if (msg.route_size() != 1) { return; }
+
 			DataProcRegister reg;
 			if (reg.ParseFromString(msg.body()) && reg.has_proc()) {
-				center->Register(*reg.mutable_proc(), src_mq, reg.topics().begin(), reg.topics().end());
+				center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end());
 			}
 		};
 
 		auto OnHeartbeat = [&]() {
+			if (msg.route_size() != 1) { return; }
+			auto &src_mq = msg.route(0).mq_id();
+
 			DataProcHeartbeat hb;
 			if (hb.ParseFromString(msg.body()) && hb.has_proc()) {
-				center->Heartbeat(*hb.mutable_proc(), src_mq);
+				center->Heartbeat(*hb.mutable_proc(), SrcMQ());
 			}
 		};
 
 		auto OnQueryTopic = [&]() {
+			if (msg.route_size() != 1) { return; }
+
 			DataProcQueryTopic query;
 			NodeCenter::ProcAddr dest;
 			if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) {
 				MQId remote;
-				memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote));
+				memcpy(&remote, SrcMQ().data(), sizeof(MQId));
 				MsgI imsg;
-				if (!imsg.Make(shm(), MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
-				if (!ShmMsgQueue::Send(shm(), remote, imsg, 100)) {
-					imsg.Release(shm());
+				if (!imsg.Make(shm, MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
+				if (!ShmMsgQueue::Send(shm, remote, imsg, 100)) {
+					imsg.Release(shm);
 				}
 			}
 		};
 
 		switch (msg.type()) {
-		case kMsgTypeProcRegisterTopics: OnRegister(); break;
-		case kMsgTypeProcHeartbeat: OnHeartbeat(); break;
-		case kMsgTypeProcQueryTopic: OnQueryTopic(); break;
-		default: break;
+		case kMsgTypeProcRegisterTopics: OnRegister(); return true;
+		case kMsgTypeProcHeartbeat: OnHeartbeat(); return true;
+		case kMsgTypeProcQueryTopic: OnQueryTopic(); return true;
+		default: return false;
 		}
 	};
+}
+
+bool ReqRepCenter::Start(const int nworker)
+{
+	auto handler = MakeReqRepCenter();
+	printf("sizeof(rep/req handler) = %ld\n", sizeof(handler));
 
 	const int kMaxWorker = 16;
-	return socket_.Start(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
+	return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
 }
\ No newline at end of file

--
Gitblit v1.8.0