| | |
| | | return MakeReply(eSuccess); |
| | | }); |
| | | } |
| | | // Answers a process query.
| | | //
| | | // When req.proc_id() is non-empty, the id is looked up in
| | | // online_node_addr_map_, and the mapped value is then looked up in nodes_;
| | | // a miss at either step yields an eNotFound reply. When proc_id is empty,
| | | // every entry of nodes_ is reported. The actual work is run through
| | | // HandleMsg<Reply>(head, ...).
| | | MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req)
| | | {
| | |     using Reply = MsgQueryProcReply;
| | | 
| | |     // Appends one proc_list entry for `node` to `out`: its proc info, whether
| | |     // its state flag equals kStateNormal, and every topic found in services_.
| | |     auto append_proc = [](Reply &out, Node node) {
| | |         auto entry = out.add_proc_list();
| | |         *entry->mutable_proc() = node->proc_;
| | |         const bool is_online = (node->state_.flag_ == kStateNormal);
| | |         entry->set_online(is_online);
| | |         for (auto &service : node->services_) {
| | |             for (auto &topic_name : service.second) {
| | |                 // mutable_topics() is only reached when there is a topic to add.
| | |                 entry->mutable_topics()->add_topic_list(topic_name);
| | |             }
| | |         }
| | |     };
| | | 
| | |     auto on_query = [&](Node /*self*/) -> Reply {
| | |         // No id given: list every known node.
| | |         if (req.proc_id().empty()) {
| | |             Reply all_procs(MakeReply<Reply>(eSuccess));
| | |             for (auto &entry : nodes_) {
| | |                 append_proc(all_procs, entry.second);
| | |             }
| | |             return all_procs;
| | |         }
| | | 
| | |         // Id given: resolve it through the online map first.
| | |         auto addr_it = online_node_addr_map_.find(req.proc_id());
| | |         if (addr_it == online_node_addr_map_.end()) {
| | |             return MakeReply<Reply>(eNotFound, "proc not found.");
| | |         }
| | | 
| | |         auto node_it = nodes_.find(addr_it->second);
| | |         if (node_it == nodes_.end()) {
| | |             return MakeReply<Reply>(eNotFound, "proc node not found.");
| | |         }
| | | 
| | |         auto single_proc = MakeReply<Reply>(eSuccess);
| | |         append_proc(single_proc, node_it->second);
| | |         return single_proc;
| | |     };
| | | 
| | |     return HandleMsg<Reply>(head, on_query);
| | | }
| | | |
| | | MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req) |
| | | { |
| | | typedef MsgQueryTopicReply Reply; |
| | | |
| | | auto query = [&](Node self) -> MsgQueryTopicReply { |
| | | auto query = [&](Node self) -> Reply { |
| | | auto pos = service_map_.find(req.topic()); |
| | | if (pos != service_map_.end() && !pos->second.empty()) { |
| | | auto &clients = pos->second; |
| | |
| | | |
| | | CASE_ON_MSG_TYPE(RegisterRPC); |
| | | CASE_ON_MSG_TYPE(QueryTopic); |
| | | CASE_ON_MSG_TYPE(QueryProc); |
| | | default: return false; |
| | | } |
| | | }; |