From 232227035c8d6a31eaaf193863cbadda949c08fd Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期二, 20 七月 2021 20:19:26 +0800
Subject: [PATCH] fix memory leak
---
box/node_center.h | 61 ++++++++++++++++++++++++++----
1 files changed, 52 insertions(+), 9 deletions(-)
diff --git a/box/node_center.h b/box/node_center.h
index a085bdf..b6ceab5 100644
--- a/box/node_center.h
+++ b/box/node_center.h
@@ -18,6 +18,7 @@
#ifndef NODE_CENTER_KY67RJ1Q
#define NODE_CENTER_KY67RJ1Q
+#include "json.h"
#include "shm_socket.h"
#include <unordered_map>
@@ -82,6 +83,10 @@
};
typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;
+ struct NodeInfo;
+ typedef std::shared_ptr<NodeInfo> Node;
+ typedef std::weak_ptr<NodeInfo> WeakNode;
+
struct NodeInfo {
NodeCenter ¢er_;
SharedMemory &shm_;
@@ -89,14 +94,15 @@
std::map<MQId, int64_t> addrs_; // registered mqs
ProcInfo proc_; //
AddressTopics services_; // address: topics
- AddressTopics subscriptions_; // address: topics
+ AddressTopics local_sub_; // address: topics
+ AddressTopics net_sub_; // address: topics
NodeInfo(NodeCenter ¢er, SharedMemory &shm) :
center_(center), shm_(shm) {}
void PutOffline(const int64_t offline_time);
void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time);
+ void Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node);
+ void Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node);
};
- typedef std::shared_ptr<NodeInfo> Node;
- typedef std::weak_ptr<NodeInfo> WeakNode;
struct TopicDest {
MQId mq_id_;
@@ -121,7 +127,9 @@
void RecordMsg(const MsgI &msg);
bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg);
bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg);
- bool PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
+
+ bool PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
+ bool RemotePublish(BHMsgHead &head, const std::string &body_content);
bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content);
void OnAlloc(ShmSocket &socket, const int64_t val);
void OnFree(ShmSocket &socket, const int64_t val);
@@ -149,8 +157,8 @@
return op(node);
}
}
- } catch (...) {
- //TODO error log
+ } catch (std::exception &e) {
+ LOG_ERROR() << "handle msg exception: " << e.what();
return MakeReply<Reply>(eError, "internal error.");
}
}
@@ -176,15 +184,22 @@
MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req);
MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg);
MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg);
- Clients DoFindClients(const std::string &topic);
- bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply);
+ MsgCommonReply Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg);
void OnTimer();
+
+ // remote hosts records
+ std::set<std::string> FindRemoteSubClients(const Topic &topic) { return net_records_.FindSubHosts(topic); }
+ std::set<std::string> FindRemoteRPCServers(const Topic &topic) { return net_records_.FindRPCHosts(topic); }
+ void ParseNetInfo(ssjson::Json &info) { net_records_.ParseData(info); }
private:
void CheckNodes();
bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; }
void Publish(SharedMemory &shm, const Topic &topic, const std::string &content);
+ void Notify(const Topic &topic, NodeInfo &node);
+ void DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg);
+ Clients DoFindClients(const std::string &topic, bool from_remote);
bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; }
bool Valid(const WeakNode &weak)
{
@@ -197,7 +212,8 @@
std::string id_; // center proc id;
std::unordered_map<Topic, Clients> service_map_;
- std::unordered_map<Topic, Clients> subscribe_map_;
+ std::unordered_map<Topic, Clients> local_sub_map_;
+ std::unordered_map<Topic, Clients> net_sub_map_;
std::unordered_map<Address, Node> nodes_;
std::unordered_map<ProcId, Address> online_node_addr_map_;
ProcRecords procs_; // To get a short index for msg alloc.
@@ -206,6 +222,33 @@
int64_t offline_time_;
int64_t kill_time_;
int64_t last_check_time_;
+
+ // net hosts info
+ class NetRecords
+ {
+ public:
+ typedef std::set<std::string> Hosts;
+ void ParseData(const ssjson::Json &input);
+ Hosts FindRPCHosts(const Topic &topic) { return FindHosts(topic, rpc_hosts_); }
+ Hosts FindSubHosts(const Topic &topic) { return FindHosts(topic, sub_hosts_); }
+
+ private:
+ typedef std::unordered_map<Topic, Hosts> TopicMap;
+ TopicMap sub_hosts_;
+ TopicMap rpc_hosts_;
+ Hosts FindHosts(const Topic &topic, const TopicMap &tmap)
+ {
+ auto pos = tmap.find(topic);
+ if (pos != tmap.end()) {
+ return pos->second;
+ } else {
+ return Hosts();
+ }
+ }
+ std::string host_id_;
+ std::string ip_;
+ };
+ NetRecords net_records_;
};
#endif // end of include guard: NODE_CENTER_KY67RJ1Q
--
Gitblit v1.8.0