| | |
| | | return node; |
| | | } |
| | | |
| | | bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) |
| | | bool NodeCenter::PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) |
| | | { |
| | | Node node(GetNode(dest.id_)); |
| | | if (!node || !Valid(*node)) { |
| | | LOG_ERROR() << id() << " pass remote request, dest not found."; |
| | | Node node; |
| | | |
| | | auto FindDest = [&]() { |
| | | auto pos = service_map_.find(head.topic()); |
| | | if (pos != service_map_.end() && !pos->second.empty()) { |
| | | auto &clients = pos->second; |
| | | for (auto &cli : clients) { |
| | | node = cli.weak_node_.lock(); |
| | | if (node && Valid(*node)) { |
| | | dest.id_ = cli.mq_id_; |
| | | dest.offset_ = cli.mq_abs_addr_; |
| | | return true; |
| | | } |
| | | } |
| | | } |
| | | return false; |
| | | }; |
| | | |
| | | if (dest.id_ == 0) { |
| | | if (!FindDest()) { |
| | | LOG_ERROR() << id() << " pass remote request, topic dest not found."; |
| | | return false; |
| | | } |
| | | } else { |
| | | node = GetNode(dest.id_); |
| | | if (!node || !Valid(*node)) { |
| | | LOG_ERROR() << id() << " pass remote request, dest not found."; |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | ShmSocket &sender(DefaultSender(node->shm_)); |