From 993c556000a414011626770540678948f16eaa9e Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 02 六月 2021 17:40:50 +0800 Subject: [PATCH] center restart with new shm; set center node ssn. --- box/node_center.cpp | 70 ++++++++++++++++++++++++++++++---- 1 files changed, 61 insertions(+), 9 deletions(-) diff --git a/box/node_center.cpp b/box/node_center.cpp index cbaef0e..b285c9f 100644 --- a/box/node_center.cpp +++ b/box/node_center.cpp @@ -70,8 +70,9 @@ return; } // LOG_FUNCTION; + const size_t total = msgs_.size(); time_to_clean_ = now + 1; - int64_t limit = std::max(10000ul, msgs_.size() / 10); + int64_t limit = std::max(10000ul, total / 10); int64_t n = 0; auto it = msgs_.begin(); while (it != msgs_.end() && --limit > 0) { @@ -82,16 +83,16 @@ ++n; }; int n = now - msg.timestamp(); - if (n < 10) { + if (msg.Count() == 0) { + Free(); + } else if (n > NodeTimeoutSec()) { + Free(); + } else { ++it; - } else if (msg.Count() == 0) { - Free(); - } else if (n > 60) { - Free(); } } if (n > 0) { - LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n; + LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total; } } @@ -163,7 +164,7 @@ // create sockets. try { - ShmSocket tmp(shm, true, ssn, 16); + ShmSocket tmp(shm, ssn, eCreate); node->addrs_.emplace(ssn, tmp.AbsAddr()); return true; } catch (...) { @@ -208,6 +209,57 @@ { RecordMsg(msg); return socket.Send(dest, msg); +} + +NodeCenter::Node NodeCenter::GetNode(const MQId mq_id) +{ + Node node; + auto ssn = mq_id - (mq_id % 10); + auto pos = nodes_.find(ssn); + if (pos != nodes_.end()) { + node = pos->second; + } + return node; +} + +bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) +{ + Node node(GetNode(dest.id_)); + if (!node || !Valid(*node)) { + LOG_ERROR() << id() << " pass remote request, dest not found."; + return false; + } + + ShmSocket &sender(DefaultSender(node->shm_)); + auto route = head.add_route(); + route->set_mq_id(sender.id()); + route->set_abs_addr(sender.AbsAddr()); + + ShmMsg msg(node->shm_); + if (!msg.Make(head, body_content)) { return false; } + DEFER1(msg.Release();); + RecordMsg(msg); + return sender.Send(dest, msg, head.msg_id(), std::move(cb)); +} + +bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content) +{ + Node node(GetNode(dest.id_)); + if (!node) { + LOG_ERROR() << id() << " pass remote reply , ssn not found."; + return false; + } + auto offset = node->addrs_[dest.id_]; + if (offset != dest.offset_) { + LOG_ERROR() << id() << " pass remote reply, dest address not match"; + return false; + } + + ShmMsg msg(node->shm_); + if (!msg.Make(head, body_content)) { return false; } + DEFER1(msg.Release();); + RecordMsg(msg); + return DefaultSender(node->shm_).Send(dest, msg); } void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val) @@ -281,7 +333,7 @@ auto &node = pos->second; try { for (int i = 0; i < msg.extra_mq_num(); ++i) { - ShmSocket tmp(node->shm_, true, head.ssn_id() + i + 1, 16); + ShmSocket tmp(node->shm_, head.ssn_id() + i + 1, eCreate); node->addrs_.emplace(tmp.id(), tmp.AbsAddr()); auto addr = reply.add_extra_mqs(); addr->set_mq_id(tmp.id()); -- Gitblit v1.8.0