allow tcp request with no dest, auto query topic.
| | |
| | | return node; |
| | | } |
| | | |
| | | bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) |
| | | bool NodeCenter::PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) |
| | | { |
| | | Node node(GetNode(dest.id_)); |
| | | if (!node || !Valid(*node)) { |
| | | LOG_ERROR() << id() << " pass remote request, dest not found."; |
| | | Node node; |
| | | |
| | | auto FindDest = [&]() { |
| | | auto pos = service_map_.find(head.topic()); |
| | | if (pos != service_map_.end() && !pos->second.empty()) { |
| | | auto &clients = pos->second; |
| | | for (auto &cli : clients) { |
| | | node = cli.weak_node_.lock(); |
| | | if (node && Valid(*node)) { |
| | | dest.id_ = cli.mq_id_; |
| | | dest.offset_ = cli.mq_abs_addr_; |
| | | return true; |
| | | } |
| | | } |
| | | } |
| | | return false; |
| | | }; |
| | | |
| | | if (dest.id_ == 0) { |
| | | if (!FindDest()) { |
| | | LOG_ERROR() << id() << " pass remote request, topic dest not found."; |
| | | return false; |
| | | } |
| | | } else { |
| | | node = GetNode(dest.id_); |
| | | if (!node || !Valid(*node)) { |
| | | LOG_ERROR() << id() << " pass remote request, dest not found."; |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | ShmSocket &sender(DefaultSender(node->shm_)); |
| | |
| | | void RecordMsg(const MsgI &msg); |
| | | bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg); |
| | | bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg); |
| | | bool PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb); |
| | | bool PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb); |
| | | bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content); |
| | | void OnAlloc(ShmSocket &socket, const int64_t val); |
| | | void OnFree(ShmSocket &socket, const int64_t val); |
| | |
| | | if (recv_done) { |
| | | LOG_TRACE() << "tcp server recv request data, size: " << size; |
| | | MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()}; |
| | | if (remote.id_ && remote.offset_) { |
| | | auto self(shared_from_this()); |
| | | auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | send_buffer_ = imsg.content(); |
| | | async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); |
| | | }; |
| | | auto &scenter = *pscenter_; |
| | | if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) { |
| | | return; |
| | | } |
| | | auto self(shared_from_this()); |
| | | auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | send_buffer_ = imsg.content(); |
| | | async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); |
| | | }; |
| | | auto &scenter = *pscenter_; |
| | | if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) { |
| | | return; |
| | | } else { |
| | | LOG_DEBUG() << "no address"; |
| | | Close(); |
| | | } |
| | | Close(); |
| | | |
| | | } else { // not complete, read again |
| | | LOG_TRACE() << "not complete, read again " << recv_buffer_.size(); |
| | | socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); |
| | |
| | | auto route = head.add_route(); |
| | | route->set_mq_id(12345); |
| | | route->set_abs_addr(67890); |
| | | head.set_topic(req.topic()); |
| | | |
| | | head.mutable_dest()->set_ip(connect_addr); |
| | | head.mutable_dest()->set_port(port); |
| | | head.mutable_dest()->set_mq_id(201); |
| | | head.mutable_dest()->set_abs_addr(10072); |
| | | // head.mutable_dest()->set_port(port); |
| | | // head.mutable_dest()->set_mq_id(201); |
| | | // head.mutable_dest()->set_abs_addr(10072); |
| | | |
| | | return (MsgI::Serialize(head, req)); |
| | | }; |