From 4deeafbd502dc3c57dab8ad6ca601a38a9e7f074 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 06 四月 2021 19:10:49 +0800
Subject: [PATCH] add uni center.

---
 src/reqrep_center.cpp |  178 ++++++++++++++++++++++++++++++++++++++---------------------
 1 files changed, 115 insertions(+), 63 deletions(-)

diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp
index 5f1873e..ce35d1c 100644
--- a/src/reqrep_center.cpp
+++ b/src/reqrep_center.cpp
@@ -17,105 +17,157 @@
  */
 #include "reqrep_center.h"
 #include "bh_util.h"
-using namespace bhome_shm;
+#include "msg.h"
+#include <chrono>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
 
-struct A {
-	void F(int){};
-};
+using namespace bhome_shm;
 
 namespace
 {
-inline uint64_t Now()
+auto Now = []() { time_t t; return time(&t); };
+
+class NodeCenter
 {
-	time_t t;
-	return time(&t);
-}
+public:
+	typedef std::string ProcAddr;
+	typedef bhome::msg::ProcInfo ProcInfo;
+
+	template <class Iter>
+	bool Register(ProcInfo &info, const ProcAddr &src_mq, Iter topics_begin, Iter topics_end)
+	{
+		try {
+			Node node(new NodeInfo);
+			node->addr_ = src_mq;
+			node->proc_.Swap(&info);
+			node->state_.timestamp_ = Now();
+			nodes_[node->proc_.id()] = node;
+			for (auto it = topics_begin; it != topics_end; ++it) {
+				topic_map_[*it] = node;
+			}
+			return true;
+		} catch (...) {
+			return false;
+		}
+	}
+	void Heartbeat(ProcInfo &info, const ProcAddr &src_mq)
+	{
+		auto pos = nodes_.find(info.name());
+		if (pos != nodes_.end() && pos->second->addr_ == src_mq) { // both name and mq should be the same.
+			NodeInfo &ni = *pos->second;
+			ni.state_.timestamp_ = Now();
+			if (!info.public_info().empty()) {
+				ni.proc_.set_public_info(info.public_info());
+			}
+			if (!info.private_info().empty()) {
+				ni.proc_.set_private_info(info.private_info());
+			}
+		}
+	}
+	bool QueryTopic(const Topic &topic, ProcAddr &addr)
+	{
+		auto pos = topic_map_.find(topic);
+		if (pos != topic_map_.end()) {
+			Node node(pos->second.lock());
+			if (node) {
+				addr = node->addr_;
+				return true;
+			} else { // dead, remove record.
+				topic_map_.erase(pos);
+				return false;
+			}
+		} else {
+			return false;
+		}
+	}
+
+private:
+	struct ProcState {
+		time_t timestamp_ = 0;
+		uint32_t flag_ = 0; // reserved
+	};
+	typedef std::string ProcId;
+	struct NodeInfo {
+		ProcState state_; // state
+		ProcAddr addr_;   // registered_mqid.
+		ProcInfo proc_;   //
+	};
+	typedef std::shared_ptr<NodeInfo> Node;
+	typedef std::weak_ptr<NodeInfo> WeakNode;
+	std::unordered_map<Topic, WeakNode> topic_map_;
+	std::unordered_map<ProcId, Node> nodes_;
+};
 
 } // namespace
