From 58d904a328c0d849769b483e901a0be9426b8209 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期二, 20 七月 2021 20:20:44 +0800 Subject: [PATCH] 调整Request C.BHFree的位置 --- box/tcp_connection.cpp | 35 ++++++++++++++++++++--------------- 1 files changed, 20 insertions(+), 15 deletions(-) diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp index 02001bb..8f0fe86 100644 --- a/box/tcp_connection.cpp +++ b/box/tcp_connection.cpp @@ -63,7 +63,7 @@ if (4 > len) { return false; } uint32_t head_len = Get32(p); if (head_len > 1024 * 4) { - throw std::runtime_error("unexpected tcp reply data."); + throw std::runtime_error("unexpected tcp data head."); } auto before_body = 4 + head_len + 4; if (before_body > len) { @@ -173,21 +173,26 @@ if (recv_done) { LOG_TRACE() << "tcp server recv request data, size: " << size; MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()}; - if (remote.id_ && remote.offset_) { - auto self(shared_from_this()); - auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { - send_buffer_ = imsg.content(); - async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); - }; - auto &scenter = *pscenter_; - if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) { - return; - } - } else { - LOG_DEBUG() << "no address"; - } - Close(); + auto self(shared_from_this()); + auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { + send_buffer_ = imsg.content(); + async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); + }; + auto &scenter = *pscenter_; + if (head.type() == kMsgTypePublish) { + auto reply = MakeReply(eSuccess); + auto rep_head = InitMsgHead(GetType(reply), scenter->id(), 0, head.msg_id()); + send_buffer_ = MsgI::Serialize(rep_head, reply); + async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); + + scenter->RemotePublish(head, body_content); + return; + } else if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) { + return; + } else { + Close(); + } } else { // not complete, read again LOG_TRACE() << "not complete, read again " << recv_buffer_.size(); socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); -- Gitblit v1.8.0