From 68c7bef33e74f23aa0136ccd6f7faa654d671ebc Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 21 五月 2021 09:23:01 +0800 Subject: [PATCH] center publish notify; fix topic partial match. --- utest/api_test.cpp | 19 +++--- box/center.cpp | 10 -- box/node_center.h | 8 ++ box/node_center.cpp | 127 ++++++++++++++++++++++++++++++++---------- 4 files changed, 114 insertions(+), 50 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index e745be8..3f565b1 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -124,20 +124,14 @@ } else { replyer(MakeReply(eSuccess)); if (clients.empty()) { return; } - - auto it = clients.begin(); - do { - auto &cli = *it; + for (auto &cli : clients) { auto node = cli.weak_node_.lock(); if (node) { // should also make sure that mq is not killed before msg expires. // it would be ok if (kill_time - offline_time) is longer than expire time. socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); - ++it; - } else { - it = clients.erase(it); } - } while (it != clients.end()); + } } }; switch (head.type()) { diff --git a/box/node_center.cpp b/box/node_center.cpp index cefb138..dbf6ee8 100644 --- a/box/node_center.cpp +++ b/box/node_center.cpp @@ -16,7 +16,22 @@ * ===================================================================================== */ #include "node_center.h" +#include "json.h" #include "log.h" + +using ssjson::Json; + +namespace +{ +std::string Join(const std::string &parent, const std::string &child) +{ + return parent + kTopicSep + child; +} +const std::string kTopicCenterRoot = "#center"; +const std::string kTopicNode = Join(kTopicCenterRoot, "node"); +const std::string kTopicNodeOnline = Join(kTopicNode, "online"); +const std::string kTopicNodeOffline = Join(kTopicNode, "offline"); +} // namespace ProcIndex ProcRecords::Put(const ProcId &proc_id, const MQId ssn) { @@ -82,7 +97,7 @@ void MsgRecords::DebugPrint() const { - LOG_DEBUG() << "msgs : " << size(); + LOG_TRACE() << "msgs : " << size(); int i = 0; int total_count = 0; for (auto &kv : msgs_) { @@ -90,26 +105,44 @@ total_count += msg.Count(); LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size(); } - LOG_DEBUG() << "total count: " << total_count; + LOG_TRACE() << "total count: " << total_count; } // NodeCenter::ProcState -void NodeCenter::ProcState::PutOffline(const int64_t offline_time) +void NodeCenter::NodeInfo::PutOffline(const int64_t offline_time) { - timestamp_ = NowSec() - offline_time; - flag_ = kStateOffline; + state_.timestamp_ = NowSec() - offline_time; + state_.flag_ = kStateOffline; + + Json json; + json.put("proc_id", proc_.proc_id()); + center_.Publish(kTopicNodeOffline, json.dump()); } -void NodeCenter::ProcState::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) +void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) { - auto diff = now - timestamp_; - LOG_DEBUG() << "state " << this << " diff: " << diff; + auto old = state_.flag_; + auto diff = now - state_.timestamp_; + auto publish = [this](const std::string &topic) { + if (proc_.proc_id().empty()) { return; } // node init, ignore. + Json json; + json.put("proc_id", proc_.proc_id()); + center_.Publish(topic, json.dump()); + }; + + LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff; if (diff < offline_time) { - flag_ = kStateNormal; + state_.flag_ = kStateNormal; + if (old != state_.flag_) { + publish(kTopicNodeOnline); + } } else if (diff < kill_time) { - flag_ = kStateOffline; + state_.flag_ = kStateOffline; + if (old != state_.flag_) { + publish(kTopicNodeOffline); + } } else { - flag_ = kStateKillme; + state_.flag_ = kStateKillme; } } @@ -126,7 +159,7 @@ auto UpdateRegInfo = [&](Node &node) { node->state_.timestamp_ = NowSec() - offline_time_; - node->state_.UpdateState(NowSec(), offline_time_, kill_time_); + node->UpdateState(NowSec(), offline_time_, kill_time_); // create sockets. try { @@ -149,7 +182,7 @@ SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg); }; - Node node(new NodeInfo); + Node node(new NodeInfo(*this)); if (UpdateRegInfo(node) && PrepareProcInit(node)) { reply |= (node->addrs_[ssn] << 4); nodes_[ssn] = node; @@ -271,33 +304,33 @@ MQId ssn = head.ssn_id(); // when node restart, ssn will change, // and old node will be removed after timeout. - auto UpdateRegInfo = [&](Node &node) { - node->proc_.Swap(msg.mutable_proc()); - node->state_.timestamp_ = head.timestamp(); - node->state_.UpdateState(NowSec(), offline_time_, kill_time_); - }; - auto pos = nodes_.find(ssn); if (pos == nodes_.end()) { return MakeReply(eInvalidInput, "invalid session."); } - // update proc info - Node &node = pos->second; - UpdateRegInfo(node); - LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")"; - + // try to remove old session auto old = online_node_addr_map_.find(head.proc_id()); if (old != online_node_addr_map_.end()) { // old session auto &old_ssn = old->second; if (old_ssn != ssn) { - nodes_[old_ssn]->state_.PutOffline(offline_time_); + nodes_[old_ssn]->PutOffline(offline_time_); + LOG_DEBUG() << "put node (" << nodes_[old_ssn]->proc_.proc_id() << ") ssn (" << old->second << ") offline"; old_ssn = ssn; } } else { online_node_addr_map_.emplace(head.proc_id(), ssn); } + + // update proc info + Node &node = pos->second; + node->proc_.Swap(msg.mutable_proc()); + node->state_.timestamp_ = head.timestamp(); + node->UpdateState(NowSec(), offline_time_, kill_time_); + + LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")"; + return MakeReply(eSuccess); } catch (...) { return MakeReply(eError, "register node error."); @@ -309,7 +342,7 @@ return HandleMsg( head, [&](Node node) -> MsgCommonReply { NodeInfo &ni = *node; - ni.state_.PutOffline(offline_time_); + ni.PutOffline(offline_time_); return MakeReply(eSuccess); }); } @@ -338,7 +371,7 @@ return HandleMsg(head, [&](Node node) { NodeInfo &ni = *node; ni.state_.timestamp_ = head.timestamp(); - ni.state_.UpdateState(NowSec(), offline_time_, kill_time_); + ni.UpdateState(NowSec(), offline_time_, kill_time_); auto &info = msg.proc(); if (!info.public_info().empty()) { @@ -469,8 +502,8 @@ NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic) { Clients dests; - auto Find1 = [&](const std::string &t) { - auto pos = subscribe_map_.find(topic); + auto Find1 = [&](const std::string &exact) { + auto pos = subscribe_map_.find(exact); if (pos != subscribe_map_.end()) { auto &clients = pos->second; for (auto &cli : clients) { @@ -489,7 +522,7 @@ // Find1(std::string()); // sub all. break; } else { - Find1(topic.substr(0, pos)); + Find1(topic.substr(0, pos - 1)); } } return dests; @@ -521,7 +554,7 @@ auto it = nodes_.begin(); while (it != nodes_.end()) { auto &cli = *it->second; - cli.state_.UpdateState(now, offline_time_, kill_time_); + cli.UpdateState(now, offline_time_, kill_time_); if (cli.state_.flag_ == kStateKillme) { RemoveNode(it->second); it = nodes_.erase(it); @@ -564,4 +597,36 @@ } node->addrs_.clear(); +} + +void NodeCenter::Publish(const Topic &topic, const std::string &content) +{ + try { + // LOG_DEBUG() << "center publish: " << topic << ": " << content; + Clients clients(DoFindClients(topic)); + if (clients.empty()) { return; } + + MsgPublish pub; + pub.set_topic(topic); + pub.set_data(content); + BHMsgHead head(InitMsgHead(GetType(pub), id(), 0)); + MsgI msg; + if (msg.Make(head, pub)) { + DEFER1(msg.Release()); + RecordMsg(msg); + + auto &mq = GetCenterInfo(BHomeShm())->mq_sender_; + ShmSocket sender(mq.offset_, BHomeShm(), mq.id_); + + for (auto &cli : clients) { + auto node = cli.weak_node_.lock(); + if (node && node->state_.flag_ == kStateNormal) { + sender.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); + } + } + } + + } catch (...) { + LOG_ERROR() << "center publish error."; + } } \ No newline at end of file diff --git a/box/node_center.h b/box/node_center.h index 4663bee..4d3fba3 100644 --- a/box/node_center.h +++ b/box/node_center.h @@ -80,17 +80,20 @@ struct ProcState { int64_t timestamp_ = 0; uint32_t flag_ = 0; // reserved - void PutOffline(const int64_t offline_time); - void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time); }; typedef std::unordered_map<Address, std::set<Topic>> AddressTopics; struct NodeInfo { + NodeCenter ¢er_; ProcState state_; // state std::map<MQId, int64_t> addrs_; // registered mqs ProcInfo proc_; // AddressTopics services_; // address: topics AddressTopics subscriptions_; // address: topics + NodeInfo(NodeCenter ¢er) : + center_(center) {} + void PutOffline(const int64_t offline_time); + void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time); }; typedef std::shared_ptr<NodeInfo> Node; typedef std::weak_ptr<NodeInfo> WeakNode; @@ -171,6 +174,7 @@ private: void CheckNodes(); bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; } + void Publish(const Topic &topic, const std::string &content); bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; } bool Valid(const WeakNode &weak) { diff --git a/utest/api_test.cpp b/utest/api_test.cpp index e278e29..dc3efb6 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -60,7 +60,7 @@ std::string proc((const char *) proc_id, proc_id_len); MsgPublish pub; pub.ParseFromArray(data, data_len); - printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str()); + printf("****************************************** Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str()); } void ServerProc(const void *proc_id, @@ -120,10 +120,11 @@ // BHCleanup(); // return; + const std::string proc_id = "demo_client"; bool reg = false; for (int i = 0; i < 3 && !reg; ++i) { ProcInfo proc; - proc.set_proc_id("demo_client"); + proc.set_proc_id(proc_id); proc.set_public_info("public info of demo_client. etc..."); std::string proc_buf(proc.SerializeAsString()); void *reply = 0; @@ -148,13 +149,13 @@ } BHFree(reply, reply_len); - Sleep(1s); + // Sleep(1s); } if (!reg) { return; } - const std::string topic_ = "topic_"; + const std::string topic_ = proc_id + "_topic_"; { // Server Register Topics MsgTopicList topics; @@ -214,7 +215,7 @@ void *reply = 0; int reply_len = 0; DEFER1(BHFree(reply, reply_len)); - bool r = BHRequest(dest.data(), dest.size(), s.data(), s.size(), &proc_id, &proc_id_len, &reply, &reply_len, 100); + bool r = BHRequest(dest.data(), dest.size(), s.data(), s.size(), &proc_id, &proc_id_len, &reply, &reply_len, 1000); if (!r) { int ec = 0; std::string msg; @@ -232,10 +233,10 @@ // } } } - // return; { // Subscribe MsgTopicList topics; + topics.add_topic_list("#center.node"); for (int i = 0; i < 10; ++i) { topics.add_topic_list(topic_ + std::to_string(i * 2)); } @@ -285,7 +286,7 @@ void *reply = 0; int reply_len = 0; DEFER1(BHFree(reply, reply_len)); - bool r = BHRequest(dest.data(), dest.size(), s.data(), s.size(), &proc_id, &proc_id_len, &reply, &reply_len, 100); + bool r = BHRequest(dest.data(), dest.size(), s.data(), s.size(), &proc_id, &proc_id_len, &reply, &reply_len, 1000); if (!r) { int ec = 0; std::string msg; @@ -361,7 +362,7 @@ ThreadManager threads; -#if 0 +#if 1 BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc); #else BHStartWorker(FServerCallback(), &SubRecvProc, &ClientProc); @@ -372,7 +373,7 @@ threads.Launch(hb, &run); threads.Launch(showStatus, &run); int ncli = 10; - const int64_t nreq = 1000 * 100; + const int64_t nreq = 10; //00 * 100; for (int i = 0; i < 10; ++i) { SyncRequest(i); -- Gitblit v1.8.0