From 365c864a587365fe443b11cc0cd7cfc8f8f8eb81 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 01 六月 2021 11:19:22 +0800
Subject: [PATCH] refactor, clean up useless code.
---
box/tcp_connection.cpp | 211 +++++++++++++++++++++++++++++++++++++++-------------
1 files changed, 156 insertions(+), 55 deletions(-)
diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp
index 8968741..02001bb 100644
--- a/box/tcp_connection.cpp
+++ b/box/tcp_connection.cpp
@@ -17,78 +17,179 @@
*/
#include "tcp_connection.h"
#include "log.h"
+#include "msg.h"
+#include "node_center.h"
+#include "proto.h"
+#include "shm_socket.h"
namespace
{
template <class C>
-auto Buffer(C &c) { return boost::asio::buffer(c.data(), c.size()); }
+auto Buffer(C &c, size_t offset = 0) { return boost::asio::buffer(c.data() + offset, c.size() - offset); }
using boost::asio::async_read;
using boost::asio::async_write;
+typedef std::function<void()> VoidHandler;
+typedef std::function<void(size_t)> SizeHandler;
+
+template <class T, class... Param>
+auto TcpCallback(T &conn, std::function<void(Param...)> const &func)
+{
+ auto self(conn.shared_from_this());
+ return [self, func](bserror_t ec, Param... size) {
+ if (!ec) {
+ func(size...);
+ } else {
+ self->OnError(ec);
+ }
+ };
+}
+
+template <class T>
+auto TcpCB(T &conn, VoidHandler const &func) { return TcpCallback(conn, func); }
+
+template <class T>
+auto TcpCBSize(T &conn, SizeHandler const &func) { return TcpCallback(conn, func); }
+
+template <class T>
+auto TcpCBSize(T &conn, VoidHandler const &func)
+{
+ return TcpCBSize(conn, [func](size_t) { func(); });
+}
+
+bool CheckData(std::vector<char> &buffer, const uint32_t len, BHMsgHead &head, std::string &body_content)
+{
+ const char *p = buffer.data();
+ if (4 > len) { return false; }
+ uint32_t head_len = Get32(p);
+ if (head_len > 1024 * 4) {
+ throw std::runtime_error("unexpected tcp reply data.");
+ }
+ auto before_body = 4 + head_len + 4;
+ if (before_body > len) {
+ if (before_body > buffer.size()) {
+ buffer.resize(before_body);
+ }
+ return false;
+ }
+ if (!head.ParseFromArray(p + 4, head_len)) {
+ throw std::runtime_error("tcp recv invalid reply head.");
+ }
+ uint32_t body_len = Get32(p + 4 + head_len);
+ buffer.resize(before_body + body_len);
+ if (buffer.size() > len) { return false; }
+ body_content.assign(p + before_body, body_len);
+ return true;
+}
+
} // namespace
-TcpRequest1::TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request) :
- socket_(io), remote_(addr), request_(std::move(request)) {}
-void TcpRequest1::Connect()
+/// request -----------------------------------------------------------
+
+void TcpRequest1::SendReply(BHMsgHead &head, std::string body_content)
{
- auto self = shared_from_this();
- socket_.async_connect(remote_, [this, self](bserror_t ec) {
- if (!ec) {
- SendRequest();
- } else {
- LOG_ERROR() << "connect error " << ec;
- Close();
- }
- });
+ if (reply_cb_) {
+ reply_cb_(head, std::move(body_content));
+ }
}
-void TcpRequest1::Close()
+
+void TcpRequest1::OnError(bserror_t ec)
{
+ // LOG_ERROR() << "tcp client error: " << ec << ", " << ec.message();
+ BHMsgHead head;
+ std::string body_content;
+ try {
+ std::vector<char> req(request_.begin(), request_.end());
+ if (CheckData(req, req.size(), head, body_content)) {
+ if (head.type() == kMsgTypeRequestTopic) {
+ SendReply(head, MakeReply<MsgRequestTopicReply>(eError, std::to_string(ec.value()) + ',' + ec.message()).SerializeAsString());
+ }
+ }
+ } catch (std::exception &e) {
+ }
+ Close();
+}
+
+void TcpRequest1::Start()
+{
+ auto readReply = [this]() {
+ // if (!reply_cb_) { return; } // no reply needed, maybe safe to close?
+ recv_buffer_.resize(1000);
+ recv_len_ = 0;
+ socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
+ };
+ auto request = [this, readReply]() { async_write(socket_, Buffer(request_), TcpCBSize(*this, readReply)); };
+
+ socket_.async_connect(remote_, TcpCB(*this, request));
+}
+void TcpRequest1::Close() { socket_.close(); }
+void TcpRequest1::OnRead(size_t size)
+{
+ recv_len_ += size;
+ BHMsgHead head;
+ std::string body_content;
+ try {
+ if (CheckData(recv_buffer_, recv_len_, head, body_content)) { // got reply.
+ Close();
+ SendReply(head, std::move(body_content));
+ } else { // not complete, read again
+ socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
+ }
+ } catch (std::exception &e) {
+ LOG_ERROR() << e.what();
+ Close();
+ }
+}
+
+/// reply --------------------------------------------------------------
+
+void TcpReply1::OnError(bserror_t ec) { Close(); }
+void TcpReply1::Close()
+{
+ LOG_TRACE() << "server close.";
socket_.close();
}
-void TcpRequest1::SendRequest()
-{
- LOG_INFO() << "client sending request " << request_;
- auto self = shared_from_this();
- async_write(socket_, Buffer(request_), [this, self](bserror_t ec, size_t) {
- if (!ec) {
- ReadReply();
- } else {
- Close();
- }
- });
-}
-void TcpRequest1::ReadReply()
-{
- buffer_.resize(1000);
- auto self = shared_from_this();
- socket_.async_read_some(Buffer(buffer_), [this, self](bserror_t ec, size_t size) {
- if (!ec) {
- printf("reply data: %s\n", buffer_.data());
- } else {
- Close();
- }
- });
-}
-
-TcpReply1::TcpReply1(tcp::socket sock) :
- socket_(std::move(sock)) {}
-
void TcpReply1::Start()
{
- LOG_INFO() << "server session reading...";
recv_buffer_.resize(1000);
- auto self(shared_from_this());
- socket_.async_read_some(Buffer(recv_buffer_), [this, self](bserror_t ec, size_t size) {
- LOG_INFO() << "server read : " << recv_buffer_.data();
- // fake reply
- if (!ec) {
- send_buffer_ = std::string(recv_buffer_.data(), size) + " reply";
- async_write(socket_, Buffer(send_buffer_), [this, self](bserror_t ec, size_t size) {
- socket_.close();
- });
+ socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
+}
+
+void TcpReply1::OnRead(size_t size)
+{
+ recv_len_ += size;
+ BHMsgHead head;
+ std::string body_content;
+ bool recv_done = false;
+ try {
+ recv_done = CheckData(recv_buffer_, recv_len_, head, body_content);
+ } catch (std::exception &e) {
+ LOG_ERROR() << e.what();
+ Close();
+ return;
+ }
+
+ if (recv_done) {
+ LOG_TRACE() << "tcp server recv request data, size: " << size;
+ MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()};
+ if (remote.id_ && remote.offset_) {
+ auto self(shared_from_this());
+ auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
+ send_buffer_ = imsg.content();
+ async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
+ };
+ auto &scenter = *pscenter_;
+ if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) {
+ return;
+ }
} else {
- socket_.close();
+ LOG_DEBUG() << "no address";
}
- });
-}
\ No newline at end of file
+ Close();
+
+ } else { // not complete, read again
+ LOG_TRACE() << "not complete, read again " << recv_buffer_.size();
+ socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
+ }
+};
\ No newline at end of file
--
Gitblit v1.8.0