From b2484c8bd77a9d21bcf1827f554444535196953d Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 21 五月 2021 10:47:18 +0800 Subject: [PATCH] center save shm on each node, no bind to shm. --- utest/api_test.cpp | 5 +++-- box/center.cpp | 9 +-------- box/node_center.h | 12 ++++++------ box/node_center.cpp | 18 ++++++++++-------- 4 files changed, 20 insertions(+), 24 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index 3f565b1..e77c38f 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -165,15 +165,8 @@ BHCenter::BHCenter(Socket::Shm &shm) { - auto gc = [&](const MQId id) { - auto r = ShmSocket::Remove(shm, id); - if (r) { - LOG_DEBUG() << "remove mq " << id << " ok\n"; - } - }; - auto nsec = NodeTimeoutSec(); - auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. + auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. AddCenter(center_ptr); for (auto &kv : Centers()) { diff --git a/box/node_center.cpp b/box/node_center.cpp index dbf6ee8..4e228a7 100644 --- a/box/node_center.cpp +++ b/box/node_center.cpp @@ -116,7 +116,7 @@ Json json; json.put("proc_id", proc_.proc_id()); - center_.Publish(kTopicNodeOffline, json.dump()); + center_.Publish(shm_, kTopicNodeOffline, json.dump()); } void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) @@ -127,7 +127,7 @@ if (proc_.proc_id().empty()) { return; } // node init, ignore. Json json; json.put("proc_id", proc_.proc_id()); - center_.Publish(topic, json.dump()); + center_.Publish(shm_, topic, json.dump()); }; LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff; @@ -182,7 +182,7 @@ SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg); }; - Node node(new NodeInfo(*this)); + Node node(new NodeInfo(*this, shm)); if (UpdateRegInfo(node) && PrepareProcInit(node)) { reply |= (node->addrs_[ssn] << 4); nodes_[ssn] = node; @@ -281,7 +281,7 @@ auto &node = pos->second; try { for (int i = 0; i < msg.extra_mq_num(); ++i) { - ShmSocket tmp(BHomeShm(), true, head.ssn_id() + i + 1, 16); + ShmSocket tmp(node->shm_, true, head.ssn_id() + i + 1, 16); node->addrs_.emplace(tmp.id(), tmp.AbsAddr()); auto addr = reply.add_extra_mqs(); addr->set_mq_id(tmp.id()); @@ -593,13 +593,15 @@ } for (auto &addr : node->addrs_) { - cleaner_(addr.first); + auto &id = addr.first; + auto r = ShmSocket::Remove(node->shm_, id); + LOG_DEBUG() << "remove mq " << id << (r ? " ok" : " failed"); } node->addrs_.clear(); } -void NodeCenter::Publish(const Topic &topic, const std::string &content) +void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content) { try { // LOG_DEBUG() << "center publish: " << topic << ": " << content; @@ -615,8 +617,8 @@ DEFER1(msg.Release()); RecordMsg(msg); - auto &mq = GetCenterInfo(BHomeShm())->mq_sender_; - ShmSocket sender(mq.offset_, BHomeShm(), mq.id_); + auto &mq = GetCenterInfo(shm)->mq_sender_; + ShmSocket sender(mq.offset_, shm, mq.id_); for (auto &cli : clients) { auto node = cli.weak_node_.lock(); diff --git a/box/node_center.h b/box/node_center.h index 4d3fba3..ca16cc5 100644 --- a/box/node_center.h +++ b/box/node_center.h @@ -85,13 +85,14 @@ struct NodeInfo { NodeCenter ¢er_; + SharedMemory &shm_; ProcState state_; // state std::map<MQId, int64_t> addrs_; // registered mqs ProcInfo proc_; // AddressTopics services_; // address: topics AddressTopics subscriptions_; // address: topics - NodeInfo(NodeCenter ¢er) : - center_(center) {} + NodeInfo(NodeCenter ¢er, SharedMemory &shm) : + center_(center), shm_(shm) {} void PutOffline(const int64_t offline_time); void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time); }; @@ -112,8 +113,8 @@ public: typedef std::set<TopicDest> Clients; - NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time_sec, const int64_t kill_time_sec) : - id_(id), cleaner_(cleaner), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {} + NodeCenter(const std::string &id, const int64_t offline_time_sec, const int64_t kill_time_sec) : + id_(id), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {} // center name, no relative to shm. const std::string &id() const { return id_; } @@ -174,7 +175,7 @@ private: void CheckNodes(); bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; } - void Publish(const Topic &topic, const std::string &content); + void Publish(SharedMemory &shm, const Topic &topic, const std::string &content); bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; } bool Valid(const WeakNode &weak) { @@ -191,7 +192,6 @@ ProcRecords procs_; // To get a short index for msg alloc. MsgRecords msgs_; // record all msgs alloced. - Cleaner cleaner_; // remove mqs. int64_t offline_time_; int64_t kill_time_; int64_t last_check_time_; diff --git a/utest/api_test.cpp b/utest/api_test.cpp index dc3efb6..fb1587b 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -373,7 +373,7 @@ threads.Launch(hb, &run); threads.Launch(showStatus, &run); int ncli = 10; - const int64_t nreq = 10; //00 * 100; + const int64_t nreq = 1000 * 100; for (int i = 0; i < 10; ++i) { SyncRequest(i); @@ -397,10 +397,11 @@ } } + Sleep(1s); + run = false; threads.WaitAll(); auto &st = Status(); - Sleep(1s); printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld\n", st.nrequest_.load(), st.nserved_.load(), st.nreply_.load()); BHCleanup(); printf("after cleanup\n"); -- Gitblit v1.8.0