From 9243710ca372de26823c2225c7b46b072458c671 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 28 五月 2021 17:18:33 +0800 Subject: [PATCH] tcp proxy requests, need more test. --- src/topic_node.cpp | 54 ++++++++++++++++++++++++++++++++++-------------------- 1 files changed, 34 insertions(+), 20 deletions(-) diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 3c38121..b21f7ef 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -497,9 +497,10 @@ out_msg_id = msg_id; - auto SendTo = [this, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) { + auto SendTo = [this, remote_addr, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) { auto &sock = SockClient(); BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id)); + *head.mutable_dest() = remote_addr; AddRoute(head, sock); head.set_topic(req.topic()); @@ -519,8 +520,12 @@ }; try { - BHAddress addr; - return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb); + if (remote_addr.ip().empty()) { + BHAddress addr; + return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb); + } else { + return SendTo(CenterAddr(), req, cb); + } } catch (...) { SetLastError(eError, "internal error."); return false; @@ -536,25 +541,34 @@ try { auto &sock = SockClient(); - - BHAddress addr; - if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { - LOG_TRACE() << "node: " << SockNode().id() << ", topic dest: " << addr.mq_id(); - BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn())); - AddRoute(head, sock); - head.set_topic(request.topic()); - - MsgI reply_msg(shm()); - DEFER1(reply_msg.Release();); - BHMsgHead reply_head; - - if (sock.SendAndRecv({addr.mq_id(), addr.abs_addr()}, head, request, reply_msg, reply_head, timeout_ms) && - reply_head.type() == kMsgTypeRequestTopicReply && - reply_msg.ParseBody(out_reply)) { - reply_head.mutable_proc_id()->swap(out_proc_id); - return true; + MQInfo dest; + if (!remote_addr.ip().empty()) { + dest = CenterAddr(); + } else { + BHAddress addr; + if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { + dest.offset_ = addr.abs_addr(); + dest.id_ = addr.mq_id(); + } else { + return false; } } + + BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn())); + *head.mutable_dest() = remote_addr; + AddRoute(head, sock); + head.set_topic(request.topic()); + + MsgI reply_msg(shm()); + DEFER1(reply_msg.Release();); + BHMsgHead reply_head; + + if (sock.SendAndRecv(dest, head, request, reply_msg, reply_head, timeout_ms) && + reply_head.type() == kMsgTypeRequestTopicReply && + reply_msg.ParseBody(out_reply)) { + reply_head.mutable_proc_id()->swap(out_proc_id); + return true; + } } catch (...) { SetLastError(eError, __func__ + std::string(" internal errer.")); } -- Gitblit v1.8.0