From 9243710ca372de26823c2225c7b46b072458c671 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 28 五月 2021 17:18:33 +0800 Subject: [PATCH] tcp proxy requests, need more test. --- box/tcp_connection.cpp | 91 ++++++++++++++++++--------------------------- 1 files changed, 36 insertions(+), 55 deletions(-) diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp index 22162e5..02001bb 100644 --- a/box/tcp_connection.cpp +++ b/box/tcp_connection.cpp @@ -19,6 +19,7 @@ #include "log.h" #include "msg.h" #include "node_center.h" +#include "proto.h" #include "shm_socket.h" namespace @@ -59,10 +60,8 @@ bool CheckData(std::vector<char> &buffer, const uint32_t len, BHMsgHead &head, std::string &body_content) { const char *p = buffer.data(); - LOG_DEBUG() << "msg len " << len; if (4 > len) { return false; } uint32_t head_len = Get32(p); - LOG_DEBUG() << "head_len " << head_len; if (head_len > 1024 * 4) { throw std::runtime_error("unexpected tcp reply data."); } @@ -87,15 +86,34 @@ /// request ----------------------------------------------------------- +void TcpRequest1::SendReply(BHMsgHead &head, std::string body_content) +{ + if (reply_cb_) { + reply_cb_(head, std::move(body_content)); + } +} + void TcpRequest1::OnError(bserror_t ec) { - LOG_ERROR() << "tcp client error: " << ec; + // LOG_ERROR() << "tcp client error: " << ec << ", " << ec.message(); + BHMsgHead head; + std::string body_content; + try { + std::vector<char> req(request_.begin(), request_.end()); + if (CheckData(req, req.size(), head, body_content)) { + if (head.type() == kMsgTypeRequestTopic) { + SendReply(head, MakeReply<MsgRequestTopicReply>(eError, std::to_string(ec.value()) + ',' + ec.message()).SerializeAsString()); + } + } + } catch (std::exception &e) { + } Close(); } void TcpRequest1::Start() { auto readReply = [this]() { + // if (!reply_cb_) { return; } // no reply needed, maybe safe to close? recv_buffer_.resize(1000); recv_len_ = 0; socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); @@ -104,47 +122,22 @@ socket_.async_connect(remote_, TcpCB(*this, request)); } -void TcpRequest1::Close() -{ - LOG_DEBUG() << "client close"; - socket_.close(); -} +void TcpRequest1::Close() { socket_.close(); } void TcpRequest1::OnRead(size_t size) { - LOG_DEBUG() << "reply data: " << recv_buffer_.data() + recv_len_; - recv_len_ += size; BHMsgHead head; std::string body_content; - bool recv_done = false; try { - recv_done = CheckData(recv_buffer_, recv_len_, head, body_content); + if (CheckData(recv_buffer_, recv_len_, head, body_content)) { // got reply. + Close(); + SendReply(head, std::move(body_content)); + } else { // not complete, read again + socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); + } } catch (std::exception &e) { LOG_ERROR() << e.what(); Close(); - return; - } - - if (recv_done) { - // just pass to client, no check, client will check it anyway. - LOG_DEBUG() << "route size: " << head.route_size(); - if (head.route_size() < 1) { return; } - auto &back = head.route(head.route_size() - 1); - MQInfo dest = {back.mq_id(), back.abs_addr()}; - head.mutable_route()->RemoveLast(); - - LOG_DEBUG() << "tcp got reply, pass to shm: " << dest.id_ << ", " << dest.offset_; - MsgRequestTopicReply reply; - if (reply.ParseFromString(body_content)) { - LOG_DEBUG() << "err msg: " << reply.errmsg().errstring(); - LOG_DEBUG() << "content : " << reply.data(); - } - Close(); - return; - shm_socket_.Send(dest, std::string(recv_buffer_.data(), recv_buffer_.size())); - } else { // read again - LOG_DEBUG() << "not complete, read again " << recv_buffer_.size(); - socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); } } @@ -153,7 +146,7 @@ void TcpReply1::OnError(bserror_t ec) { Close(); } void TcpReply1::Close() { - LOG_DEBUG() << "server close."; + LOG_TRACE() << "server close."; socket_.close(); } @@ -177,38 +170,26 @@ return; } - auto ParseBody = [&](auto &req) { - const char *p = recv_buffer_.data(); - uint32_t size = Get32(p); - p += 4; - p += size; - size = Get32(p); - p += 4; - return req.ParseFromArray(p, size); - }; - if (recv_done) { - LOG_DEBUG() << "request data: " << size; - auto self(shared_from_this()); + LOG_TRACE() << "tcp server recv request data, size: " << size; MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()}; if (remote.id_ && remote.offset_) { + auto self(shared_from_this()); auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { send_buffer_ = imsg.content(); async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); }; auto &scenter = *pscenter_; - if (!scenter->ProxyMsg(remote, head, body_content, onRecv)) { - send_buffer_ = "fake reply"; - async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); + if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) { + return; } } else { LOG_DEBUG() << "no address"; - send_buffer_ = "no address"; - async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); } + Close(); - } else { // read again - LOG_DEBUG() << "not complete, read again " << recv_buffer_.size(); + } else { // not complete, read again + LOG_TRACE() << "not complete, read again " << recv_buffer_.size(); socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); } }; \ No newline at end of file -- Gitblit v1.8.0