From f286fba4fa0c861ee8a40fb64aaa2650f16110fe Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期一, 12 四月 2021 09:32:14 +0800
Subject: [PATCH] vscode exclude build,debug dir.

---
 src/center.cpp |  472 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 471 insertions(+), 1 deletions(-)

diff --git a/src/center.cpp b/src/center.cpp
index 809b6d1..d2aad0a 100644
--- a/src/center.cpp
+++ b/src/center.cpp
@@ -16,14 +16,484 @@
  * =====================================================================================
  */
 #include "center.h"
+#include "bh_util.h"
 #include "defs.h"
 #include "shm.h"
+#include <chrono>
+#include <set>
+
+using namespace std::chrono;
+using namespace std::chrono_literals;
 
 using namespace bhome_shm;
+using namespace bhome_msg;
+using namespace bhome::msg;
+typedef BHCenter::MsgHandler Handler;
+
+namespace
+{
+typedef steady_clock::time_point TimePoint;
+inline TimePoint Now() { return steady_clock::now(); };
+
+//TODO check proc_id
+class NodeCenter
+{
+public:
+	typedef std::string ProcId;
+	typedef std::string Address;
+	typedef bhome::msg::ProcInfo ProcInfo;
+	typedef std::function<void(Address const &)> Cleaner;
+
+private:
+	enum {
+		kStateInvalid,
+		kStateNormal,
+		kStateOffline,
+		kStateKillme,
+	};
+
+	struct ProcState {
+		TimePoint timestamp_;
+		uint32_t flag_ = 0; // reserved
+		void UpdateState(TimePoint now)
+		{
+			const auto kOfflineTime = 60 * 10s;
+			const auto kKillTime = 60 * 20s;
+
+			auto diff = now - timestamp_;
+			if (diff < kOfflineTime) {
+				flag_ = kStateNormal;
+			} else if (diff < kKillTime) {
+				flag_ = kStateOffline;
+			} else {
+				flag_ = kStateKillme;
+			}
+		}
+	};
+	typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;
+
+	struct NodeInfo {
+		ProcState state_;             // state
+		std::set<Address> addrs_;     // registered mqs
+		ProcInfo proc_;               //
+		AddressTopics services_;      // address: topics
+		AddressTopics subscriptions_; // address: topics
+	};
+	typedef std::shared_ptr<NodeInfo> Node;
+	typedef std::weak_ptr<NodeInfo> WeakNode;
+
+	struct TopicDest {
+		Address mq_;
+		WeakNode weak_node_;
+		bool operator<(const TopicDest &a) const { return mq_ < a.mq_; }
+	};
+	inline const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
+	inline bool MatchAddr(std::set<Address> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); }
+
+public:
+	typedef std::set<TopicDest> Clients;
+
+	NodeCenter(const std::string &id, const Cleaner &cleaner) :
+	    id_(id), cleaner_(cleaner) {}
+	const std::string &id() const { return id_; } // no need to lock.
+
+	//TODO maybe just return serialized string.
+	MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
+	{
+		if (msg.proc().proc_id() != head.proc_id()) {
+			return MakeReply(eInvalidInput, "invalid proc id.");
+		}
+
+		try {
+			Node node(new NodeInfo);
+			node->addrs_.insert(SrcAddr(head));
+			for (auto &addr : msg.addrs()) {
+				node->addrs_.insert(addr.mq_id());
+			}
+			node->proc_.Swap(msg.mutable_proc());
+			node->state_.timestamp_ = Now();
+			node->state_.flag_ = kStateNormal;
+			nodes_[node->proc_.proc_id()] = node;
+			return MakeReply(eSuccess);
+		} catch (...) {
+			return MakeReply(eError, "register node error.");
+		}
+	}
+
+	template <class Reply, class Func>
+	Reply HandleMsg(const BHMsgHead &head, Func const &op)
+	{
+		try {
+			auto pos = nodes_.find(head.proc_id());
+			if (pos == nodes_.end()) {
+				return MakeReply<Reply>(eNotRegistered, "Node is not registered.");
+			} else {
+				auto node = pos->second;
+				if (!MatchAddr(node->addrs_, SrcAddr(head))) {
+					return MakeReply<Reply>(eAddressNotMatch, "Node address error.");
+				} else if (!Valid(*node)) {
+					return MakeReply<Reply>(eNoRespond, "Node is not alive.");
+				} else {
+					return op(node);
+				}
+			}
+		} catch (...) {
+			//TODO error log
+			return MakeReply<Reply>(eError, "internal error.");
+		}
+	}
+	template <class Func>
+	inline MsgCommonReply HandleMsg(const BHMsgHead &head, Func const &op)
+	{
+		return HandleMsg<MsgCommonReply, Func>(head, op);
+	}
+
+	MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg)
+	{
+		return HandleMsg(
+		    head, [&](Node node) -> MsgCommonReply {
+			    auto &src = SrcAddr(head);
+			    auto &topics = msg.topics().topic_list();
+			    node->services_[src].insert(topics.begin(), topics.end());
+			    TopicDest dest = {src, node};
+			    for (auto &topic : topics) {
+				    service_map_[topic].insert(dest);
+			    }
+			    return MakeReply(eSuccess);
+		    });
+	}
+
+	MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg)
+	{
+		return HandleMsg(head, [&](Node node) {
+			NodeInfo &ni = *node;
+			ni.state_.timestamp_ = Now();
+
+			auto &info = msg.proc();
+			if (!info.public_info().empty()) {
+				ni.proc_.set_public_info(info.public_info());
+			}
+			if (!info.private_info().empty()) {
+				ni.proc_.set_private_info(info.private_info());
+			}
+			return MakeReply(eSuccess);
+		});
+	}
+
+	MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req)
+	{
+		typedef MsgQueryTopicReply Reply;
+
+		auto query = [&](Node self) -> MsgQueryTopicReply {
+			auto pos = service_map_.find(req.topic());
+			if (pos != service_map_.end() && !pos->second.empty()) {
+				// now just find first one.
+				const TopicDest &dest = *(pos->second.begin());
+				Node dest_node(dest.weak_node_.lock());
+				if (!dest_node) {
+					service_map_.erase(pos);
+					return MakeReply<Reply>(eOffline, "topic server offline.");
+				} else if (!Valid(*dest_node)) {
+					return MakeReply<Reply>(eNoRespond, "topic server not responding.");
+				} else {
+					MsgQueryTopicReply reply = MakeReply<Reply>(eSuccess);
+					reply.mutable_address()->set_mq_id(dest.mq_);
+					return reply;
+				}
+
+			} else {
+				return MakeReply<Reply>(eNotFound, "topic server not found.");
+			}
+		};
+
+		return HandleMsg<Reply>(head, query);
+	}
+
+	MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg)
+	{
+		return HandleMsg(head, [&](Node node) {
+			auto &src = SrcAddr(head);
+			auto &topics = msg.topics().topic_list();
+			node->subscriptions_[src].insert(topics.begin(), topics.end());
+			TopicDest dest = {src, node};
+			for (auto &topic : topics) {
+				subscribe_map_[topic].insert(dest);
+			}
+			return MakeReply(eSuccess);
+		});
+	}
+	MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
+	{
+		return HandleMsg(head, [&](Node node) {
+			auto &src = SrcAddr(head);
+			auto pos = node->subscriptions_.find(src);
+
+			auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) {
+				auto pos = subscribe_map_.find(topic);
+				if (pos != subscribe_map_.end() &&
+				    pos->second.erase(dest) != 0 &&
+				    pos->second.empty()) {
+					subscribe_map_.erase(pos);
+				}
+			};
+
+			if (pos != node->subscriptions_.end()) {
+				const TopicDest &dest = {src, node};
+				auto &topics = msg.topics().topic_list();
+				// clear node sub records;
+				for (auto &topic : topics) {
+					pos->second.erase(topic);
+					RemoveSubTopicDestRecord(topic, dest);
+				}
+				if (pos->second.empty()) {
+					node->subscriptions_.erase(pos);
+				}
+			}
+			return MakeReply(eSuccess);
+		});
+	}
+
+	Clients DoFindClients(const std::string &topic)
+	{
+		Clients dests;
+		auto Find1 = [&](const std::string &t) {
+			auto pos = subscribe_map_.find(topic);
+			if (pos != subscribe_map_.end()) {
+				auto &clients = pos->second;
+				for (auto &cli : clients) {
+					if (Valid(cli.weak_node_)) {
+						dests.insert(cli);
+					}
+				}
+			}
+		};
+		Find1(topic);
+
+		size_t pos = 0;
+		while (true) {
+			pos = topic.find(kTopicSep, pos);
+			if (pos == topic.npos || ++pos == topic.size()) {
+				// Find1(std::string()); // sub all.
+				break;
+			} else {
+				Find1(topic.substr(0, pos));
+			}
+		}
+		return dests;
+	}
+	bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply)
+	{
+		bool ret = false;
+		HandleMsg(head, [&](Node node) {
+			DoFindClients(msg.topic()).swap(out);
+			ret = true;
+			return MakeReply(eSuccess);
+		}).Swap(&reply);
+		return ret;
+	}
+
+	void OnTimer()
+	{
+		CheckNodes();
+	}
+
+private:
+	void CheckNodes()
+	{
+		auto it = nodes_.begin();
+		while (it != nodes_.end()) {
+			auto &cli = *it->second;
+			cli.state_.UpdateState(Now());
+			if (cli.state_.flag_ == kStateKillme) {
+				if (cleaner_) {
+					for (auto &addr : cli.addrs_) {
+						cleaner_(addr);
+					}
+				}
+				it = nodes_.erase(it);
+			} else {
+				++it;
+			}
+		}
+	}
+	bool Valid(const NodeInfo &node)
+	{
+		return node.state_.flag_ == kStateNormal;
+	}
+	bool Valid(const WeakNode &weak)
+	{
+		auto node = weak.lock();
+		return node && Valid(*node);
+	}
+	void CheckAllNodes(); //TODO, call it in timer.
+	std::string id_;      // center proc id;
+
+	std::unordered_map<Topic, Clients> service_map_;
+	std::unordered_map<Topic, Clients> subscribe_map_;
+	std::unordered_map<ProcId, Node> nodes_;
+	Cleaner cleaner_; // remove mqs.
+};
+
+template <class Body, class OnMsg, class Replyer>
+inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer)
+{
+	if (head.route_size() != 1) { return; }
+	Body body;
+	if (msg.ParseBody(body)) {
+		replyer(onmsg(body));
+	}
+}
+
+Handler Combine(const Handler &h1, const Handler &h2)
+{
+	return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head) {
+		return h1(socket, msg, head) || h2(socket, msg, head);
+	};
+}
+template <class... H>
+Handler Combine(const Handler &h0, const Handler &h1, const Handler &h2, const H &...rest)
+{
+	return Combine(Combine(h0, h1), h2, rest...);
+}
+
+#define CASE_ON_MSG_TYPE(MsgTag)                                                         \
+	case kMsgType##MsgTag:                                                               \
+		Dispatch<Msg##MsgTag>(                                                           \
+		    msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \
+		return true;
+
+bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
+{
+
+	auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner);
+
+	auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
+		return [&](auto &&rep_body) {
+			auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
+			bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 100);
+			if (!r) {
+				printf("send reply failed.\n");
+			}
+			//TODO resend failed.
+		};
+	};
+
+	auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
+		auto &center = *center_ptr;
+		center->OnTimer();
+	};
+
+	auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
+		auto &center = *center_ptr;
+		auto replyer = MakeReplyer(socket, head, center->id());
+		switch (head.type()) {
+			CASE_ON_MSG_TYPE(Register);
+			CASE_ON_MSG_TYPE(Heartbeat);
+
+			CASE_ON_MSG_TYPE(RegisterRPC);
+			CASE_ON_MSG_TYPE(QueryTopic);
+		default: return false;
+		}
+	};
+
+	auto OnBusIdle = [](ShmSocket &socket) {};
+	auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
+		auto &center = *center_ptr;
+		auto replyer = MakeReplyer(socket, head, center->id());
+		auto OnPublish = [&]() {
+			MsgPublish pub;
+			NodeCenter::Clients clients;
+			MsgCommonReply reply;
+			if (head.route_size() != 1 || !msg.ParseBody(pub)) {
+				return;
+			} else if (!center->FindClients(head, pub, clients, reply)) {
+				MakeReplyer(socket, head, center->id())(reply);
+			} else {
+				MakeReplyer(socket, head, center->id())(MakeReply(eSuccess));
+				if (!msg.EnableRefCount(socket.shm())) { return; } // no memory?
+
+				for (auto &cli : clients) {
+					auto node = cli.weak_node_.lock();
+					if (node) {
+						if (!socket.Send(cli.mq_.data(), msg, 100)) {
+							printf("center route publish failed. need resend.\n");
+						}
+					}
+				}
+			}
+		};
+		switch (head.type()) {
+			CASE_ON_MSG_TYPE(Subscribe);
+			CASE_ON_MSG_TYPE(Unsubscribe);
+		case kMsgTypePublish: OnPublish(); return true;
+		default: return false;
+		}
+	};
+
+	BHCenter::Install("#center.reg", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000);
+	BHCenter::Install("#center.bus", OnPubSub, OnBusIdle, BHTopicBusAddress(), 1000);
+
+	return true;
+}
+
+#undef CASE_ON_MSG_TYPE
+
+} // namespace
 
 SharedMemory &BHomeShm()
 {
-	static SharedMemory shm("bhome_default_shm_v0", 1024*1024*64);
+	static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 64);
 	return shm;
 }
 
