From 3c7339498c5a47e912f6e6009c197291acd7e1fd Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 03 六月 2021 11:27:39 +0800
Subject: [PATCH] change mq shm name prefix.
---
box/node_center.cpp | 61 ++++++++++++++++++++++--------
1 files changed, 45 insertions(+), 16 deletions(-)
diff --git a/box/node_center.cpp b/box/node_center.cpp
index ff199b2..b285c9f 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -70,8 +70,9 @@
return;
}
// LOG_FUNCTION;
+ const size_t total = msgs_.size();
time_to_clean_ = now + 1;
- int64_t limit = std::max(10000ul, msgs_.size() / 10);
+ int64_t limit = std::max(10000ul, total / 10);
int64_t n = 0;
auto it = msgs_.begin();
while (it != msgs_.end() && --limit > 0) {
@@ -82,16 +83,16 @@
++n;
};
int n = now - msg.timestamp();
- if (n < 10) {
+ if (msg.Count() == 0) {
+ Free();
+ } else if (n > NodeTimeoutSec()) {
+ Free();
+ } else {
++it;
- } else if (msg.Count() == 0) {
- Free();
- } else if (n > 60) {
- Free();
}
}
if (n > 0) {
- LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n;
+ LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total;
}
}
@@ -163,7 +164,7 @@
// create sockets.
try {
- ShmSocket tmp(shm, true, ssn, 16);
+ ShmSocket tmp(shm, ssn, eCreate);
node->addrs_.emplace(ssn, tmp.AbsAddr());
return true;
} catch (...) {
@@ -209,17 +210,25 @@
RecordMsg(msg);
return socket.Send(dest, msg);
}
-bool NodeCenter::ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
+
+NodeCenter::Node NodeCenter::GetNode(const MQId mq_id)
{
- auto ssn = dest.id_ - (dest.id_ % 10);
- LOG_DEBUG() << "prox ssn " << ssn;
+ Node node;
+ auto ssn = mq_id - (mq_id % 10);
auto pos = nodes_.find(ssn);
- if (pos == nodes_.end()) {
- LOG_ERROR() << "proxy msg, ssn not found.";
+ if (pos != nodes_.end()) {
+ node = pos->second;
+ }
+ return node;
+}
+
+bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
+{
+ Node node(GetNode(dest.id_));
+ if (!node || !Valid(*node)) {
+ LOG_ERROR() << id() << " pass remote request, dest not found.";
return false;
}
- auto &node = pos->second;
- if (!Valid(*node)) { return false; }
ShmSocket &sender(DefaultSender(node->shm_));
auto route = head.add_route();
@@ -231,6 +240,26 @@
DEFER1(msg.Release(););
RecordMsg(msg);
return sender.Send(dest, msg, head.msg_id(), std::move(cb));
+}
+
+bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content)
+{
+ Node node(GetNode(dest.id_));
+ if (!node) {
+ LOG_ERROR() << id() << " pass remote reply , ssn not found.";
+ return false;
+ }
+ auto offset = node->addrs_[dest.id_];
+ if (offset != dest.offset_) {
+ LOG_ERROR() << id() << " pass remote reply, dest address not match";
+ return false;
+ }
+
+ ShmMsg msg(node->shm_);
+ if (!msg.Make(head, body_content)) { return false; }
+ DEFER1(msg.Release(););
+ RecordMsg(msg);
+ return DefaultSender(node->shm_).Send(dest, msg);
}
void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val)
@@ -304,7 +333,7 @@
auto &node = pos->second;
try {
for (int i = 0; i < msg.extra_mq_num(); ++i) {
- ShmSocket tmp(node->shm_, true, head.ssn_id() + i + 1, 16);
+ ShmSocket tmp(node->shm_, head.ssn_id() + i + 1, eCreate);
node->addrs_.emplace(tmp.id(), tmp.AbsAddr());
auto addr = reply.add_extra_mqs();
addr->set_mq_id(tmp.id());
--
Gitblit v1.8.0