From c1e39e20ca42b21eeac8b5068fa1f921bf9a070f Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 23 六月 2021 19:43:29 +0800 Subject: [PATCH] refactor, start tcp pub/sub. --- box/node_center.cpp | 184 ++++++++++++++++++++++++++++++++++------------ 1 files changed, 136 insertions(+), 48 deletions(-) diff --git a/box/node_center.cpp b/box/node_center.cpp index 5c24409..c32b197 100644 --- a/box/node_center.cpp +++ b/box/node_center.cpp @@ -267,6 +267,43 @@ return sender.Send(dest, msg, head.msg_id(), std::move(cb)); } +bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content) +{ + auto &topic = head.topic(); + auto clients = DoFindClients(topic, true); + if (clients.empty()) { return true; } + + std::vector<MsgI> msgs; + auto ReleaseAll = [&]() {for (auto &msg : msgs) { msg.Release(); } }; + DEFER1(ReleaseAll();); + + for (auto &cli : clients) { + auto Send1 = [&](Node node) { + auto &shm = node->shm_; + for (auto &msg : msgs) { + if (msg.shm().name() == shm.name()) { + DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); + return; + } + } + MsgI msg(shm); + if (msg.Make(body_content)) { + RecordMsg(msg); + msgs.push_back(msg); + DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); + } + }; + auto node = cli.weak_node_.lock(); + if (node) { + Send1(node); + // should also make sure that mq is not killed before msg expires. + // it would be ok if (kill_time - offline_time) is longer than expire time. + } + } + + return true; +} + bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content) { Node node(GetNode(dest.id_)); @@ -469,11 +506,16 @@ *info->mutable_proc() = node->proc_; info->mutable_proc()->clear_private_info(); info->set_online(node->state_.flag_ == kStateNormal); - for (auto &addr_topics : node->services_) { - for (auto &topic : addr_topics.second) { - info->mutable_topics()->add_topic_list(topic); + auto AddTopics = [](auto &dst, auto &src) { + for (auto &addr_topics : src) { + for (auto &topic : addr_topics.second) { + dst.add_topic_list(topic); + } } - } + }; + AddTopics(*info->mutable_service(), node->services_); + AddTopics(*info->mutable_local_sub(), node->local_sub_); + AddTopics(*info->mutable_net_sub(), node->net_sub_); }; if (!proc_id.empty()) { @@ -532,35 +574,50 @@ return HandleMsg<Reply>(head, query); } +void NodeCenter::NodeInfo::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node) +{ + auto src = SrcAddr(head); + auto Sub = [&](auto &sub, auto &sub_map) { + auto &topics = msg.topics().topic_list(); + sub[src].insert(topics.begin(), topics.end()); + const TopicDest &dest = {src, SrcAbsAddr(head), node}; + for (auto &topic : topics) { + sub_map[topic].insert(dest); + } + }; + LOG_DEBUG() << "subscribe net : " << msg.network(); + if (msg.network()) { + Sub(net_sub_, center_.net_sub_map_); + } else { + Sub(local_sub_, center_.local_sub_map_); + } +} + MsgCommonReply NodeCenter::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg) { return HandleMsg(head, [&](Node node) { - auto src = SrcAddr(head); - auto &topics = msg.topics().topic_list(); - node->subscriptions_[src].insert(topics.begin(), topics.end()); - TopicDest dest = {src, SrcAbsAddr(head), node}; - for (auto &topic : topics) { - subscribe_map_[topic].insert(dest); - } + node->Subscribe(head, msg, node); return MakeReply(eSuccess); }); } -MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg) -{ - return HandleMsg(head, [&](Node node) { - auto src = SrcAddr(head); - auto pos = node->subscriptions_.find(src); - auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) { - auto pos = subscribe_map_.find(topic); - if (pos != subscribe_map_.end() && +void NodeCenter::NodeInfo::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node) +{ + auto src = SrcAddr(head); + + auto Unsub = [&](auto &sub, auto &sub_map) { + auto pos = sub.find(src); + + auto RemoveSubTopicDestRecord = [&sub_map](const Topic &topic, const TopicDest &dest) { + auto pos = sub_map.find(topic); + if (pos != sub_map.end() && pos->second.erase(dest) != 0 && pos->second.empty()) { - subscribe_map_.erase(pos); + sub_map.erase(pos); } }; - if (pos != node->subscriptions_.end()) { + if (pos != sub.end()) { const TopicDest &dest = {src, SrcAbsAddr(head), node}; auto &topics = msg.topics().topic_list(); // clear node sub records; @@ -569,26 +626,44 @@ RemoveSubTopicDestRecord(topic, dest); } if (pos->second.empty()) { - node->subscriptions_.erase(pos); + sub.erase(pos); } } + }; + if (msg.network()) { + Unsub(net_sub_, center_.net_sub_map_); + } else { + Unsub(local_sub_, center_.local_sub_map_); + } +} + +MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg) +{ + return HandleMsg(head, [&](Node node) { + node->Unsubscribe(head, msg, node); return MakeReply(eSuccess); }); } -NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic) +NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote) { Clients dests; auto Find1 = [&](const std::string &exact) { - auto pos = subscribe_map_.find(exact); - if (pos != subscribe_map_.end()) { - auto &clients = pos->second; - for (auto &cli : clients) { - if (Valid(cli.weak_node_)) { - dests.insert(cli); + auto FindIn = [&](auto &sub_map) { + auto pos = sub_map.find(exact); + if (pos != sub_map.end()) { + auto &clients = pos->second; + for (auto &cli : clients) { + if (Valid(cli.weak_node_)) { + dests.insert(cli); + } } } + }; + if (!from_remote) { + FindIn(local_sub_map_); } + FindIn(net_sub_map_); }; Find1(topic); @@ -605,15 +680,31 @@ return dests; } -bool NodeCenter::FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply) +MsgCommonReply NodeCenter::Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg) { - bool ret = false; - HandleMsg(head, [&](Node node) { - DoFindClients(msg.topic()).swap(out); - ret = true; + return HandleMsg(head, [&](Node node) { + DoPublish(DefaultSender(node->shm_), topic, msg); return MakeReply(eSuccess); - }).Swap(&reply); - return ret; + }); +} + +void NodeCenter::DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg) +{ + try { + auto clients = DoFindClients(topic, false); + if (clients.empty()) { return; } + + for (auto &cli : clients) { + auto node = cli.weak_node_.lock(); + if (node) { + // should also make sure that mq is not killed before msg expires. + // it would be ok if (kill_time - offline_time) is longer than expire time. + sock.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); + } + } + } catch (...) { + LOG_ERROR() << "DoPublish error."; + } } void NodeCenter::OnTimer() @@ -659,7 +750,8 @@ } }; EraseMapRec(service_map_, node->services_); - EraseMapRec(subscribe_map_, node->subscriptions_); + EraseMapRec(local_sub_map_, node->local_sub_); + EraseMapRec(net_sub_map_, node->net_sub_); // remove online record. auto pos = online_node_addr_map_.find(node->proc_.proc_id()); @@ -681,10 +773,6 @@ void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content) { try { - // LOG_DEBUG() << "center publish: " << topic << ": " << content; - Clients clients(DoFindClients(topic)); - if (clients.empty()) { return; } - MsgPublish pub; pub.set_topic(topic); pub.set_data(content); @@ -693,16 +781,16 @@ if (msg.Make(head, pub)) { DEFER1(msg.Release()); RecordMsg(msg); - - for (auto &cli : clients) { - auto node = cli.weak_node_.lock(); - if (node && node->state_.flag_ == kStateNormal) { - DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); - } - } + DoPublish(DefaultSender(shm), topic, msg); } } catch (...) { LOG_ERROR() << "center publish error."; } +} + +std::vector<std::string> NodeCenter::FindRemoteSubClients(const Topic &topic) +{ + //TODO search synced full list; + return std::vector<std::string>(); } \ No newline at end of file -- Gitblit v1.8.0