| | |
| | | if (4 > len) { return false; } |
| | | uint32_t head_len = Get32(p); |
| | | if (head_len > 1024 * 4) { |
| | | throw std::runtime_error("unexpected tcp reply data."); |
| | | throw std::runtime_error("unexpected tcp data head."); |
| | | } |
| | | auto before_body = 4 + head_len + 4; |
| | | if (before_body > len) { |
| | |
| | | if (recv_done) { |
| | | LOG_TRACE() << "tcp server recv request data, size: " << size; |
| | | MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()}; |
| | | if (remote.id_ && remote.offset_) { |
| | | auto self(shared_from_this()); |
| | | auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | send_buffer_ = imsg.content(); |
| | | async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); |
| | | }; |
| | | auto &scenter = *pscenter_; |
| | | if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) { |
| | | return; |
| | | } |
| | | } else { |
| | | LOG_DEBUG() << "no address"; |
| | | } |
| | | Close(); |
| | | auto self(shared_from_this()); |
| | | auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | send_buffer_ = imsg.content(); |
| | | async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); |
| | | }; |
| | | |
| | | auto &scenter = *pscenter_; |
| | | if (head.type() == kMsgTypePublish) { |
| | | auto reply = MakeReply(eSuccess); |
| | | auto rep_head = InitMsgHead(GetType(reply), scenter->id(), 0, head.msg_id()); |
| | | send_buffer_ = MsgI::Serialize(rep_head, reply); |
| | | async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); |
| | | |
| | | scenter->RemotePublish(head, body_content); |
| | | return; |
| | | } else if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) { |
| | | return; |
| | | } else { |
| | | Close(); |
| | | } |
| | | } else { // not complete, read again |
| | | LOG_TRACE() << "not complete, read again " << recv_buffer_.size(); |
| | | socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); |