From 232227035c8d6a31eaaf193863cbadda949c08fd Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期二, 20 七月 2021 20:19:26 +0800
Subject: [PATCH] fix memory leak

---
 box/node_center.h |   72 +++++++++++++++++++++++++++++++----
 1 files changed, 63 insertions(+), 9 deletions(-)

diff --git a/box/node_center.h b/box/node_center.h
index caaf054..b6ceab5 100644
--- a/box/node_center.h
+++ b/box/node_center.h
@@ -18,6 +18,7 @@
 #ifndef NODE_CENTER_KY67RJ1Q
 #define NODE_CENTER_KY67RJ1Q
 
+#include "json.h"
 #include "shm_socket.h"
 #include <unordered_map>
 
@@ -48,7 +49,6 @@
 class MsgRecords
 {
 	typedef int64_t MsgId;
-	typedef int64_t Offset;
 
 public:
 	void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg); }
@@ -83,6 +83,10 @@
 	};
 	typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;
 
+	struct NodeInfo;
+	typedef std::shared_ptr<NodeInfo> Node;
+	typedef std::weak_ptr<NodeInfo> WeakNode;
+
 	struct NodeInfo {
 		NodeCenter &center_;
 		SharedMemory &shm_;
@@ -90,14 +94,15 @@
 		std::map<MQId, int64_t> addrs_; // registered mqs
 		ProcInfo proc_;                 //
 		AddressTopics services_;        // address: topics
-		AddressTopics subscriptions_;   // address: topics
+		AddressTopics local_sub_;       // address: topics
+		AddressTopics net_sub_;         // address: topics
 		NodeInfo(NodeCenter &center, SharedMemory &shm) :
 		    center_(center), shm_(shm) {}
 		void PutOffline(const int64_t offline_time);
 		void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time);
+		void Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node);
+		void Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node);
 	};
-	typedef std::shared_ptr<NodeInfo> Node;
-	typedef std::weak_ptr<NodeInfo> WeakNode;
 
 	struct TopicDest {
 		MQId mq_id_;
@@ -122,6 +127,10 @@
 	void RecordMsg(const MsgI &msg);
 	bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg);
 	bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg);
+
+	bool PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
+	bool RemotePublish(BHMsgHead &head, const std::string &body_content);
+	bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content);
 	void OnAlloc(ShmSocket &socket, const int64_t val);
 	void OnFree(ShmSocket &socket, const int64_t val);
 	bool OnCommand(ShmSocket &socket, const int64_t val);
@@ -148,8 +157,8 @@
 					return op(node);
 				}
 			}
-		} catch (...) {
-			//TODO error log
+		} catch (std::exception &e) {
+			LOG_ERROR() << "handle msg exception: " << e.what();
 			return MakeReply<Reply>(eError, "internal error.");
 		}
 	}
@@ -157,6 +166,14 @@
 	inline MsgCommonReply HandleMsg(const BHMsgHead &head, Func const &op)
 	{
 		return HandleMsg<MsgCommonReply, Func>(head, op);
+	}
+	template <class Reply>
+	bool CheckMsg(const BHMsgHead &head, Reply &reply)
+	{
+		bool r = false;
+		auto onOk = [&](Node) { r = true; return MakeReply<Reply>(eSuccess); };
+		reply = HandleMsg<Reply>(head, onOk);
+		return r;
 	}
 
 	MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg);
@@ -167,15 +184,22 @@
 	MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req);
 	MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg);
 	MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg);
-	Clients DoFindClients(const std::string &topic);
-	bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply);
+	MsgCommonReply Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg);
 
 	void OnTimer();
+
+	// remote hosts records
+	std::set<std::string> FindRemoteSubClients(const Topic &topic) { return net_records_.FindSubHosts(topic); }
+	std::set<std::string> FindRemoteRPCServers(const Topic &topic) { return net_records_.FindRPCHosts(topic); }
+	void ParseNetInfo(ssjson::Json &info) { net_records_.ParseData(info); }
 
 private:
 	void CheckNodes();
 	bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; }
 	void Publish(SharedMemory &shm, const Topic &topic, const std::string &content);
+	void Notify(const Topic &topic, NodeInfo &node);
+	void DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg);
+	Clients DoFindClients(const std::string &topic, bool from_remote);
 	bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; }
 	bool Valid(const WeakNode &weak)
 	{
@@ -183,10 +207,13 @@
 		return node && Valid(*node);
 	}
 	void RemoveNode(Node &node);
+	Node GetNode(const MQId mq);
+
 	std::string id_; // center proc id;
 
 	std::unordered_map<Topic, Clients> service_map_;
-	std::unordered_map<Topic, Clients> subscribe_map_;
+	std::unordered_map<Topic, Clients> local_sub_map_;
+	std::unordered_map<Topic, Clients> net_sub_map_;
 	std::unordered_map<Address, Node> nodes_;
 	std::unordered_map<ProcId, Address> online_node_addr_map_;
 	ProcRecords procs_; // To get a short index for msg alloc.
@@ -195,6 +222,33 @@
 	int64_t offline_time_;
 	int64_t kill_time_;
 	int64_t last_check_time_;
+
+	// net hosts info
+	class NetRecords
+	{
+	public:
+		typedef std::set<std::string> Hosts;
+		void ParseData(const ssjson::Json &input);
+		Hosts FindRPCHosts(const Topic &topic) { return FindHosts(topic, rpc_hosts_); }
+		Hosts FindSubHosts(const Topic &topic) { return FindHosts(topic, sub_hosts_); }
+
+	private:
+		typedef std::unordered_map<Topic, Hosts> TopicMap;
+		TopicMap sub_hosts_;
+		TopicMap rpc_hosts_;
+		Hosts FindHosts(const Topic &topic, const TopicMap &tmap)
+		{
+			auto pos = tmap.find(topic);
+			if (pos != tmap.end()) {
+				return pos->second;
+			} else {
+				return Hosts();
+			}
+		}
+		std::string host_id_;
+		std::string ip_;
+	};
+	NetRecords net_records_;
 };
 
 #endif // end of include guard: NODE_CENTER_KY67RJ1Q

--
Gitblit v1.8.0