From 72851db66655912cb9c92300a80985fb9797d168 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 01 六月 2021 16:25:23 +0800
Subject: [PATCH] remove AtomicQueue, not used.

---
 box/node_center.cpp |  276 +++++++++++++++++++++++++++++++++++++++---------------
 1 files changed, 198 insertions(+), 78 deletions(-)

diff --git a/box/node_center.cpp b/box/node_center.cpp
index b970d44..b285c9f 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -16,7 +16,22 @@
  * =====================================================================================
  */
 #include "node_center.h"
+#include "json.h"
 #include "log.h"
+
+using ssjson::Json;
+
+namespace
+{
+std::string Join(const std::string &parent, const std::string &child)
+{
+	return parent + kTopicSep + child;
+}
+const std::string kTopicCenterRoot = "#center";
+const std::string kTopicNode = Join(kTopicCenterRoot, "node");
+const std::string kTopicNodeOnline = Join(kTopicNode, "online");
+const std::string kTopicNodeOffline = Join(kTopicNode, "offline");
+} // namespace
 
 ProcIndex ProcRecords::Put(const ProcId &proc_id, const MQId ssn)
 {
@@ -42,7 +57,7 @@
 {
 	auto pos = msgs_.find(id);
 	if (pos != msgs_.end()) {
-		ShmMsg(pos->second).Free();
+		pos->second.Free();
 		msgs_.erase(pos);
 	} else {
 		LOG_TRACE() << "ignore late free request.";
@@ -55,8 +70,9 @@
 		return;
 	}
 	// LOG_FUNCTION;
+	const size_t total = msgs_.size();
 	time_to_clean_ = now + 1;
-	int64_t limit = std::max(10000ul, msgs_.size() / 10);
+	int64_t limit = std::max(10000ul, total / 10);
 	int64_t n = 0;
 	auto it = msgs_.begin();
 	while (it != msgs_.end() && --limit > 0) {
@@ -67,49 +83,67 @@
 			++n;
 		};
 		int n = now - msg.timestamp();
-		if (n < 10) {
+		if (msg.Count() == 0) {
+			Free();
+		} else if (n > NodeTimeoutSec()) {
+			Free();
+		} else {
 			++it;
-		} else if (msg.Count() == 0) {
-			Free();
-		} else if (n > 60) {
-			Free();
 		}
 	}
 	if (n > 0) {
-		LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n;
+		LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total;
 	}
 }
 
 void MsgRecords::DebugPrint() const
 {
-	LOG_DEBUG() << "msgs : " << size();
+	LOG_TRACE() << "msgs : " << size();
 	int i = 0;
 	int total_count = 0;
 	for (auto &kv : msgs_) {
-		MsgI msg(kv.second);
+		auto &msg = kv.second;
 		total_count += msg.Count();
-		LOG_TRACE() << "  " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size();
+		LOG_TRACE() << "  " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second.Offset() << ", count: " << msg.Count() << ", size: " << msg.Size();
 	}
-	LOG_DEBUG() << "total count: " << total_count;
+	LOG_TRACE() << "total count: " << total_count;
 }
 
 // NodeCenter::ProcState
-void NodeCenter::ProcState::PutOffline(const int64_t offline_time)
+void NodeCenter::NodeInfo::PutOffline(const int64_t offline_time)
 {
-	timestamp_ = NowSec() - offline_time;
-	flag_ = kStateOffline;
+	state_.timestamp_ = NowSec() - offline_time;
+	state_.flag_ = kStateOffline;
+
+	Json json;
+	json.put("proc_id", proc_.proc_id());
+	center_.Publish(shm_, kTopicNodeOffline, json.dump());
 }
 
-void NodeCenter::ProcState::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
+void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
 {
-	auto diff = now - timestamp_;
-	LOG_DEBUG() << "state " << this << " diff: " << diff;
+	auto old = state_.flag_;
+	auto diff = now - state_.timestamp_;
+	auto publish = [this](const std::string &topic) {
+		if (proc_.proc_id().empty()) { return; } // node init, ignore.
+		Json json;
+		json.put("proc_id", proc_.proc_id());
+		center_.Publish(shm_, topic, json.dump());
+	};
+
+	LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff;
 	if (diff < offline_time) {
-		flag_ = kStateNormal;
+		state_.flag_ = kStateNormal;
+		if (old != state_.flag_) {
+			publish(kTopicNodeOnline);
+		}
 	} else if (diff < kill_time) {
-		flag_ = kStateOffline;
+		state_.flag_ = kStateOffline;
+		if (old != state_.flag_) {
+			publish(kTopicNodeOffline);
+		}
 	} else {
-		flag_ = kStateKillme;
+		state_.flag_ = kStateKillme;
 	}
 }
 
@@ -126,11 +160,11 @@
 
 	auto UpdateRegInfo = [&](Node &node) {
 		node->state_.timestamp_ = NowSec() - offline_time_;
-		node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
+		node->UpdateState(NowSec(), offline_time_, kill_time_);
 
 		// create sockets.
 		try {
-			ShmSocket tmp(shm, true, ssn, 16);
+			ShmSocket tmp(shm, ssn, eCreate);
 			node->addrs_.emplace(ssn, tmp.AbsAddr());
 			return true;
 		} catch (...) {
@@ -140,7 +174,7 @@
 
 	auto PrepareProcInit = [&](Node &node) {
 		bool r = false;
-		ShmMsg init_msg;
+		ShmMsg init_msg(shm);
 		DEFER1(init_msg.Release());
 		MsgProcInit body;
 		auto head = InitMsgHead(GetType(body), id(), ssn);
@@ -149,7 +183,7 @@
 		       SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg);
 	};
 
-	Node node(new NodeInfo);
+	Node node(new NodeInfo(*this, shm));
 	if (UpdateRegInfo(node) && PrepareProcInit(node)) {
 		reply |= (node->addrs_[ssn] << 4);
 		nodes_[ssn] = node;
@@ -175,6 +209,57 @@
 {
 	RecordMsg(msg);
 	return socket.Send(dest, msg);
+}
+
+NodeCenter::Node NodeCenter::GetNode(const MQId mq_id)
+{
+	Node node;
+	auto ssn = mq_id - (mq_id % 10);
+	auto pos = nodes_.find(ssn);
+	if (pos != nodes_.end()) {
+		node = pos->second;
+	}
+	return node;
+}
+
+bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
+{
+	Node node(GetNode(dest.id_));
+	if (!node || !Valid(*node)) {
+		LOG_ERROR() << id() << " pass remote request, dest not found.";
+		return false;
+	}
+
+	ShmSocket &sender(DefaultSender(node->shm_));
+	auto route = head.add_route();
+	route->set_mq_id(sender.id());
+	route->set_abs_addr(sender.AbsAddr());
+
+	ShmMsg msg(node->shm_);
+	if (!msg.Make(head, body_content)) { return false; }
+	DEFER1(msg.Release(););
+	RecordMsg(msg);
+	return sender.Send(dest, msg, head.msg_id(), std::move(cb));
+}
+
+bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content)
+{
+	Node node(GetNode(dest.id_));
+	if (!node) {
+		LOG_ERROR() << id() << " pass remote reply , ssn not found.";
+		return false;
+	}
+	auto offset = node->addrs_[dest.id_];
+	if (offset != dest.offset_) {
+		LOG_ERROR() << id() << " pass remote reply, dest address not match";
+		return false;
+	}
+
+	ShmMsg msg(node->shm_);
+	if (!msg.Make(head, body_content)) { return false; }
+	DEFER1(msg.Release(););
+	RecordMsg(msg);
+	return DefaultSender(node->shm_).Send(dest, msg);
 }
 
 void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val)
@@ -205,7 +290,7 @@
 	if (!FindMq()) { return; }
 
 	auto size = GetAllocSize((val >> 52) & MaskBits(8));
-	MsgI new_msg;
+	MsgI new_msg(socket.shm());
 	if (new_msg.Make(size)) {
 		// 31bit proc index, 28bit id, ,4bit cmd+flag
 		int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0);
@@ -248,7 +333,7 @@
 	auto &node = pos->second;
 	try {
 		for (int i = 0; i < msg.extra_mq_num(); ++i) {
-			ShmSocket tmp(BHomeShm(), true, head.ssn_id() + i + 1, 16);
+			ShmSocket tmp(node->shm_, head.ssn_id() + i + 1, eCreate);
 			node->addrs_.emplace(tmp.id(), tmp.AbsAddr());
 			auto addr = reply.add_extra_mqs();
 			addr->set_mq_id(tmp.id());
@@ -271,33 +356,33 @@
 		MQId ssn = head.ssn_id();
 		// when node restart, ssn will change,
 		// and old node will be removed after timeout.
-		auto UpdateRegInfo = [&](Node &node) {
-			node->proc_.Swap(msg.mutable_proc());
-			node->state_.timestamp_ = head.timestamp();
-			node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
-		};
-
 		auto pos = nodes_.find(ssn);
 		if (pos == nodes_.end()) {
 			return MakeReply(eInvalidInput, "invalid session.");
 		}
 
-		// update proc info
-		Node &node = pos->second;
-		UpdateRegInfo(node);
-		LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")";
-
+		// try to remove old session
 		auto old = online_node_addr_map_.find(head.proc_id());
 		if (old != online_node_addr_map_.end()) { // old session
 			auto &old_ssn = old->second;
 			if (old_ssn != ssn) {
-				nodes_[old_ssn]->state_.PutOffline(offline_time_);
+				nodes_[old_ssn]->PutOffline(offline_time_);
+
 				LOG_DEBUG() << "put node (" << nodes_[old_ssn]->proc_.proc_id() << ") ssn (" << old->second << ") offline";
 				old_ssn = ssn;
 			}
 		} else {
 			online_node_addr_map_.emplace(head.proc_id(), ssn);
 		}
+
+		// update proc info
+		Node &node = pos->second;
+		node->proc_.Swap(msg.mutable_proc());
+		node->state_.timestamp_ = head.timestamp();
+		node->UpdateState(NowSec(), offline_time_, kill_time_);
+
+		LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")";
+
 		return MakeReply(eSuccess);
 	} catch (...) {
 		return MakeReply(eError, "register node error.");
@@ -309,7 +394,7 @@
 	return HandleMsg(
 	    head, [&](Node node) -> MsgCommonReply {
 		    NodeInfo &ni = *node;
-		    ni.state_.PutOffline(offline_time_);
+		    ni.PutOffline(offline_time_);
 		    return MakeReply(eSuccess);
 	    });
 }
@@ -338,7 +423,7 @@
 	return HandleMsg(head, [&](Node node) {
 		NodeInfo &ni = *node;
 		ni.state_.timestamp_ = head.timestamp();
-		ni.state_.UpdateState(NowSec(), offline_time_, kill_time_);
+		ni.UpdateState(NowSec(), offline_time_, kill_time_);
 
 		auto &info = msg.proc();
 		if (!info.public_info().empty()) {
@@ -350,44 +435,48 @@
 		return MakeReply(eSuccess);
 	});
 }
-MsgQueryProcReply NodeCenter::QueryProc(const BHMsgHead &head, const MsgQueryProc &req)
+
+MsgQueryProcReply NodeCenter::QueryProc(const std::string &proc_id)
 {
 	typedef MsgQueryProcReply Reply;
-	auto query = [&](Node self) -> Reply {
-		auto Add1 = [](Reply &reply, Node node) {
-			auto info = reply.add_proc_list();
-			*info->mutable_proc() = node->proc_;
-			info->set_online(node->state_.flag_ == kStateNormal);
-			for (auto &addr_topics : node->services_) {
-				for (auto &topic : addr_topics.second) {
-					info->mutable_topics()->add_topic_list(topic);
-				}
+	auto Add1 = [](Reply &reply, Node node) {
+		auto info = reply.add_proc_list();
+		*info->mutable_proc() = node->proc_;
+		info->mutable_proc()->clear_private_info();
+		info->set_online(node->state_.flag_ == kStateNormal);
+		for (auto &addr_topics : node->services_) {
+			for (auto &topic : addr_topics.second) {
+				info->mutable_topics()->add_topic_list(topic);
 			}
-		};
-
-		if (!req.proc_id().empty()) {
-			auto pos = online_node_addr_map_.find(req.proc_id());
-			if (pos == online_node_addr_map_.end()) {
-				return MakeReply<Reply>(eNotFound, "proc not found.");
-			} else {
-				auto node_pos = nodes_.find(pos->second);
-				if (node_pos == nodes_.end()) {
-					return MakeReply<Reply>(eNotFound, "proc node not found.");
-				} else {
-					auto reply = MakeReply<Reply>(eSuccess);
-					Add1(reply, node_pos->second);
-					return reply;
-				}
-			}
-		} else {
-			Reply reply(MakeReply<Reply>(eSuccess));
-			for (auto &kv : nodes_) {
-				Add1(reply, kv.second);
-			}
-			return reply;
 		}
 	};
 
+	if (!proc_id.empty()) {
+		auto pos = online_node_addr_map_.find(proc_id);
+		if (pos == online_node_addr_map_.end()) {
+			return MakeReply<Reply>(eNotFound, "proc not found.");
+		} else {
+			auto node_pos = nodes_.find(pos->second);
+			if (node_pos == nodes_.end()) {
+				return MakeReply<Reply>(eNotFound, "proc node not found.");
+			} else {
+				auto reply = MakeReply<Reply>(eSuccess);
+				Add1(reply, node_pos->second);
+				return reply;
+			}
+		}
+	} else {
+		Reply reply(MakeReply<Reply>(eSuccess));
+		for (auto &kv : nodes_) {
+			Add1(reply, kv.second);
+		}
+		return reply;
+	}
+}
+MsgQueryProcReply NodeCenter::QueryProc(const BHMsgHead &head, const MsgQueryProc &req)
+{
+	typedef MsgQueryProcReply Reply;
+	auto query = [&](Node self) -> Reply { return this->QueryProc(req.proc_id()); };
 	return HandleMsg<Reply>(head, query);
 }
 
@@ -465,8 +554,8 @@
 NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic)
 {
 	Clients dests;
-	auto Find1 = [&](const std::string &t) {
-		auto pos = subscribe_map_.find(topic);
+	auto Find1 = [&](const std::string &exact) {
+		auto pos = subscribe_map_.find(exact);
 		if (pos != subscribe_map_.end()) {
 			auto &clients = pos->second;
 			for (auto &cli : clients) {
@@ -485,7 +574,7 @@
 			// Find1(std::string()); // sub all.
 			break;
 		} else {
-			Find1(topic.substr(0, pos));
+			Find1(topic.substr(0, pos - 1));
 		}
 	}
 	return dests;
@@ -517,7 +606,7 @@
 	auto it = nodes_.begin();
 	while (it != nodes_.end()) {
 		auto &cli = *it->second;
-		cli.state_.UpdateState(now, offline_time_, kill_time_);
+		cli.UpdateState(now, offline_time_, kill_time_);
 		if (cli.state_.flag_ == kStateKillme) {
 			RemoveNode(it->second);
 			it = nodes_.erase(it);
@@ -556,8 +645,39 @@
 	}
 
 	for (auto &addr : node->addrs_) {
-		cleaner_(addr.first);
+		auto &id = addr.first;
+		auto r = ShmSocket::Remove(node->shm_, id);
+		LOG_DEBUG() << "remove mq " << id << (r ? " ok" : " failed");
 	}
 
 	node->addrs_.clear();
+}
+
+void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content)
+{
+	try {
+		// LOG_DEBUG() << "center publish: " << topic << ": " << content;
+		Clients clients(DoFindClients(topic));
+		if (clients.empty()) { return; }
+
+		MsgPublish pub;
+		pub.set_topic(topic);
+		pub.set_data(content);
+		BHMsgHead head(InitMsgHead(GetType(pub), id(), 0));
+		MsgI msg(shm);
+		if (msg.Make(head, pub)) {
+			DEFER1(msg.Release());
+			RecordMsg(msg);
+
+			for (auto &cli : clients) {
+				auto node = cli.weak_node_.lock();
+				if (node && node->state_.flag_ == kStateNormal) {
+					DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
+				}
+			}
+		}
+
+	} catch (...) {
+		LOG_ERROR() << "center publish error.";
+	}
 }
\ No newline at end of file

--
Gitblit v1.8.0