From 5b6ced44157b6e7fab519ae48f5cffcdc2b3cd7c Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期日, 25 四月 2021 19:28:57 +0800 Subject: [PATCH] use node mqid ssn id to index online nodes. --- utest/speed_test.cpp | 6 +- src/proto.cpp | 7 ++- box/center.cpp | 42 ++++++++++++++------- src/proto.h | 4 +- proto/source/bhome_msg.proto | 5 +- src/topic_node.h | 1 src/topic_node.cpp | 22 +++++----- 7 files changed, 52 insertions(+), 35 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index d920ff7..4bb9ea1 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -52,6 +52,11 @@ struct ProcState { int64_t timestamp_ = 0; uint32_t flag_ = 0; // reserved + void PutOffline(const int64_t offline_time) + { + timestamp_ = NowSec() - offline_time; + flag_ = kStateOffline; + } void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) { auto diff = now - timestamp_; @@ -106,6 +111,10 @@ } try { + MQId ssn = head.ssn_id(); + // use src_addr as session id. + // when node restart, src_addr will change, + // and old node will be removed after timeout. auto UpdateRegInfo = [&](Node &node) { node->addrs_.insert(SrcAddr(head)); for (auto &addr : msg.addrs()) { @@ -116,19 +125,24 @@ node->state_.UpdateState(NowSec(), offline_time_, kill_time_); }; - auto pos = nodes_.find(head.proc_id()); - if (pos != nodes_.end()) { // new client + auto pos = nodes_.find(ssn); + if (pos != nodes_.end()) { // update Node &node = pos->second; - if (node->addrs_.find(SrcAddr(head)) == node->addrs_.end()) { - // node restarted, release old mq. - RemoveNode(node); - node.reset(new NodeInfo); - } UpdateRegInfo(node); } else { Node node(new NodeInfo); UpdateRegInfo(node); - nodes_[node->proc_.proc_id()] = node; + nodes_[ssn] = node; + + auto old = node_addr_map_.find(head.proc_id()); + if (old != node_addr_map_.end()) { // old session + auto &old_ssn = old->second; + nodes_[old_ssn]->state_.PutOffline(offline_time_); + printf("put %s %ld offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second); + old_ssn = ssn; + } else { + node_addr_map_.emplace(head.proc_id(), ssn); + } } return MakeReply(eSuccess); } catch (...) { @@ -140,7 +154,7 @@ Reply HandleMsg(const BHMsgHead &head, Func const &op) { try { - auto pos = nodes_.find(head.proc_id()); + auto pos = nodes_.find(head.ssn_id()); if (pos == nodes_.end()) { return MakeReply<Reply>(eNotRegistered, "Node is not registered."); } else { @@ -171,9 +185,7 @@ return HandleMsg( head, [&](Node node) -> MsgCommonReply { NodeInfo &ni = *node; - auto now = NowSec(); // just set to offline. - ni.state_.timestamp_ = now - offline_time_; - ni.state_.UpdateState(now, offline_time_, kill_time_); + ni.state_.PutOffline(offline_time_); return MakeReply(eSuccess); }); } @@ -375,6 +387,7 @@ }; EraseMapRec(service_map_, node->services_); EraseMapRec(subscribe_map_, node->subscriptions_); + node_addr_map_.erase(node->proc_.proc_id()); for (auto &addr : node->addrs_) { cleaner_(addr); @@ -385,7 +398,8 @@ std::unordered_map<Topic, Clients> service_map_; std::unordered_map<Topic, Clients> subscribe_map_; - std::unordered_map<ProcId, Node> nodes_; + std::unordered_map<Address, Node> nodes_; + std::unordered_map<std::string, Address> node_addr_map_; Cleaner cleaner_; // remove mqs. int64_t offline_time_; int64_t kill_time_; @@ -425,7 +439,7 @@ auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 2); auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { return [&](auto &&rep_body) { - auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); + auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id())); auto remote = head.route(0).mq_id(); socket.Send(remote, reply_head, rep_body); }; diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto index 6a4942d..51e9b6e 100644 --- a/proto/source/bhome_msg.proto +++ b/proto/source/bhome_msg.proto @@ -16,8 +16,9 @@ repeated BHAddress route = 2; // for reply and proxy. int64 timestamp = 3; int32 type = 4; - bytes proc_id = 5; - bytes topic = 6; // for request route + uint64 ssn_id = 5; // node mq id + bytes proc_id = 6; + bytes topic = 7; // for request route } diff --git a/src/proto.cpp b/src/proto.cpp index b1e8207..c8a5052 100644 --- a/src/proto.cpp +++ b/src/proto.cpp @@ -32,17 +32,18 @@ std::string NewMsgId() { return RandId(); } -BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id) +BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const uint64_t ssn_id) { - return InitMsgHead(type, proc_id, RandId()); + return InitMsgHead(type, proc_id, ssn_id, RandId()); } -BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid) +BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const uint64_t ssn_id, const std::string &msgid) { BHMsgHead msg; msg.set_msg_id(msgid); msg.set_type(type); msg.set_proc_id(proc_id); + msg.set_ssn_id(ssn_id); msg.set_timestamp(NowSec()); return msg; } diff --git a/src/proto.h b/src/proto.h index c30b4fd..94a438c 100644 --- a/src/proto.h +++ b/src/proto.h @@ -74,8 +74,8 @@ return msg; } std::string NewMsgId(); -BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid); -BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id); +BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const uint64_t ssn_id, const std::string &msgid); +BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const uint64_t ssn_id); // inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); } inline bool IsSuccess(const ErrorCode ec) { return ec == eSuccess; } bool IsMsgExpired(const BHMsgHead &head); diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 9398318..13bb8ee 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -89,7 +89,7 @@ AddId(SockSub().id()); AddId(SockPub().id()); - auto head(InitMsgHead(GetType(body), body.proc().proc_id())); + auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn())); AddRoute(head, sock.id()); auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) { @@ -129,7 +129,7 @@ MsgUnregister body; body.mutable_proc()->Swap(&proc); - auto head(InitMsgHead(GetType(body), body.proc().proc_id())); + auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn())); AddRoute(head, sock.id()); auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) { @@ -165,7 +165,7 @@ MsgHeartbeat body; body.mutable_proc()->Swap(&proc); - auto head(InitMsgHead(GetType(body), body.proc().proc_id())); + auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn())); AddRoute(head, sock.id()); if (timeout_ms == 0) { @@ -195,7 +195,7 @@ } auto &sock = SockNode(); - BHMsgHead head(InitMsgHead(GetType(query), proc_id())); + BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn())); AddRoute(head, sock.id()); MsgI reply; @@ -217,7 +217,7 @@ MsgRegisterRPC body; body.mutable_topics()->Swap(&topics); - auto head(InitMsgHead(GetType(body), proc_id())); + auto head(InitMsgHead(GetType(body), proc_id(), ssn())); AddRoute(head, sock.id()); if (timeout_ms == 0) { @@ -242,7 +242,7 @@ MsgRequestTopicReply reply_body; if (rcb(head.proc_id(), req, reply_body)) { - BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id())); + BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), ssn(), head.msg_id())); for (int i = 0; i < head.route_size() - 1; ++i) { reply_head.add_route()->Swap(head.mutable_route(i)); @@ -311,7 +311,7 @@ if (!p || p->route.empty()) { return false; } - BHMsgHead head(InitMsgHead(GetType(body), proc_id(), p->msg_id)); + BHMsgHead head(InitMsgHead(GetType(body), proc_id(), ssn(), p->msg_id)); for (unsigned i = 0; i < p->route.size() - 1; ++i) { head.add_route()->Swap(&p->route[i]); } @@ -348,7 +348,7 @@ auto SendTo = [this, msg_id](const BHAddress &addr, const MsgRequestTopic &req, const RequestResultCB &cb) { auto &sock = SockClient(); - BHMsgHead head(InitMsgHead(GetType(req), proc_id(), msg_id)); + BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id)); AddRoute(head, sock.id()); head.set_topic(req.topic()); @@ -388,7 +388,7 @@ BHAddress addr; if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { - BHMsgHead head(InitMsgHead(GetType(request), proc_id())); + BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn())); AddRoute(head, sock.id()); head.set_topic(request.topic()); @@ -460,7 +460,7 @@ try { auto &sock = SockPub(); - BHMsgHead head(InitMsgHead(GetType(pub), proc_id())); + BHMsgHead head(InitMsgHead(GetType(pub), proc_id(), ssn())); AddRoute(head, sock.id()); if (timeout_ms == 0) { @@ -494,7 +494,7 @@ MsgSubscribe sub; sub.mutable_topics()->Swap(&topics); - BHMsgHead head(InitMsgHead(GetType(sub), proc_id())); + BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn())); AddRoute(head, sock.id()); if (timeout_ms == 0) { return sock.Send(BHTopicBusAddress(), head, sub); diff --git a/src/topic_node.h b/src/topic_node.h index 76bd608..3c90e5b 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -74,6 +74,7 @@ void Stop(); private: + MQId ssn() { return SockNode().id(); } bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms); typedef MsgQueryTopicReply::BHNodeAddress NodeAddress; int QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms); diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp index d145ab4..302d4bd 100644 --- a/utest/speed_test.cpp +++ b/utest/speed_test.cpp @@ -38,7 +38,7 @@ MsgRequestTopic body; body.set_topic("topic"); body.set_data(str); - auto head(InitMsgHead(GetType(body), proc_id)); + auto head(InitMsgHead(GetType(body), proc_id, mq.Id())); msg.Make(head, body); assert(msg.valid()); DEFER1(msg.Release();); @@ -156,7 +156,7 @@ MsgRequestTopic req_body; req_body.set_topic("topic"); req_body.set_data(msg_content); - auto req_head(InitMsgHead(GetType(req_body), client_proc_id)); + auto req_head(InitMsgHead(GetType(req_body), client_proc_id, cli.id())); req_head.add_route()->set_mq_id(cli.id()); return cli.Send(srv.id(), req_head, req_body); }; @@ -180,7 +180,7 @@ MsgRequestTopic reply_body; reply_body.set_topic("topic"); reply_body.set_data(msg_content); - auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id())); + auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), req_head.msg_id())); return srv.Send(src_id, reply_head, reply_body); }; Reply(); -- Gitblit v1.8.0