From 9243710ca372de26823c2225c7b46b072458c671 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 28 五月 2021 17:18:33 +0800 Subject: [PATCH] tcp proxy requests, need more test. --- box/node_center.cpp | 57 +++++++++++++++++++++++++++++++++++++++++++-------------- 1 files changed, 43 insertions(+), 14 deletions(-) diff --git a/box/node_center.cpp b/box/node_center.cpp index ff199b2..662b2c0 100644 --- a/box/node_center.cpp +++ b/box/node_center.cpp @@ -70,8 +70,9 @@ return; } // LOG_FUNCTION; + const size_t total = msgs_.size(); time_to_clean_ = now + 1; - int64_t limit = std::max(10000ul, msgs_.size() / 10); + int64_t limit = std::max(10000ul, total / 10); int64_t n = 0; auto it = msgs_.begin(); while (it != msgs_.end() && --limit > 0) { @@ -82,16 +83,16 @@ ++n; }; int n = now - msg.timestamp(); - if (n < 10) { + if (msg.Count() == 0) { + Free(); + } else if (n > NodeTimeoutSec()) { + Free(); + } else { ++it; - } else if (msg.Count() == 0) { - Free(); - } else if (n > 60) { - Free(); } } if (n > 0) { - LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n; + LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total; } } @@ -209,17 +210,25 @@ RecordMsg(msg); return socket.Send(dest, msg); } -bool NodeCenter::ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) + +NodeCenter::Node NodeCenter::GetNode(const MQId mq_id) { - auto ssn = dest.id_ - (dest.id_ % 10); - LOG_DEBUG() << "prox ssn " << ssn; + Node node; + auto ssn = mq_id - (mq_id % 10); auto pos = nodes_.find(ssn); - if (pos == nodes_.end()) { - LOG_ERROR() << "proxy msg, ssn not found."; + if (pos != nodes_.end()) { + node = pos->second; + } + return node; +} + +bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) +{ + Node node(GetNode(dest.id_)); + if (!node || !Valid(*node)) { + LOG_ERROR() << id() << " pass remote request, dest not found."; return false; } - auto &node = pos->second; - if (!Valid(*node)) { return false; } ShmSocket &sender(DefaultSender(node->shm_)); auto route = head.add_route(); @@ -233,6 +242,26 @@ return sender.Send(dest, msg, head.msg_id(), std::move(cb)); } +bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content) +{ + Node node(GetNode(dest.id_)); + if (!node) { + LOG_ERROR() << id() << " pass remote reply , ssn not found."; + return false; + } + auto offset = node->addrs_[dest.id_]; + if (offset != dest.offset_) { + LOG_ERROR() << id() << " pass remote reply, dest address not match"; + return false; + } + + ShmMsg msg(node->shm_); + if (!msg.Make(head, body_content)) { return false; } + DEFER1(msg.Release();); + RecordMsg(msg); + return DefaultSender(node->shm_).Send(dest, msg); +} + void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val) { // LOG_FUNCTION; -- Gitblit v1.8.0