lichao
2021-06-04 903b27f875e5f2a872c1b309f354b18c0450f35a
allow tcp request with no dest, auto query topic.
4个文件已修改
64 ■■■■■ 已修改文件
box/node_center.cpp 33 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/node_center.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_connection.cpp 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/tcp_test.cpp 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/node_center.cpp
@@ -222,12 +222,37 @@
    return node;
}
bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
bool NodeCenter::PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
{
    Node node(GetNode(dest.id_));
    if (!node || !Valid(*node)) {
        LOG_ERROR() << id() << " pass remote request, dest not found.";
    Node node;
    auto FindDest = [&]() {
        auto pos = service_map_.find(head.topic());
        if (pos != service_map_.end() && !pos->second.empty()) {
            auto &clients = pos->second;
            for (auto &cli : clients) {
                node = cli.weak_node_.lock();
                if (node && Valid(*node)) {
                    dest.id_ = cli.mq_id_;
                    dest.offset_ = cli.mq_abs_addr_;
                    return true;
                }
            }
        }
        return false;
    };
    if (dest.id_ == 0) {
        if (!FindDest()) {
            LOG_ERROR() << id() << " pass remote request, topic dest not found.";
            return false;
        }
    } else {
        node = GetNode(dest.id_);
        if (!node || !Valid(*node)) {
            LOG_ERROR() << id() << " pass remote request, dest not found.";
            return false;
        }
    }
    ShmSocket &sender(DefaultSender(node->shm_));
box/node_center.h
@@ -121,7 +121,7 @@
    void RecordMsg(const MsgI &msg);
    bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg);
    bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg);
    bool PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
    bool PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
    bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content);
    void OnAlloc(ShmSocket &socket, const int64_t val);
    void OnFree(ShmSocket &socket, const int64_t val);
box/tcp_connection.cpp
@@ -173,21 +173,17 @@
    if (recv_done) {
        LOG_TRACE() << "tcp server recv request data, size: " << size;
        MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()};
        if (remote.id_ && remote.offset_) {
            auto self(shared_from_this());
            auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
                send_buffer_ = imsg.content();
                async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
            };
            auto &scenter = *pscenter_;
            if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) {
                return;
            }
        auto self(shared_from_this());
        auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
            send_buffer_ = imsg.content();
            async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
        };
        auto &scenter = *pscenter_;
        if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) {
            return;
        } else {
            LOG_DEBUG() << "no address";
            Close();
        }
        Close();
    } else { // not complete, read again
        LOG_TRACE() << "not complete, read again " << recv_buffer_.size();
        socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
utest/tcp_test.cpp
@@ -49,11 +49,12 @@
        auto route = head.add_route();
        route->set_mq_id(12345);
        route->set_abs_addr(67890);
        head.set_topic(req.topic());
        head.mutable_dest()->set_ip(connect_addr);
        head.mutable_dest()->set_port(port);
        head.mutable_dest()->set_mq_id(201);
        head.mutable_dest()->set_abs_addr(10072);
        // head.mutable_dest()->set_port(port);
        // head.mutable_dest()->set_mq_id(201);
        // head.mutable_dest()->set_abs_addr(10072);
        return (MsgI::Serialize(head, req));
    };