From 330f78f3334bcdcdb4cc2ab2dbf66604e0224d71 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 21 五月 2021 16:21:45 +0800
Subject: [PATCH] Merge branch 'master' of http://192.168.5.5:10010/r/valib/bhshmq
---
box/node_center.cpp | 31 +++++++++++++++----------------
1 files changed, 15 insertions(+), 16 deletions(-)
diff --git a/box/node_center.cpp b/box/node_center.cpp
index dbf6ee8..cbaef0e 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -57,7 +57,7 @@
{
auto pos = msgs_.find(id);
if (pos != msgs_.end()) {
- ShmMsg(pos->second).Free();
+ pos->second.Free();
msgs_.erase(pos);
} else {
LOG_TRACE() << "ignore late free request.";
@@ -101,9 +101,9 @@
int i = 0;
int total_count = 0;
for (auto &kv : msgs_) {
- MsgI msg(kv.second);
+ auto &msg = kv.second;
total_count += msg.Count();
- LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size();
+ LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second.Offset() << ", count: " << msg.Count() << ", size: " << msg.Size();
}
LOG_TRACE() << "total count: " << total_count;
}
@@ -116,7 +116,7 @@
Json json;
json.put("proc_id", proc_.proc_id());
- center_.Publish(kTopicNodeOffline, json.dump());
+ center_.Publish(shm_, kTopicNodeOffline, json.dump());
}
void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
@@ -127,7 +127,7 @@
if (proc_.proc_id().empty()) { return; } // node init, ignore.
Json json;
json.put("proc_id", proc_.proc_id());
- center_.Publish(topic, json.dump());
+ center_.Publish(shm_, topic, json.dump());
};
LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff;
@@ -173,7 +173,7 @@
auto PrepareProcInit = [&](Node &node) {
bool r = false;
- ShmMsg init_msg;
+ ShmMsg init_msg(shm);
DEFER1(init_msg.Release());
MsgProcInit body;
auto head = InitMsgHead(GetType(body), id(), ssn);
@@ -182,7 +182,7 @@
SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg);
};
- Node node(new NodeInfo(*this));
+ Node node(new NodeInfo(*this, shm));
if (UpdateRegInfo(node) && PrepareProcInit(node)) {
reply |= (node->addrs_[ssn] << 4);
nodes_[ssn] = node;
@@ -238,7 +238,7 @@
if (!FindMq()) { return; }
auto size = GetAllocSize((val >> 52) & MaskBits(8));
- MsgI new_msg;
+ MsgI new_msg(socket.shm());
if (new_msg.Make(size)) {
// 31bit proc index, 28bit id, ,4bit cmd+flag
int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0);
@@ -281,7 +281,7 @@
auto &node = pos->second;
try {
for (int i = 0; i < msg.extra_mq_num(); ++i) {
- ShmSocket tmp(BHomeShm(), true, head.ssn_id() + i + 1, 16);
+ ShmSocket tmp(node->shm_, true, head.ssn_id() + i + 1, 16);
node->addrs_.emplace(tmp.id(), tmp.AbsAddr());
auto addr = reply.add_extra_mqs();
addr->set_mq_id(tmp.id());
@@ -593,13 +593,15 @@
}
for (auto &addr : node->addrs_) {
- cleaner_(addr.first);
+ auto &id = addr.first;
+ auto r = ShmSocket::Remove(node->shm_, id);
+ LOG_DEBUG() << "remove mq " << id << (r ? " ok" : " failed");
}
node->addrs_.clear();
}
-void NodeCenter::Publish(const Topic &topic, const std::string &content)
+void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content)
{
try {
// LOG_DEBUG() << "center publish: " << topic << ": " << content;
@@ -610,18 +612,15 @@
pub.set_topic(topic);
pub.set_data(content);
BHMsgHead head(InitMsgHead(GetType(pub), id(), 0));
- MsgI msg;
+ MsgI msg(shm);
if (msg.Make(head, pub)) {
DEFER1(msg.Release());
RecordMsg(msg);
- auto &mq = GetCenterInfo(BHomeShm())->mq_sender_;
- ShmSocket sender(mq.offset_, BHomeShm(), mq.id_);
-
for (auto &cli : clients) {
auto node = cli.weak_node_.lock();
if (node && node->state_.flag_ == kStateNormal) {
- sender.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
+ DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
}
}
}
--
Gitblit v1.8.0