+BHCenter::CenterRecords &BHCenter::Centers()
+{
+	static CenterRecords rec;
+	return rec;
+}
+
+bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const std::string &mqid, const int mq_len)
+{
+	Centers()[name] = CenterInfo{name, handler, idle, mqid, mq_len};
+	return true;
+}
+bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len)
+{
+	return Install(name, handler, idle, std::string((const char *) &mqid, sizeof(mqid)), mq_len);
+}
+
+BHCenter::BHCenter(Socket::Shm &shm)
+{
+	auto gc = [&](const std::string &id) {
+		auto r = ShmSocket::Remove(shm, *(MQId *) id.data());
+		printf("remove mq : %s\n", r ? "ok" : "failed");
+	};
+
+	AddCenter("#center", gc);
+
+	for (auto &kv : Centers()) {
+		auto &info = kv.second;
+		sockets_[info.name_] = std::make_shared<ShmSocket>(shm, *(MQId *) info.mqid_.data(), info.mq_len_);
+	}
+}
+
+BHCenter::BHCenter() :
+    BHCenter(BHomeShm()) {}
+
+bool BHCenter::Start()
+{
+	for (auto &kv : Centers()) {
+		auto &info = kv.second;
+		sockets_[info.name_]->Start(info.handler_);
+	}
+
+	return true;
+}
+
+bool BHCenter::Stop()
+{
+	for (auto &kv : sockets_) {
+		kv.second->Stop();
+	}
+	return true;
+}
\ No newline at end of file

--
Gitblit v1.8.0