From c1e39e20ca42b21eeac8b5068fa1f921bf9a070f Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 23 六月 2021 19:43:29 +0800
Subject: [PATCH] refactor, start tcp pub/sub.
---
box/node_center.cpp | 294 +++++++++++++++++++++++++++++++++++++++++++++-------------
1 files changed, 228 insertions(+), 66 deletions(-)
diff --git a/box/node_center.cpp b/box/node_center.cpp
index 4e228a7..c32b197 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -57,7 +57,7 @@
{
auto pos = msgs_.find(id);
if (pos != msgs_.end()) {
- ShmMsg(pos->second).Free();
+ pos->second.Free();
msgs_.erase(pos);
} else {
LOG_TRACE() << "ignore late free request.";
@@ -70,8 +70,9 @@
return;
}
// LOG_FUNCTION;
+ const size_t total = msgs_.size();
time_to_clean_ = now + 1;
- int64_t limit = std::max(10000ul, msgs_.size() / 10);
+ int64_t limit = std::max(10000ul, total / 10);
int64_t n = 0;
auto it = msgs_.begin();
while (it != msgs_.end() && --limit > 0) {
@@ -82,16 +83,16 @@
++n;
};
int n = now - msg.timestamp();
- if (n < 10) {
+ if (msg.Count() == 0) {
+ Free();
+ } else if (n > NodeTimeoutSec()) {
+ Free();
+ } else {
++it;
- } else if (msg.Count() == 0) {
- Free();
- } else if (n > 60) {
- Free();
}
}
if (n > 0) {
- LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n;
+ LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total;
}
}
@@ -101,9 +102,9 @@
int i = 0;
int total_count = 0;
for (auto &kv : msgs_) {
- MsgI msg(kv.second);
+ auto &msg = kv.second;
total_count += msg.Count();
- LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size();
+ LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second.Offset() << ", count: " << msg.Count() << ", size: " << msg.Size();
}
LOG_TRACE() << "total count: " << total_count;
}
@@ -163,7 +164,7 @@
// create sockets.
try {
- ShmSocket tmp(shm, true, ssn, 16);
+ ShmSocket tmp(shm, ssn, eCreate);
node->addrs_.emplace(ssn, tmp.AbsAddr());
return true;
} catch (...) {
@@ -173,7 +174,7 @@
auto PrepareProcInit = [&](Node &node) {
bool r = false;
- ShmMsg init_msg;
+ ShmMsg init_msg(shm);
DEFER1(init_msg.Release());
MsgProcInit body;
auto head = InitMsgHead(GetType(body), id(), ssn);
@@ -210,6 +211,119 @@
return socket.Send(dest, msg);
}
+NodeCenter::Node NodeCenter::GetNode(const MQId mq_id)
+{
+ Node node;
+ auto ssn = mq_id - (mq_id % 10);
+ auto pos = nodes_.find(ssn);
+ if (pos != nodes_.end()) {
+ node = pos->second;
+ }
+ return node;
+}
+
+bool NodeCenter::PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
+{
+ Node node;
+
+ auto FindDest = [&]() {
+ auto pos = service_map_.find(head.topic());
+ if (pos != service_map_.end() && !pos->second.empty()) {
+ auto &clients = pos->second;
+ for (auto &cli : clients) {
+ node = cli.weak_node_.lock();
+ if (node && Valid(*node)) {
+ dest.id_ = cli.mq_id_;
+ dest.offset_ = cli.mq_abs_addr_;
+ return true;
+ }
+ }
+ }
+ return false;
+ };
+
+ if (dest.id_ == 0) {
+ if (!FindDest()) {
+ LOG_ERROR() << id() << " pass remote request, topic dest not found.";
+ return false;
+ }
+ } else {
+ node = GetNode(dest.id_);
+ if (!node || !Valid(*node)) {
+ LOG_ERROR() << id() << " pass remote request, dest not found.";
+ return false;
+ }
+ }
+
+ ShmSocket &sender(DefaultSender(node->shm_));
+ auto route = head.add_route();
+ route->set_mq_id(sender.id());
+ route->set_abs_addr(sender.AbsAddr());
+
+ ShmMsg msg(node->shm_);
+ if (!msg.Make(head, body_content)) { return false; }
+ DEFER1(msg.Release(););
+ RecordMsg(msg);
+ return sender.Send(dest, msg, head.msg_id(), std::move(cb));
+}
+
+bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content)
+{
+ auto &topic = head.topic();
+ auto clients = DoFindClients(topic, true);
+ if (clients.empty()) { return true; }
+
+ std::vector<MsgI> msgs;
+ auto ReleaseAll = [&]() {for (auto &msg : msgs) { msg.Release(); } };
+ DEFER1(ReleaseAll(););
+
+ for (auto &cli : clients) {
+ auto Send1 = [&](Node node) {
+ auto &shm = node->shm_;
+ for (auto &msg : msgs) {
+ if (msg.shm().name() == shm.name()) {
+ DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
+ return;
+ }
+ }
+ MsgI msg(shm);
+ if (msg.Make(body_content)) {
+ RecordMsg(msg);
+ msgs.push_back(msg);
+ DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
+ }
+ };
+ auto node = cli.weak_node_.lock();
+ if (node) {
+ Send1(node);
+ // should also make sure that mq is not killed before msg expires.
+ // it would be ok if (kill_time - offline_time) is longer than expire time.
+ }
+ }
+
+ return true;
+}
+
+bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content)
+{
+ Node node(GetNode(dest.id_));
+ if (!node) {
+ LOG_ERROR() << id() << " pass remote reply , ssn not found.";
+ return false;
+ }
+ auto offset = node->addrs_[dest.id_];
+ if (offset != dest.offset_) {
+ LOG_ERROR() << id() << " pass remote reply, dest address not match";
+ return false;
+ }
+
+ ShmMsg msg(node->shm_);
+ if (!msg.Make(head, body_content)) { return false; }
+ DEFER1(msg.Release(););
+ RecordMsg(msg);
+ return DefaultSender(node->shm_).Send(dest, msg);
+}
+
void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val)
{
// LOG_FUNCTION;
@@ -238,7 +352,7 @@
if (!FindMq()) { return; }
auto size = GetAllocSize((val >> 52) & MaskBits(8));
- MsgI new_msg;
+ MsgI new_msg(socket.shm());
if (new_msg.Make(size)) {
// 31bit proc index, 28bit id, ,4bit cmd+flag
int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0);
@@ -281,7 +395,7 @@
auto &node = pos->second;
try {
for (int i = 0; i < msg.extra_mq_num(); ++i) {
- ShmSocket tmp(node->shm_, true, head.ssn_id() + i + 1, 16);
+ ShmSocket tmp(node->shm_, head.ssn_id() + i + 1, eCreate);
node->addrs_.emplace(tmp.id(), tmp.AbsAddr());
auto addr = reply.add_extra_mqs();
addr->set_mq_id(tmp.id());
@@ -392,11 +506,16 @@
*info->mutable_proc() = node->proc_;
info->mutable_proc()->clear_private_info();
info->set_online(node->state_.flag_ == kStateNormal);
- for (auto &addr_topics : node->services_) {
- for (auto &topic : addr_topics.second) {
- info->mutable_topics()->add_topic_list(topic);
+ auto AddTopics = [](auto &dst, auto &src) {
+ for (auto &addr_topics : src) {
+ for (auto &topic : addr_topics.second) {
+ dst.add_topic_list(topic);
+ }
}
- }
+ };
+ AddTopics(*info->mutable_service(), node->services_);
+ AddTopics(*info->mutable_local_sub(), node->local_sub_);
+ AddTopics(*info->mutable_net_sub(), node->net_sub_);
};
if (!proc_id.empty()) {
@@ -455,35 +574,50 @@
return HandleMsg<Reply>(head, query);
}
+void NodeCenter::NodeInfo::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node)
+{
+ auto src = SrcAddr(head);
+ auto Sub = [&](auto &sub, auto &sub_map) {
+ auto &topics = msg.topics().topic_list();
+ sub[src].insert(topics.begin(), topics.end());
+ const TopicDest &dest = {src, SrcAbsAddr(head), node};
+ for (auto &topic : topics) {
+ sub_map[topic].insert(dest);
+ }
+ };
+ LOG_DEBUG() << "subscribe net : " << msg.network();
+ if (msg.network()) {
+ Sub(net_sub_, center_.net_sub_map_);
+ } else {
+ Sub(local_sub_, center_.local_sub_map_);
+ }
+}
+
MsgCommonReply NodeCenter::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg)
{
return HandleMsg(head, [&](Node node) {
- auto src = SrcAddr(head);
- auto &topics = msg.topics().topic_list();
- node->subscriptions_[src].insert(topics.begin(), topics.end());
- TopicDest dest = {src, SrcAbsAddr(head), node};
- for (auto &topic : topics) {
- subscribe_map_[topic].insert(dest);
- }
+ node->Subscribe(head, msg, node);
return MakeReply(eSuccess);
});
}
-MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
-{
- return HandleMsg(head, [&](Node node) {
- auto src = SrcAddr(head);
- auto pos = node->subscriptions_.find(src);
- auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) {
- auto pos = subscribe_map_.find(topic);
- if (pos != subscribe_map_.end() &&
+void NodeCenter::NodeInfo::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node)
+{
+ auto src = SrcAddr(head);
+
+ auto Unsub = [&](auto &sub, auto &sub_map) {
+ auto pos = sub.find(src);
+
+ auto RemoveSubTopicDestRecord = [&sub_map](const Topic &topic, const TopicDest &dest) {
+ auto pos = sub_map.find(topic);
+ if (pos != sub_map.end() &&
pos->second.erase(dest) != 0 &&
pos->second.empty()) {
- subscribe_map_.erase(pos);
+ sub_map.erase(pos);
}
};
- if (pos != node->subscriptions_.end()) {
+ if (pos != sub.end()) {
const TopicDest &dest = {src, SrcAbsAddr(head), node};
auto &topics = msg.topics().topic_list();
// clear node sub records;
@@ -492,26 +626,44 @@
RemoveSubTopicDestRecord(topic, dest);
}
if (pos->second.empty()) {
- node->subscriptions_.erase(pos);
+ sub.erase(pos);
}
}
+ };
+ if (msg.network()) {
+ Unsub(net_sub_, center_.net_sub_map_);
+ } else {
+ Unsub(local_sub_, center_.local_sub_map_);
+ }
+}
+
+MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
+{
+ return HandleMsg(head, [&](Node node) {
+ node->Unsubscribe(head, msg, node);
return MakeReply(eSuccess);
});
}
-NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic)
+NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote)
{
Clients dests;
auto Find1 = [&](const std::string &exact) {
- auto pos = subscribe_map_.find(exact);
- if (pos != subscribe_map_.end()) {
- auto &clients = pos->second;
- for (auto &cli : clients) {
- if (Valid(cli.weak_node_)) {
- dests.insert(cli);
+ auto FindIn = [&](auto &sub_map) {
+ auto pos = sub_map.find(exact);
+ if (pos != sub_map.end()) {
+ auto &clients = pos->second;
+ for (auto &cli : clients) {
+ if (Valid(cli.weak_node_)) {
+ dests.insert(cli);
+ }
}
}
+ };
+ if (!from_remote) {
+ FindIn(local_sub_map_);
}
+ FindIn(net_sub_map_);
};
Find1(topic);
@@ -528,15 +680,31 @@
return dests;
}
-bool NodeCenter::FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply)
+MsgCommonReply NodeCenter::Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg)
{
- bool ret = false;
- HandleMsg(head, [&](Node node) {
- DoFindClients(msg.topic()).swap(out);
- ret = true;
+ return HandleMsg(head, [&](Node node) {
+ DoPublish(DefaultSender(node->shm_), topic, msg);
return MakeReply(eSuccess);
- }).Swap(&reply);
- return ret;
+ });
+}
+
+void NodeCenter::DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg)
+{
+ try {
+ auto clients = DoFindClients(topic, false);
+ if (clients.empty()) { return; }
+
+ for (auto &cli : clients) {
+ auto node = cli.weak_node_.lock();
+ if (node) {
+ // should also make sure that mq is not killed before msg expires.
+ // it would be ok if (kill_time - offline_time) is longer than expire time.
+ sock.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
+ }
+ }
+ } catch (...) {
+ LOG_ERROR() << "DoPublish error.";
+ }
}
void NodeCenter::OnTimer()
@@ -582,7 +750,8 @@
}
};
EraseMapRec(service_map_, node->services_);
- EraseMapRec(subscribe_map_, node->subscriptions_);
+ EraseMapRec(local_sub_map_, node->local_sub_);
+ EraseMapRec(net_sub_map_, node->net_sub_);
// remove online record.
auto pos = online_node_addr_map_.find(node->proc_.proc_id());
@@ -604,31 +773,24 @@
void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content)
{
try {
- // LOG_DEBUG() << "center publish: " << topic << ": " << content;
- Clients clients(DoFindClients(topic));
- if (clients.empty()) { return; }
-
MsgPublish pub;
pub.set_topic(topic);
pub.set_data(content);
BHMsgHead head(InitMsgHead(GetType(pub), id(), 0));
- MsgI msg;
+ MsgI msg(shm);
if (msg.Make(head, pub)) {
DEFER1(msg.Release());
RecordMsg(msg);
-
- auto &mq = GetCenterInfo(shm)->mq_sender_;
- ShmSocket sender(mq.offset_, shm, mq.id_);
-
- for (auto &cli : clients) {
- auto node = cli.weak_node_.lock();
- if (node && node->state_.flag_ == kStateNormal) {
- sender.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
- }
- }
+ DoPublish(DefaultSender(shm), topic, msg);
}
} catch (...) {
LOG_ERROR() << "center publish error.";
}
+}
+
+std::vector<std::string> NodeCenter::FindRemoteSubClients(const Topic &topic)
+{
+ //TODO search synced full list;
+ return std::vector<std::string>();
}
\ No newline at end of file
--
Gitblit v1.8.0