From b2484c8bd77a9d21bcf1827f554444535196953d Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 21 五月 2021 10:47:18 +0800
Subject: [PATCH] center save shm on each node, no bind to shm.
---
utest/api_test.cpp | 5 +++--
box/center.cpp | 9 +--------
box/node_center.h | 12 ++++++------
box/node_center.cpp | 18 ++++++++++--------
4 files changed, 20 insertions(+), 24 deletions(-)
diff --git a/box/center.cpp b/box/center.cpp
index 3f565b1..e77c38f 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -165,15 +165,8 @@
BHCenter::BHCenter(Socket::Shm &shm)
{
- auto gc = [&](const MQId id) {
- auto r = ShmSocket::Remove(shm, id);
- if (r) {
- LOG_DEBUG() << "remove mq " << id << " ok\n";
- }
- };
-
auto nsec = NodeTimeoutSec();
- auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
+ auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
AddCenter(center_ptr);
for (auto &kv : Centers()) {
diff --git a/box/node_center.cpp b/box/node_center.cpp
index dbf6ee8..4e228a7 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -116,7 +116,7 @@
Json json;
json.put("proc_id", proc_.proc_id());
- center_.Publish(kTopicNodeOffline, json.dump());
+ center_.Publish(shm_, kTopicNodeOffline, json.dump());
}
void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
@@ -127,7 +127,7 @@
if (proc_.proc_id().empty()) { return; } // node init, ignore.
Json json;
json.put("proc_id", proc_.proc_id());
- center_.Publish(topic, json.dump());
+ center_.Publish(shm_, topic, json.dump());
};
LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff;
@@ -182,7 +182,7 @@
SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg);
};
- Node node(new NodeInfo(*this));
+ Node node(new NodeInfo(*this, shm));
if (UpdateRegInfo(node) && PrepareProcInit(node)) {
reply |= (node->addrs_[ssn] << 4);
nodes_[ssn] = node;
@@ -281,7 +281,7 @@
auto &node = pos->second;
try {
for (int i = 0; i < msg.extra_mq_num(); ++i) {
- ShmSocket tmp(BHomeShm(), true, head.ssn_id() + i + 1, 16);
+ ShmSocket tmp(node->shm_, true, head.ssn_id() + i + 1, 16);
node->addrs_.emplace(tmp.id(), tmp.AbsAddr());
auto addr = reply.add_extra_mqs();
addr->set_mq_id(tmp.id());
@@ -593,13 +593,15 @@
}
for (auto &addr : node->addrs_) {
- cleaner_(addr.first);
+ auto &id = addr.first;
+ auto r = ShmSocket::Remove(node->shm_, id);
+ LOG_DEBUG() << "remove mq " << id << (r ? " ok" : " failed");
}
node->addrs_.clear();
}
-void NodeCenter::Publish(const Topic &topic, const std::string &content)
+void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content)
{
try {
// LOG_DEBUG() << "center publish: " << topic << ": " << content;
@@ -615,8 +617,8 @@
DEFER1(msg.Release());
RecordMsg(msg);
- auto &mq = GetCenterInfo(BHomeShm())->mq_sender_;
- ShmSocket sender(mq.offset_, BHomeShm(), mq.id_);
+ auto &mq = GetCenterInfo(shm)->mq_sender_;
+ ShmSocket sender(mq.offset_, shm, mq.id_);
for (auto &cli : clients) {
auto node = cli.weak_node_.lock();
diff --git a/box/node_center.h b/box/node_center.h
index 4d3fba3..ca16cc5 100644
--- a/box/node_center.h
+++ b/box/node_center.h
@@ -85,13 +85,14 @@
struct NodeInfo {
NodeCenter ¢er_;
+ SharedMemory &shm_;
ProcState state_; // state
std::map<MQId, int64_t> addrs_; // registered mqs
ProcInfo proc_; //
AddressTopics services_; // address: topics
AddressTopics subscriptions_; // address: topics
- NodeInfo(NodeCenter ¢er) :
- center_(center) {}
+ NodeInfo(NodeCenter ¢er, SharedMemory &shm) :
+ center_(center), shm_(shm) {}
void PutOffline(const int64_t offline_time);
void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time);
};
@@ -112,8 +113,8 @@
public:
typedef std::set<TopicDest> Clients;
- NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time_sec, const int64_t kill_time_sec) :
- id_(id), cleaner_(cleaner), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {}
+ NodeCenter(const std::string &id, const int64_t offline_time_sec, const int64_t kill_time_sec) :
+ id_(id), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {}
// center name, no relative to shm.
const std::string &id() const { return id_; }
@@ -174,7 +175,7 @@
private:
void CheckNodes();
bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; }
- void Publish(const Topic &topic, const std::string &content);
+ void Publish(SharedMemory &shm, const Topic &topic, const std::string &content);
bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; }
bool Valid(const WeakNode &weak)
{
@@ -191,7 +192,6 @@
ProcRecords procs_; // To get a short index for msg alloc.
MsgRecords msgs_; // record all msgs alloced.
- Cleaner cleaner_; // remove mqs.
int64_t offline_time_;
int64_t kill_time_;
int64_t last_check_time_;
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index dc3efb6..fb1587b 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -373,7 +373,7 @@
threads.Launch(hb, &run);
threads.Launch(showStatus, &run);
int ncli = 10;
- const int64_t nreq = 10; //00 * 100;
+ const int64_t nreq = 1000 * 100;
for (int i = 0; i < 10; ++i) {
SyncRequest(i);
@@ -397,10 +397,11 @@
}
}
+ Sleep(1s);
+
run = false;
threads.WaitAll();
auto &st = Status();
- Sleep(1s);
printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld\n", st.nrequest_.load(), st.nserved_.load(), st.nreply_.load());
BHCleanup();
printf("after cleanup\n");
--
Gitblit v1.8.0