From 903b27f875e5f2a872c1b309f354b18c0450f35a Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 04 六月 2021 11:46:18 +0800 Subject: [PATCH] allow tcp request with no dest, auto query topic. --- box/tcp_connection.cpp | 22 ++++------ utest/tcp_test.cpp | 7 ++- box/node_center.h | 2 box/node_center.cpp | 33 ++++++++++++++-- 4 files changed, 43 insertions(+), 21 deletions(-) diff --git a/box/node_center.cpp b/box/node_center.cpp index b285c9f..5c24409 100644 --- a/box/node_center.cpp +++ b/box/node_center.cpp @@ -222,12 +222,37 @@ return node; } -bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) +bool NodeCenter::PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) { - Node node(GetNode(dest.id_)); - if (!node || !Valid(*node)) { - LOG_ERROR() << id() << " pass remote request, dest not found."; + Node node; + + auto FindDest = [&]() { + auto pos = service_map_.find(head.topic()); + if (pos != service_map_.end() && !pos->second.empty()) { + auto &clients = pos->second; + for (auto &cli : clients) { + node = cli.weak_node_.lock(); + if (node && Valid(*node)) { + dest.id_ = cli.mq_id_; + dest.offset_ = cli.mq_abs_addr_; + return true; + } + } + } return false; + }; + + if (dest.id_ == 0) { + if (!FindDest()) { + LOG_ERROR() << id() << " pass remote request, topic dest not found."; + return false; + } + } else { + node = GetNode(dest.id_); + if (!node || !Valid(*node)) { + LOG_ERROR() << id() << " pass remote request, dest not found."; + return false; + } } ShmSocket &sender(DefaultSender(node->shm_)); diff --git a/box/node_center.h b/box/node_center.h index 461a354..74dd52f 100644 --- a/box/node_center.h +++ b/box/node_center.h @@ -121,7 +121,7 @@ void RecordMsg(const MsgI &msg); bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg); bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg); - bool PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb); + bool PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb); bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content); void OnAlloc(ShmSocket &socket, const int64_t val); void OnFree(ShmSocket &socket, const int64_t val); diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp index 02001bb..85ed4ed 100644 --- a/box/tcp_connection.cpp +++ b/box/tcp_connection.cpp @@ -173,21 +173,17 @@ if (recv_done) { LOG_TRACE() << "tcp server recv request data, size: " << size; MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()}; - if (remote.id_ && remote.offset_) { - auto self(shared_from_this()); - auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { - send_buffer_ = imsg.content(); - async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); - }; - auto &scenter = *pscenter_; - if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) { - return; - } + auto self(shared_from_this()); + auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { + send_buffer_ = imsg.content(); + async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); + }; + auto &scenter = *pscenter_; + if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) { + return; } else { - LOG_DEBUG() << "no address"; + Close(); } - Close(); - } else { // not complete, read again LOG_TRACE() << "not complete, read again " << recv_buffer_.size(); socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); diff --git a/utest/tcp_test.cpp b/utest/tcp_test.cpp index 2aead7e..0ead665 100644 --- a/utest/tcp_test.cpp +++ b/utest/tcp_test.cpp @@ -49,11 +49,12 @@ auto route = head.add_route(); route->set_mq_id(12345); route->set_abs_addr(67890); + head.set_topic(req.topic()); head.mutable_dest()->set_ip(connect_addr); - head.mutable_dest()->set_port(port); - head.mutable_dest()->set_mq_id(201); - head.mutable_dest()->set_abs_addr(10072); + // head.mutable_dest()->set_port(port); + // head.mutable_dest()->set_mq_id(201); + // head.mutable_dest()->set_abs_addr(10072); return (MsgI::Serialize(head, req)); }; -- Gitblit v1.8.0