From 5b6ced44157b6e7fab519ae48f5cffcdc2b3cd7c Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期日, 25 四月 2021 19:28:57 +0800
Subject: [PATCH] use node mqid ssn id to index online nodes.
---
utest/speed_test.cpp | 6 +-
src/proto.cpp | 7 ++-
box/center.cpp | 42 ++++++++++++++-------
src/proto.h | 4 +-
proto/source/bhome_msg.proto | 5 +-
src/topic_node.h | 1
src/topic_node.cpp | 22 +++++-----
7 files changed, 52 insertions(+), 35 deletions(-)
diff --git a/box/center.cpp b/box/center.cpp
index d920ff7..4bb9ea1 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -52,6 +52,11 @@
struct ProcState {
int64_t timestamp_ = 0;
uint32_t flag_ = 0; // reserved
+ void PutOffline(const int64_t offline_time)
+ {
+ timestamp_ = NowSec() - offline_time;
+ flag_ = kStateOffline;
+ }
void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
{
auto diff = now - timestamp_;
@@ -106,6 +111,10 @@
}
try {
+ MQId ssn = head.ssn_id();
+ // use src_addr as session id.
+ // when node restart, src_addr will change,
+ // and old node will be removed after timeout.
auto UpdateRegInfo = [&](Node &node) {
node->addrs_.insert(SrcAddr(head));
for (auto &addr : msg.addrs()) {
@@ -116,19 +125,24 @@
node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
};
- auto pos = nodes_.find(head.proc_id());
- if (pos != nodes_.end()) { // new client
+ auto pos = nodes_.find(ssn);
+ if (pos != nodes_.end()) { // update
Node &node = pos->second;
- if (node->addrs_.find(SrcAddr(head)) == node->addrs_.end()) {
- // node restarted, release old mq.
- RemoveNode(node);
- node.reset(new NodeInfo);
- }
UpdateRegInfo(node);
} else {
Node node(new NodeInfo);
UpdateRegInfo(node);
- nodes_[node->proc_.proc_id()] = node;
+ nodes_[ssn] = node;
+
+ auto old = node_addr_map_.find(head.proc_id());
+ if (old != node_addr_map_.end()) { // old session
+ auto &old_ssn = old->second;
+ nodes_[old_ssn]->state_.PutOffline(offline_time_);
+ printf("put %s %ld offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second);
+ old_ssn = ssn;
+ } else {
+ node_addr_map_.emplace(head.proc_id(), ssn);
+ }
}
return MakeReply(eSuccess);
} catch (...) {
@@ -140,7 +154,7 @@
Reply HandleMsg(const BHMsgHead &head, Func const &op)
{
try {
- auto pos = nodes_.find(head.proc_id());
+ auto pos = nodes_.find(head.ssn_id());
if (pos == nodes_.end()) {
return MakeReply<Reply>(eNotRegistered, "Node is not registered.");
} else {
@@ -171,9 +185,7 @@
return HandleMsg(
head, [&](Node node) -> MsgCommonReply {
NodeInfo &ni = *node;
- auto now = NowSec(); // just set to offline.
- ni.state_.timestamp_ = now - offline_time_;
- ni.state_.UpdateState(now, offline_time_, kill_time_);
+ ni.state_.PutOffline(offline_time_);
return MakeReply(eSuccess);
});
}
@@ -375,6 +387,7 @@
};
EraseMapRec(service_map_, node->services_);
EraseMapRec(subscribe_map_, node->subscriptions_);
+ node_addr_map_.erase(node->proc_.proc_id());
for (auto &addr : node->addrs_) {
cleaner_(addr);
@@ -385,7 +398,8 @@
std::unordered_map<Topic, Clients> service_map_;
std::unordered_map<Topic, Clients> subscribe_map_;
- std::unordered_map<ProcId, Node> nodes_;
+ std::unordered_map<Address, Node> nodes_;
+ std::unordered_map<std::string, Address> node_addr_map_;
Cleaner cleaner_; // remove mqs.
int64_t offline_time_;
int64_t kill_time_;
@@ -425,7 +439,7 @@
auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 2);
auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
return [&](auto &&rep_body) {
- auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
+ auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id()));
auto remote = head.route(0).mq_id();
socket.Send(remote, reply_head, rep_body);
};
diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
index 6a4942d..51e9b6e 100644
--- a/proto/source/bhome_msg.proto
+++ b/proto/source/bhome_msg.proto
@@ -16,8 +16,9 @@
repeated BHAddress route = 2; // for reply and proxy.
int64 timestamp = 3;
int32 type = 4;
- bytes proc_id = 5;
- bytes topic = 6; // for request route
+ uint64 ssn_id = 5; // node mq id
+ bytes proc_id = 6;
+ bytes topic = 7; // for request route
}
diff --git a/src/proto.cpp b/src/proto.cpp
index b1e8207..c8a5052 100644
--- a/src/proto.cpp
+++ b/src/proto.cpp
@@ -32,17 +32,18 @@
std::string NewMsgId() { return RandId(); }
-BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id)
+BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const uint64_t ssn_id)
{
- return InitMsgHead(type, proc_id, RandId());
+ return InitMsgHead(type, proc_id, ssn_id, RandId());
}
-BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid)
+BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const uint64_t ssn_id, const std::string &msgid)
{
BHMsgHead msg;
msg.set_msg_id(msgid);
msg.set_type(type);
msg.set_proc_id(proc_id);
+ msg.set_ssn_id(ssn_id);
msg.set_timestamp(NowSec());
return msg;
}
diff --git a/src/proto.h b/src/proto.h
index c30b4fd..94a438c 100644
--- a/src/proto.h
+++ b/src/proto.h
@@ -74,8 +74,8 @@
return msg;
}
std::string NewMsgId();
-BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid);
-BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id);
+BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const uint64_t ssn_id, const std::string &msgid);
+BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const uint64_t ssn_id);
// inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
inline bool IsSuccess(const ErrorCode ec) { return ec == eSuccess; }
bool IsMsgExpired(const BHMsgHead &head);
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 9398318..13bb8ee 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -89,7 +89,7 @@
AddId(SockSub().id());
AddId(SockPub().id());
- auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
+ auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn()));
AddRoute(head, sock.id());
auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) {
@@ -129,7 +129,7 @@
MsgUnregister body;
body.mutable_proc()->Swap(&proc);
- auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
+ auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn()));
AddRoute(head, sock.id());
auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) {
@@ -165,7 +165,7 @@
MsgHeartbeat body;
body.mutable_proc()->Swap(&proc);
- auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
+ auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn()));
AddRoute(head, sock.id());
if (timeout_ms == 0) {
@@ -195,7 +195,7 @@
}
auto &sock = SockNode();
- BHMsgHead head(InitMsgHead(GetType(query), proc_id()));
+ BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn()));
AddRoute(head, sock.id());
MsgI reply;
@@ -217,7 +217,7 @@
MsgRegisterRPC body;
body.mutable_topics()->Swap(&topics);
- auto head(InitMsgHead(GetType(body), proc_id()));
+ auto head(InitMsgHead(GetType(body), proc_id(), ssn()));
AddRoute(head, sock.id());
if (timeout_ms == 0) {
@@ -242,7 +242,7 @@
MsgRequestTopicReply reply_body;
if (rcb(head.proc_id(), req, reply_body)) {
- BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id()));
+ BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), ssn(), head.msg_id()));
for (int i = 0; i < head.route_size() - 1; ++i) {
reply_head.add_route()->Swap(head.mutable_route(i));
@@ -311,7 +311,7 @@
if (!p || p->route.empty()) {
return false;
}
- BHMsgHead head(InitMsgHead(GetType(body), proc_id(), p->msg_id));
+ BHMsgHead head(InitMsgHead(GetType(body), proc_id(), ssn(), p->msg_id));
for (unsigned i = 0; i < p->route.size() - 1; ++i) {
head.add_route()->Swap(&p->route[i]);
}
@@ -348,7 +348,7 @@
auto SendTo = [this, msg_id](const BHAddress &addr, const MsgRequestTopic &req, const RequestResultCB &cb) {
auto &sock = SockClient();
- BHMsgHead head(InitMsgHead(GetType(req), proc_id(), msg_id));
+ BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id));
AddRoute(head, sock.id());
head.set_topic(req.topic());
@@ -388,7 +388,7 @@
BHAddress addr;
if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
- BHMsgHead head(InitMsgHead(GetType(request), proc_id()));
+ BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
AddRoute(head, sock.id());
head.set_topic(request.topic());
@@ -460,7 +460,7 @@
try {
auto &sock = SockPub();
- BHMsgHead head(InitMsgHead(GetType(pub), proc_id()));
+ BHMsgHead head(InitMsgHead(GetType(pub), proc_id(), ssn()));
AddRoute(head, sock.id());
if (timeout_ms == 0) {
@@ -494,7 +494,7 @@
MsgSubscribe sub;
sub.mutable_topics()->Swap(&topics);
- BHMsgHead head(InitMsgHead(GetType(sub), proc_id()));
+ BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn()));
AddRoute(head, sock.id());
if (timeout_ms == 0) {
return sock.Send(BHTopicBusAddress(), head, sub);
diff --git a/src/topic_node.h b/src/topic_node.h
index 76bd608..3c90e5b 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -74,6 +74,7 @@
void Stop();
private:
+ MQId ssn() { return SockNode().id(); }
bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms);
typedef MsgQueryTopicReply::BHNodeAddress NodeAddress;
int QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms);
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index d145ab4..302d4bd 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -38,7 +38,7 @@
MsgRequestTopic body;
body.set_topic("topic");
body.set_data(str);
- auto head(InitMsgHead(GetType(body), proc_id));
+ auto head(InitMsgHead(GetType(body), proc_id, mq.Id()));
msg.Make(head, body);
assert(msg.valid());
DEFER1(msg.Release(););
@@ -156,7 +156,7 @@
MsgRequestTopic req_body;
req_body.set_topic("topic");
req_body.set_data(msg_content);
- auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
+ auto req_head(InitMsgHead(GetType(req_body), client_proc_id, cli.id()));
req_head.add_route()->set_mq_id(cli.id());
return cli.Send(srv.id(), req_head, req_body);
};
@@ -180,7 +180,7 @@
MsgRequestTopic reply_body;
reply_body.set_topic("topic");
reply_body.set_data(msg_content);
- auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id()));
+ auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), req_head.msg_id()));
return srv.Send(src_id, reply_head, reply_body);
};
Reply();
--
Gitblit v1.8.0