liuxiaolong
2021-07-20 58d904a328c0d849769b483e901a0be9426b8209
box/tcp_connection.cpp
@@ -63,7 +63,7 @@
   if (4 > len) { return false; }
   uint32_t head_len = Get32(p);
   if (head_len > 1024 * 4) {
      throw std::runtime_error("unexpected tcp reply data.");
      throw std::runtime_error("unexpected tcp data head.");
   }
   auto before_body = 4 + head_len + 4;
   if (before_body > len) {
@@ -173,21 +173,26 @@
   if (recv_done) {
      LOG_TRACE() << "tcp server recv request data, size: " << size;
      MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()};
      if (remote.id_ && remote.offset_) {
         auto self(shared_from_this());
         auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
            send_buffer_ = imsg.content();
            async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
         };
         auto &scenter = *pscenter_;
         if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) {
            return;
         }
      } else {
         LOG_DEBUG() << "no address";
      }
      Close();
      auto self(shared_from_this());
      auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
         send_buffer_ = imsg.content();
         async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
      };
      auto &scenter = *pscenter_;
      if (head.type() == kMsgTypePublish) {
         auto reply = MakeReply(eSuccess);
         auto rep_head = InitMsgHead(GetType(reply), scenter->id(), 0, head.msg_id());
         send_buffer_ = MsgI::Serialize(rep_head, reply);
         async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
         scenter->RemotePublish(head, body_content);
         return;
      } else if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) {
         return;
      } else {
         Close();
      }
   } else { // not complete, read again
      LOG_TRACE() << "not complete, read again " << recv_buffer_.size();
      socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));