-bool ReqRepCenter::Start(const int nworker)
+
+BHCenter::MsgHandler MakeReqRepCenter()
 {
-	auto onRecv = [&](BHMsg &msg) {
+	auto center_ptr = std::make_shared<Synced<NodeCenter>>();
+	return [center_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) {
+		auto &center = *center_ptr;
+		auto &shm = socket.shm();
+
 #ifndef NDEBUG
 		static std::atomic<time_t> last(0);
 		time_t now = 0;
 		time(&now);
 		if (last.exchange(now) < now) {
-			printf("bus queue size: %ld\n", socket_.Pending());
+			printf("center queue size: %ld\n", socket.Pending());
 		}
 #endif
-		if (msg.route_size() == 0) {
-			return;
-		}
-		auto &src_mq = msg.route(0).mq_id();
+		auto SrcMQ = [&]() { return msg.route(0).mq_id(); };
 
 		auto OnRegister = [&]() {
-			DataProcRegister reg;
-			if (!reg.ParseFromString(msg.body())) {
-				return;
-			}
-			ProcInfo pi;
-			pi.server_mqid_ = src_mq;
-			pi.proc_id_ = reg.proc().name();
-			pi.ext_info_ = reg.proc().info();
-			pi.timestamp_ = Now();
+			if (msg.route_size() != 1) { return; }
 
-			std::lock_guard<std::mutex> lock(mutex_);
-			for (auto &t : reg.topics()) {
-				topic_mq_[t] = pi.server_mqid_;
+			MsgRegister reg;
+			if (reg.ParseFromString(msg.body()) && reg.has_proc()) {
+				center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end());
 			}
-			procs_[pi.proc_id_] = pi;
 		};
 
 		auto OnHeartbeat = [&]() {
-			DataProcHeartbeat hb;
-			if (!hb.ParseFromString(msg.body())) {
-				return;
-			}
+			if (msg.route_size() != 1) { return; }
+			auto &src_mq = msg.route(0).mq_id();
 
-			std::lock_guard<std::mutex> lock(mutex_);
-			auto pos = procs_.find(hb.proc().name());
-			if (pos != procs_.end() && pos->second.server_mqid_ == src_mq) { // both name and mq should be the same.
-				pos->second.timestamp_ = Now();
-				pos->second.ext_info_ = hb.proc().info();
+			MsgHeartbeat hb;
+			if (hb.ParseFromString(msg.body()) && hb.has_proc()) {
+				center->Heartbeat(*hb.mutable_proc(), SrcMQ());
 			}
 		};
 
 		auto OnQueryTopic = [&]() {
-			DataProcQueryTopic query;
-			if (!query.ParseFromString(msg.body())) {
-				return;
-			}
+			if (msg.route_size() != 1) { return; }
 
-			std::string dest;
-			auto FindDest = [&]() {
-				std::lock_guard<std::mutex> lock(mutex_);
-				auto pos = topic_mq_.find(query.topic());
-				if (pos != topic_mq_.end()) {
-					dest = pos->second;
-					return true;
-				} else {
-					return false;
-				}
-			};
-			if (FindDest()) {
+			MsgQueryTopic query;
+			NodeCenter::ProcAddr dest;
+			if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) {
 				MQId remote;
-				memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote));
+				memcpy(&remote, SrcMQ().data(), sizeof(MQId));
 				MsgI imsg;
-				if (!imsg.Make(shm(), MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
-				if (!ShmMsgQueue::Send(shm(), remote, imsg, 100)) {
-					imsg.Release(shm());
+				if (!imsg.Make(shm, MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
+				if (!ShmMsgQueue::Send(shm, remote, imsg, 100)) {
+					imsg.Release(shm);
 				}
 			}
 		};
 
 		switch (msg.type()) {
-		case kMsgTypeProcRegisterTopics: OnRegister(); break;
-		case kMsgTypeProcHeartbeat: OnHeartbeat(); break;
-		case kMsgTypeProcQueryTopic: OnQueryTopic(); break;
-		default: break;
+		case kMsgTypeRegister: OnRegister(); return true;
+		case kMsgTypeHeartbeat: OnHeartbeat(); return true;
+		case kMsgTypeQueryTopic: OnQueryTopic(); return true;
+		default: return false;
 		}
 	};
+}
+
+bool ReqRepCenter::Start(const int nworker)
+{
+	auto handler = MakeReqRepCenter();
+	printf("sizeof(rep/req handler) = %ld\n", sizeof(handler));
 
 	const int kMaxWorker = 16;
-	return socket_.Start(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
-}
\ No newline at end of file
+	return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
+}

--
Gitblit v1.8.0