From 365c864a587365fe443b11cc0cd7cfc8f8f8eb81 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 01 六月 2021 11:19:22 +0800
Subject: [PATCH] refactor, clean up useless code.
---
box/tcp_connection.cpp | 91 ++++++++++++++++++---------------------------
1 files changed, 36 insertions(+), 55 deletions(-)
diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp
index 22162e5..02001bb 100644
--- a/box/tcp_connection.cpp
+++ b/box/tcp_connection.cpp
@@ -19,6 +19,7 @@
#include "log.h"
#include "msg.h"
#include "node_center.h"
+#include "proto.h"
#include "shm_socket.h"
namespace
@@ -59,10 +60,8 @@
bool CheckData(std::vector<char> &buffer, const uint32_t len, BHMsgHead &head, std::string &body_content)
{
const char *p = buffer.data();
- LOG_DEBUG() << "msg len " << len;
if (4 > len) { return false; }
uint32_t head_len = Get32(p);
- LOG_DEBUG() << "head_len " << head_len;
if (head_len > 1024 * 4) {
throw std::runtime_error("unexpected tcp reply data.");
}
@@ -87,15 +86,34 @@
/// request -----------------------------------------------------------
+void TcpRequest1::SendReply(BHMsgHead &head, std::string body_content)
+{
+ if (reply_cb_) {
+ reply_cb_(head, std::move(body_content));
+ }
+}
+
void TcpRequest1::OnError(bserror_t ec)
{
- LOG_ERROR() << "tcp client error: " << ec;
+ // LOG_ERROR() << "tcp client error: " << ec << ", " << ec.message();
+ BHMsgHead head;
+ std::string body_content;
+ try {
+ std::vector<char> req(request_.begin(), request_.end());
+ if (CheckData(req, req.size(), head, body_content)) {
+ if (head.type() == kMsgTypeRequestTopic) {
+ SendReply(head, MakeReply<MsgRequestTopicReply>(eError, std::to_string(ec.value()) + ',' + ec.message()).SerializeAsString());
+ }
+ }
+ } catch (std::exception &e) {
+ }
Close();
}
void TcpRequest1::Start()
{
auto readReply = [this]() {
+ // if (!reply_cb_) { return; } // no reply needed, maybe safe to close?
recv_buffer_.resize(1000);
recv_len_ = 0;
socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
@@ -104,47 +122,22 @@
socket_.async_connect(remote_, TcpCB(*this, request));
}
-void TcpRequest1::Close()
-{
- LOG_DEBUG() << "client close";
- socket_.close();
-}
+void TcpRequest1::Close() { socket_.close(); }
void TcpRequest1::OnRead(size_t size)
{
- LOG_DEBUG() << "reply data: " << recv_buffer_.data() + recv_len_;
-
recv_len_ += size;
BHMsgHead head;
std::string body_content;
- bool recv_done = false;
try {
- recv_done = CheckData(recv_buffer_, recv_len_, head, body_content);
+ if (CheckData(recv_buffer_, recv_len_, head, body_content)) { // got reply.
+ Close();
+ SendReply(head, std::move(body_content));
+ } else { // not complete, read again
+ socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
+ }
} catch (std::exception &e) {
LOG_ERROR() << e.what();
Close();
- return;
- }
-
- if (recv_done) {
- // just pass to client, no check, client will check it anyway.
- LOG_DEBUG() << "route size: " << head.route_size();
- if (head.route_size() < 1) { return; }
- auto &back = head.route(head.route_size() - 1);
- MQInfo dest = {back.mq_id(), back.abs_addr()};
- head.mutable_route()->RemoveLast();
-
- LOG_DEBUG() << "tcp got reply, pass to shm: " << dest.id_ << ", " << dest.offset_;
- MsgRequestTopicReply reply;
- if (reply.ParseFromString(body_content)) {
- LOG_DEBUG() << "err msg: " << reply.errmsg().errstring();
- LOG_DEBUG() << "content : " << reply.data();
- }
- Close();
- return;
- shm_socket_.Send(dest, std::string(recv_buffer_.data(), recv_buffer_.size()));
- } else { // read again
- LOG_DEBUG() << "not complete, read again " << recv_buffer_.size();
- socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
}
}
@@ -153,7 +146,7 @@
void TcpReply1::OnError(bserror_t ec) { Close(); }
void TcpReply1::Close()
{
- LOG_DEBUG() << "server close.";
+ LOG_TRACE() << "server close.";
socket_.close();
}
@@ -177,38 +170,26 @@
return;
}
- auto ParseBody = [&](auto &req) {
- const char *p = recv_buffer_.data();
- uint32_t size = Get32(p);
- p += 4;
- p += size;
- size = Get32(p);
- p += 4;
- return req.ParseFromArray(p, size);
- };
-
if (recv_done) {
- LOG_DEBUG() << "request data: " << size;
- auto self(shared_from_this());
+ LOG_TRACE() << "tcp server recv request data, size: " << size;
MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()};
if (remote.id_ && remote.offset_) {
+ auto self(shared_from_this());
auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
send_buffer_ = imsg.content();
async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
};
auto &scenter = *pscenter_;
- if (!scenter->ProxyMsg(remote, head, body_content, onRecv)) {
- send_buffer_ = "fake reply";
- async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
+ if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) {
+ return;
}
} else {
LOG_DEBUG() << "no address";
- send_buffer_ = "no address";
- async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
}
+ Close();
- } else { // read again
- LOG_DEBUG() << "not complete, read again " << recv_buffer_.size();
+ } else { // not complete, read again
+ LOG_TRACE() << "not complete, read again " << recv_buffer_.size();
socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
}
};
\ No newline at end of file
--
Gitblit v1.8.0