From 365c864a587365fe443b11cc0cd7cfc8f8f8eb81 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 01 六月 2021 11:19:22 +0800
Subject: [PATCH] refactor, clean up useless code.
---
box/node_center.cpp | 83 +++++++++++++++++++++++++++++++++--------
1 files changed, 66 insertions(+), 17 deletions(-)
diff --git a/box/node_center.cpp b/box/node_center.cpp
index 4e228a7..662b2c0 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -57,7 +57,7 @@
{
auto pos = msgs_.find(id);
if (pos != msgs_.end()) {
- ShmMsg(pos->second).Free();
+ pos->second.Free();
msgs_.erase(pos);
} else {
LOG_TRACE() << "ignore late free request.";
@@ -70,8 +70,9 @@
return;
}
// LOG_FUNCTION;
+ const size_t total = msgs_.size();
time_to_clean_ = now + 1;
- int64_t limit = std::max(10000ul, msgs_.size() / 10);
+ int64_t limit = std::max(10000ul, total / 10);
int64_t n = 0;
auto it = msgs_.begin();
while (it != msgs_.end() && --limit > 0) {
@@ -82,16 +83,16 @@
++n;
};
int n = now - msg.timestamp();
- if (n < 10) {
+ if (msg.Count() == 0) {
+ Free();
+ } else if (n > NodeTimeoutSec()) {
+ Free();
+ } else {
++it;
- } else if (msg.Count() == 0) {
- Free();
- } else if (n > 60) {
- Free();
}
}
if (n > 0) {
- LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n;
+ LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total;
}
}
@@ -101,9 +102,9 @@
int i = 0;
int total_count = 0;
for (auto &kv : msgs_) {
- MsgI msg(kv.second);
+ auto &msg = kv.second;
total_count += msg.Count();
- LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size();
+ LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second.Offset() << ", count: " << msg.Count() << ", size: " << msg.Size();
}
LOG_TRACE() << "total count: " << total_count;
}
@@ -173,7 +174,7 @@
auto PrepareProcInit = [&](Node &node) {
bool r = false;
- ShmMsg init_msg;
+ ShmMsg init_msg(shm);
DEFER1(init_msg.Release());
MsgProcInit body;
auto head = InitMsgHead(GetType(body), id(), ssn);
@@ -210,6 +211,57 @@
return socket.Send(dest, msg);
}
+NodeCenter::Node NodeCenter::GetNode(const MQId mq_id)
+{
+ Node node;
+ auto ssn = mq_id - (mq_id % 10);
+ auto pos = nodes_.find(ssn);
+ if (pos != nodes_.end()) {
+ node = pos->second;
+ }
+ return node;
+}
+
+bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
+{
+ Node node(GetNode(dest.id_));
+ if (!node || !Valid(*node)) {
+ LOG_ERROR() << id() << " pass remote request, dest not found.";
+ return false;
+ }
+
+ ShmSocket &sender(DefaultSender(node->shm_));
+ auto route = head.add_route();
+ route->set_mq_id(sender.id());
+ route->set_abs_addr(sender.AbsAddr());
+
+ ShmMsg msg(node->shm_);
+ if (!msg.Make(head, body_content)) { return false; }
+ DEFER1(msg.Release(););
+ RecordMsg(msg);
+ return sender.Send(dest, msg, head.msg_id(), std::move(cb));
+}
+
+bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content)
+{
+ Node node(GetNode(dest.id_));
+ if (!node) {
+ LOG_ERROR() << id() << " pass remote reply , ssn not found.";
+ return false;
+ }
+ auto offset = node->addrs_[dest.id_];
+ if (offset != dest.offset_) {
+ LOG_ERROR() << id() << " pass remote reply, dest address not match";
+ return false;
+ }
+
+ ShmMsg msg(node->shm_);
+ if (!msg.Make(head, body_content)) { return false; }
+ DEFER1(msg.Release(););
+ RecordMsg(msg);
+ return DefaultSender(node->shm_).Send(dest, msg);
+}
+
void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val)
{
// LOG_FUNCTION;
@@ -238,7 +290,7 @@
if (!FindMq()) { return; }
auto size = GetAllocSize((val >> 52) & MaskBits(8));
- MsgI new_msg;
+ MsgI new_msg(socket.shm());
if (new_msg.Make(size)) {
// 31bit proc index, 28bit id, ,4bit cmd+flag
int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0);
@@ -612,18 +664,15 @@
pub.set_topic(topic);
pub.set_data(content);
BHMsgHead head(InitMsgHead(GetType(pub), id(), 0));
- MsgI msg;
+ MsgI msg(shm);
if (msg.Make(head, pub)) {
DEFER1(msg.Release());
RecordMsg(msg);
- auto &mq = GetCenterInfo(shm)->mq_sender_;
- ShmSocket sender(mq.offset_, shm, mq.id_);
-
for (auto &cli : clients) {
auto node = cli.weak_node_.lock();
if (node && node->state_.flag_ == kStateNormal) {
- sender.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
+ DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
}
}
}
--
Gitblit v1.8.0