From c64c54d8e75b9354dc49a7b6b2d326e7dd59eb37 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期四, 15 四月 2021 19:32:16 +0800 Subject: [PATCH] add api; fix send, socknode mem leak. --- box/center.cpp | 53 ++++++++++++++++++++++++++++++++++------------------- 1 files changed, 34 insertions(+), 19 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index 0dd4ed4..8625f7f 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -121,20 +121,18 @@ }; auto pos = nodes_.find(head.proc_id()); - if (pos == nodes_.end()) { // new client - Node node(new NodeInfo); - UpdateRegInfo(node); - nodes_[node->proc_.proc_id()] = node; - } else { + if (pos != nodes_.end()) { // new client Node &node = pos->second; if (node->addrs_.find(SrcAddr(head)) == node->addrs_.end()) { // node restarted, release old mq. - for (auto &addr : node->addrs_) { - cleaner_(addr); - } - node->addrs_.clear(); + RemoveNode(node); + node.reset(new NodeInfo); } UpdateRegInfo(node); + } else { + Node node(new NodeInfo); + UpdateRegInfo(node); + nodes_[node->proc_.proc_id()] = node; } return MakeReply(eSuccess); } catch (...) { @@ -334,11 +332,7 @@ auto &cli = *it->second; cli.state_.UpdateState(now, offline_time_, kill_time_); if (cli.state_.flag_ == kStateKillme) { - if (cleaner_) { - for (auto &addr : cli.addrs_) { - cleaner_(addr); - } - } + RemoveNode(it->second); it = nodes_.erase(it); } else { ++it; @@ -357,6 +351,30 @@ { auto node = weak.lock(); return node && Valid(*node); + } + void RemoveNode(Node &node) + { + auto EraseMapRec = [&node](auto &rec_map, auto &node_rec) { + for (auto &addr_topics : node_rec) { + TopicDest dest{addr_topics.first, node}; + for (auto &topic : addr_topics.second) { + auto pos = rec_map.find(topic); + if (pos != rec_map.end()) { + pos->second.erase(dest); + if (pos->second.empty()) { + rec_map.erase(pos); + } + } + } + } + }; + EraseMapRec(service_map_, node->services_); + EraseMapRec(subscribe_map_, node->subscriptions_); + + for (auto &addr : node->addrs_) { + cleaner_(addr); + } + node->addrs_.clear(); } std::string id_; // center proc id; @@ -403,11 +421,8 @@ auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { return [&](auto &&rep_body) { auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); - MsgI msg; - if (msg.Make(socket.shm(), reply_head, rep_body)) { - auto &remote = head.route(0).mq_id(); - bool r = socket.Send(remote.data(), msg); - } + auto &remote = head.route(0).mq_id(); + socket.Send(remote.data(), reply_head, rep_body); }; }; -- Gitblit v1.8.0