From bb9a7e348892eb5c4fccb063380aa6fcd9612b71 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 06 四月 2021 17:32:35 +0800
Subject: [PATCH] server resend failed; rename msgs; refactor.

---
 src/reqrep_center.cpp |   22 ++++++++--------------
 1 files changed, 8 insertions(+), 14 deletions(-)

diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp
index e52b0fd..ce35d1c 100644
--- a/src/reqrep_center.cpp
+++ b/src/reqrep_center.cpp
@@ -100,12 +100,6 @@
 	std::unordered_map<ProcId, Node> nodes_;
 };
 
-Synced<NodeCenter> &Center()
-{
-	static Synced<NodeCenter> s;
-	return s;
-}
-
 } // namespace
 
 BHCenter::MsgHandler MakeReqRepCenter()
@@ -120,7 +114,7 @@
 		time_t now = 0;
 		time(&now);
 		if (last.exchange(now) < now) {
-			printf("bus queue size: %ld\n", socket.Pending());
+			printf("center queue size: %ld\n", socket.Pending());
 		}
 #endif
 		auto SrcMQ = [&]() { return msg.route(0).mq_id(); };
@@ -128,7 +122,7 @@
 		auto OnRegister = [&]() {
 			if (msg.route_size() != 1) { return; }
 
-			DataProcRegister reg;
+			MsgRegister reg;
 			if (reg.ParseFromString(msg.body()) && reg.has_proc()) {
 				center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end());
 			}
@@ -138,7 +132,7 @@
 			if (msg.route_size() != 1) { return; }
 			auto &src_mq = msg.route(0).mq_id();
 
-			DataProcHeartbeat hb;
+			MsgHeartbeat hb;
 			if (hb.ParseFromString(msg.body()) && hb.has_proc()) {
 				center->Heartbeat(*hb.mutable_proc(), SrcMQ());
 			}
@@ -147,7 +141,7 @@
 		auto OnQueryTopic = [&]() {
 			if (msg.route_size() != 1) { return; }
 
-			DataProcQueryTopic query;
+			MsgQueryTopic query;
 			NodeCenter::ProcAddr dest;
 			if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) {
 				MQId remote;
@@ -161,9 +155,9 @@
 		};
 
 		switch (msg.type()) {
-		case kMsgTypeProcRegisterTopics: OnRegister(); return true;
-		case kMsgTypeProcHeartbeat: OnHeartbeat(); return true;
-		case kMsgTypeProcQueryTopic: OnQueryTopic(); return true;
+		case kMsgTypeRegister: OnRegister(); return true;
+		case kMsgTypeHeartbeat: OnHeartbeat(); return true;
+		case kMsgTypeQueryTopic: OnQueryTopic(); return true;
 		default: return false;
 		}
 	};
@@ -176,4 +170,4 @@
 
 	const int kMaxWorker = 16;
 	return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
-}
\ No newline at end of file
+}

--
Gitblit v1.8.0