From 72851db66655912cb9c92300a80985fb9797d168 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 01 六月 2021 16:25:23 +0800 Subject: [PATCH] remove AtomicQueue, not used. --- box/node_center.cpp | 99 +++++++++++++++++++++++++++++++++++++------------ 1 files changed, 75 insertions(+), 24 deletions(-) diff --git a/box/node_center.cpp b/box/node_center.cpp index dbf6ee8..b285c9f 100644 --- a/box/node_center.cpp +++ b/box/node_center.cpp @@ -57,7 +57,7 @@ { auto pos = msgs_.find(id); if (pos != msgs_.end()) { - ShmMsg(pos->second).Free(); + pos->second.Free(); msgs_.erase(pos); } else { LOG_TRACE() << "ignore late free request."; @@ -70,8 +70,9 @@ return; } // LOG_FUNCTION; + const size_t total = msgs_.size(); time_to_clean_ = now + 1; - int64_t limit = std::max(10000ul, msgs_.size() / 10); + int64_t limit = std::max(10000ul, total / 10); int64_t n = 0; auto it = msgs_.begin(); while (it != msgs_.end() && --limit > 0) { @@ -82,16 +83,16 @@ ++n; }; int n = now - msg.timestamp(); - if (n < 10) { + if (msg.Count() == 0) { + Free(); + } else if (n > NodeTimeoutSec()) { + Free(); + } else { ++it; - } else if (msg.Count() == 0) { - Free(); - } else if (n > 60) { - Free(); } } if (n > 0) { - LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n; + LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total; } } @@ -101,9 +102,9 @@ int i = 0; int total_count = 0; for (auto &kv : msgs_) { - MsgI msg(kv.second); + auto &msg = kv.second; total_count += msg.Count(); - LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size(); + LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second.Offset() << ", count: " << msg.Count() << ", size: " << msg.Size(); } LOG_TRACE() << "total count: " << total_count; } @@ -116,7 +117,7 @@ Json json; json.put("proc_id", proc_.proc_id()); - center_.Publish(kTopicNodeOffline, json.dump()); + center_.Publish(shm_, kTopicNodeOffline, json.dump()); } void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) @@ -127,7 +128,7 @@ if (proc_.proc_id().empty()) { return; } // node init, ignore. Json json; json.put("proc_id", proc_.proc_id()); - center_.Publish(topic, json.dump()); + center_.Publish(shm_, topic, json.dump()); }; LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff; @@ -163,7 +164,7 @@ // create sockets. try { - ShmSocket tmp(shm, true, ssn, 16); + ShmSocket tmp(shm, ssn, eCreate); node->addrs_.emplace(ssn, tmp.AbsAddr()); return true; } catch (...) { @@ -173,7 +174,7 @@ auto PrepareProcInit = [&](Node &node) { bool r = false; - ShmMsg init_msg; + ShmMsg init_msg(shm); DEFER1(init_msg.Release()); MsgProcInit body; auto head = InitMsgHead(GetType(body), id(), ssn); @@ -182,7 +183,7 @@ SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg); }; - Node node(new NodeInfo(*this)); + Node node(new NodeInfo(*this, shm)); if (UpdateRegInfo(node) && PrepareProcInit(node)) { reply |= (node->addrs_[ssn] << 4); nodes_[ssn] = node; @@ -208,6 +209,57 @@ { RecordMsg(msg); return socket.Send(dest, msg); +} + +NodeCenter::Node NodeCenter::GetNode(const MQId mq_id) +{ + Node node; + auto ssn = mq_id - (mq_id % 10); + auto pos = nodes_.find(ssn); + if (pos != nodes_.end()) { + node = pos->second; + } + return node; +} + +bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) +{ + Node node(GetNode(dest.id_)); + if (!node || !Valid(*node)) { + LOG_ERROR() << id() << " pass remote request, dest not found."; + return false; + } + + ShmSocket &sender(DefaultSender(node->shm_)); + auto route = head.add_route(); + route->set_mq_id(sender.id()); + route->set_abs_addr(sender.AbsAddr()); + + ShmMsg msg(node->shm_); + if (!msg.Make(head, body_content)) { return false; } + DEFER1(msg.Release();); + RecordMsg(msg); + return sender.Send(dest, msg, head.msg_id(), std::move(cb)); +} + +bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content) +{ + Node node(GetNode(dest.id_)); + if (!node) { + LOG_ERROR() << id() << " pass remote reply , ssn not found."; + return false; + } + auto offset = node->addrs_[dest.id_]; + if (offset != dest.offset_) { + LOG_ERROR() << id() << " pass remote reply, dest address not match"; + return false; + } + + ShmMsg msg(node->shm_); + if (!msg.Make(head, body_content)) { return false; } + DEFER1(msg.Release();); + RecordMsg(msg); + return DefaultSender(node->shm_).Send(dest, msg); } void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val) @@ -238,7 +290,7 @@ if (!FindMq()) { return; } auto size = GetAllocSize((val >> 52) & MaskBits(8)); - MsgI new_msg; + MsgI new_msg(socket.shm()); if (new_msg.Make(size)) { // 31bit proc index, 28bit id, ,4bit cmd+flag int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0); @@ -281,7 +333,7 @@ auto &node = pos->second; try { for (int i = 0; i < msg.extra_mq_num(); ++i) { - ShmSocket tmp(BHomeShm(), true, head.ssn_id() + i + 1, 16); + ShmSocket tmp(node->shm_, head.ssn_id() + i + 1, eCreate); node->addrs_.emplace(tmp.id(), tmp.AbsAddr()); auto addr = reply.add_extra_mqs(); addr->set_mq_id(tmp.id()); @@ -593,13 +645,15 @@ } for (auto &addr : node->addrs_) { - cleaner_(addr.first); + auto &id = addr.first; + auto r = ShmSocket::Remove(node->shm_, id); + LOG_DEBUG() << "remove mq " << id << (r ? " ok" : " failed"); } node->addrs_.clear(); } -void NodeCenter::Publish(const Topic &topic, const std::string &content) +void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content) { try { // LOG_DEBUG() << "center publish: " << topic << ": " << content; @@ -610,18 +664,15 @@ pub.set_topic(topic); pub.set_data(content); BHMsgHead head(InitMsgHead(GetType(pub), id(), 0)); - MsgI msg; + MsgI msg(shm); if (msg.Make(head, pub)) { DEFER1(msg.Release()); RecordMsg(msg); - auto &mq = GetCenterInfo(BHomeShm())->mq_sender_; - ShmSocket sender(mq.offset_, BHomeShm(), mq.id_); - for (auto &cli : clients) { auto node = cli.weak_node_.lock(); if (node && node->state_.flag_ == kStateNormal) { - sender.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); + DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); } } } -- Gitblit v1.8.0