| | |
| | | |
| | | bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content) |
| | | { |
| | | // LOG_FUNCTION; |
| | | auto &topic = head.topic(); |
| | | auto clients = DoFindClients(topic, true); |
| | | if (clients.empty()) { return true; } |
| | |
| | | } |
| | | } |
| | | MsgI msg(shm); |
| | | if (msg.Make(body_content)) { |
| | | if (msg.Make(head, body_content)) { |
| | | RecordMsg(msg); |
| | | msgs.push_back(msg); |
| | | // LOG_DEBUG() << "remote publish to local." << cli.mq_id_ << ", " << cli.mq_abs_addr_; |
| | | DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); |
| | | } |
| | | }; |
| | |
| | | typedef MsgQueryTopicReply Reply; |
| | | |
| | | auto query = [&](Node self) -> Reply { |
| | | auto pos = service_map_.find(req.topic()); |
| | | if (pos != service_map_.end() && !pos->second.empty()) { |
| | | auto &clients = pos->second; |
| | | Reply reply = MakeReply<Reply>(eSuccess); |
| | | for (auto &dest : clients) { |
| | | Node dest_node(dest.weak_node_.lock()); |
| | | if (dest_node && Valid(*dest_node)) { |
| | | auto node_addr = reply.add_node_address(); |
| | | node_addr->set_proc_id(dest_node->proc_.proc_id()); |
| | | node_addr->mutable_addr()->set_mq_id(dest.mq_id_); |
| | | node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_); |
| | | Reply reply = MakeReply<Reply>(eSuccess); |
| | | auto local = [&]() { |
| | | auto pos = service_map_.find(req.topic()); |
| | | if (pos != service_map_.end() && !pos->second.empty()) { |
| | | auto &clients = pos->second; |
| | | for (auto &dest : clients) { |
| | | Node dest_node(dest.weak_node_.lock()); |
| | | if (dest_node && Valid(*dest_node)) { |
| | | auto node_addr = reply.add_node_address(); |
| | | node_addr->set_proc_id(dest_node->proc_.proc_id()); |
| | | node_addr->mutable_addr()->set_mq_id(dest.mq_id_); |
| | | node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_); |
| | | } |
| | | } |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | return reply; |
| | | } else { |
| | | }; |
| | | auto net = [&]() { |
| | | auto hosts(FindRemoteRPCServers(req.topic())); |
| | | if (hosts.empty()) { |
| | | return false; |
| | | } else { |
| | | for (auto &ip : hosts) { |
| | | auto node_addr = reply.add_node_address(); |
| | | node_addr->mutable_addr()->set_ip(ip); |
| | | } |
| | | return true; |
| | | } |
| | | }; |
| | | local(); |
| | | net(); |
| | | if (reply.node_address_size() == 0) { |
| | | return MakeReply<Reply>(eNotFound, "topic server not found."); |
| | | } else { |
| | | return reply; |
| | | } |
| | | }; |
| | | |
| | |
| | | sub_map[topic].insert(dest); |
| | | } |
| | | }; |
| | | LOG_DEBUG() << "subscribe net : " << msg.network(); |
| | | if (msg.network()) { |
| | | Sub(net_sub_, center_.net_sub_map_); |
| | | center_.Notify(kTopicNodeSub, *this); |
| | |
| | | |
| | | NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote) |
| | | { |
| | | // LOG_FUNCTION; |
| | | Clients dests; |
| | | auto Find1 = [&](const std::string &exact) { |
| | | auto FindIn = [&](auto &sub_map) { |
| | |
| | | }; |
| | | if (!from_remote) { |
| | | FindIn(local_sub_map_); |
| | | // LOG_DEBUG() << "topic '" << topic << "' local clients: " << dests.size(); |
| | | } |
| | | // net subscripitions also work in local mode. |
| | | FindIn(net_sub_map_); |
| | | // LOG_DEBUG() << "topic '" << topic << "' + remote clients: " << dests.size(); |
| | | }; |
| | | Find1(topic); |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | std::vector<std::string> NodeCenter::FindRemoteSubClients(const Topic &topic) |
| | | void NodeCenter::NetRecords::ParseData(const ssjson::Json &info) |
| | | { |
| | | //TODO search synced full list; |
| | | return std::vector<std::string>(); |
| | | // LOG_FUNCTION; |
| | | sub_hosts_.clear(); |
| | | rpc_hosts_.clear(); |
| | | for (auto &host : info.array()) { |
| | | if (host.get("isLocal", false)) { |
| | | host_id_ = host.get("serverId", ""); |
| | | ip_ = host.get("ip", ""); |
| | | } else { |
| | | auto ip = host.get("ip", ""); |
| | | auto UpdateRec = [&](const ssjson::Json::array_type &lot, auto &rec) { |
| | | for (auto &topic : lot) { |
| | | auto t = topic.get_value<std::string>(); |
| | | rec[t].insert(ip); |
| | | // LOG_DEBUG() << "net topic: " << t << ", " << ip; |
| | | } |
| | | }; |
| | | // LOG_DEBUG() << "serives:"; |
| | | UpdateRec(host.child("pubTopics").array(), rpc_hosts_); |
| | | // LOG_DEBUG() << "net sub:"; |
| | | UpdateRec(host.child("netSubTopics").array(), sub_hosts_); |
| | | } |
| | | } |
| | | } |