From ae17d1439b35b55212c3a30712e0a60b1d6a99c0 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 30 六月 2021 11:15:53 +0800 Subject: [PATCH] support tcp pub/sub. --- box/node_center.cpp | 82 ++++++++++++++++++++++++++++++++--------- 1 files changed, 64 insertions(+), 18 deletions(-) diff --git a/box/node_center.cpp b/box/node_center.cpp index 068aa00..77bfac1 100644 --- a/box/node_center.cpp +++ b/box/node_center.cpp @@ -270,6 +270,7 @@ bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content) { + // LOG_FUNCTION; auto &topic = head.topic(); auto clients = DoFindClients(topic, true); if (clients.empty()) { return true; } @@ -288,9 +289,10 @@ } } MsgI msg(shm); - if (msg.Make(body_content)) { + if (msg.Make(head, body_content)) { RecordMsg(msg); msgs.push_back(msg); + // LOG_DEBUG() << "remote publish to local." << cli.mq_id_ << ", " << cli.mq_abs_addr_; DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); } }; @@ -554,22 +556,43 @@ typedef MsgQueryTopicReply Reply; auto query = [&](Node self) -> Reply { - auto pos = service_map_.find(req.topic()); - if (pos != service_map_.end() && !pos->second.empty()) { - auto &clients = pos->second; - Reply reply = MakeReply<Reply>(eSuccess); - for (auto &dest : clients) { - Node dest_node(dest.weak_node_.lock()); - if (dest_node && Valid(*dest_node)) { - auto node_addr = reply.add_node_address(); - node_addr->set_proc_id(dest_node->proc_.proc_id()); - node_addr->mutable_addr()->set_mq_id(dest.mq_id_); - node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_); + Reply reply = MakeReply<Reply>(eSuccess); + auto local = [&]() { + auto pos = service_map_.find(req.topic()); + if (pos != service_map_.end() && !pos->second.empty()) { + auto &clients = pos->second; + for (auto &dest : clients) { + Node dest_node(dest.weak_node_.lock()); + if (dest_node && Valid(*dest_node)) { + auto node_addr = reply.add_node_address(); + node_addr->set_proc_id(dest_node->proc_.proc_id()); + node_addr->mutable_addr()->set_mq_id(dest.mq_id_); + node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_); + } } + return true; + } else { + return false; } - return reply; - } else { + }; + auto net = [&]() { + auto hosts(FindRemoteRPCServers(req.topic())); + if (hosts.empty()) { + return false; + } else { + for (auto &ip : hosts) { + auto node_addr = reply.add_node_address(); + node_addr->mutable_addr()->set_ip(ip); + } + return true; + } + }; + local(); + net(); + if (reply.node_address_size() == 0) { return MakeReply<Reply>(eNotFound, "topic server not found."); + } else { + return reply; } }; @@ -587,7 +610,6 @@ sub_map[topic].insert(dest); } }; - LOG_DEBUG() << "subscribe net : " << msg.network(); if (msg.network()) { Sub(net_sub_, center_.net_sub_map_); center_.Notify(kTopicNodeSub, *this); @@ -651,6 +673,7 @@ NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote) { + // LOG_FUNCTION; Clients dests; auto Find1 = [&](const std::string &exact) { auto FindIn = [&](auto &sub_map) { @@ -666,8 +689,11 @@ }; if (!from_remote) { FindIn(local_sub_map_); + // LOG_DEBUG() << "topic '" << topic << "' local clients: " << dests.size(); } + // net subscripitions also work in local mode. FindIn(net_sub_map_); + // LOG_DEBUG() << "topic '" << topic << "' + remote clients: " << dests.size(); }; Find1(topic); @@ -793,8 +819,28 @@ } } -std::vector<std::string> NodeCenter::FindRemoteSubClients(const Topic &topic) +void NodeCenter::NetRecords::ParseData(const ssjson::Json &info) { - //TODO search synced full list; - return std::vector<std::string>(); + // LOG_FUNCTION; + sub_hosts_.clear(); + rpc_hosts_.clear(); + for (auto &host : info.array()) { + if (host.get("isLocal", false)) { + host_id_ = host.get("serverId", ""); + ip_ = host.get("ip", ""); + } else { + auto ip = host.get("ip", ""); + auto UpdateRec = [&](const ssjson::Json::array_type &lot, auto &rec) { + for (auto &topic : lot) { + auto t = topic.get_value<std::string>(); + rec[t].insert(ip); + // LOG_DEBUG() << "net topic: " << t << ", " << ip; + } + }; + // LOG_DEBUG() << "serives:"; + UpdateRec(host.child("pubTopics").array(), rpc_hosts_); + // LOG_DEBUG() << "net sub:"; + UpdateRec(host.child("netSubTopics").array(), sub_hosts_); + } + } } \ No newline at end of file -- Gitblit v1.8.0