/*
 * =====================================================================================
 *
 *       Filename:  tcp_connection.cpp
 *
 *    Description:  Boost.Asio based TcpRequest1 / TcpReply1 connection handlers.
 *
 *        Version:  1.0
 *        Created:  2021-05-25 15:34:03
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#include "tcp_connection.h"
#include "log.h"
#include "msg.h"
#include "node_center.h"
#include "proto.h"
#include "shm_socket.h"

namespace
{

/// Wraps the bytes of container `c`, starting at `offset`, in an asio buffer.
/// Precondition: offset <= c.size() (otherwise the size computation underflows).
template <class C>
auto Buffer(C &c, size_t offset = 0)
{
	return boost::asio::buffer(c.data() + offset, c.size() - offset);
}

using boost::asio::async_read;
using boost::asio::async_write;

using VoidHandler = std::function<void()>;
using SizeHandler = std::function<void(size_t)>;

/// Builds an asio completion handler for `conn`.
/// The handler holds a shared_ptr to `conn`, so the connection object stays
/// alive while the asynchronous operation is pending. On success `func` is
/// called with the operation's extra results; on failure conn.OnError(ec).
template <class T, class... Param>
auto TcpCallback(T &conn, std::function<void(Param...)> const &func)
{
	auto self(conn.shared_from_this());
	return [self, func](bserror_t ec, Param... size) {
		if (!ec) {
			func(size...);
		} else {
			self->OnError(ec);
		}
	};
}

/// Completion handler for operations that report only an error code.
template <class T>
auto TcpCB(T &conn, VoidHandler const &func)
{
	return TcpCallback<T>(conn, func);
}

/// Completion handler for operations that report (error code, byte count).
template <class T>
auto TcpCBSize(T &conn, SizeHandler const &func)
{
	return TcpCallback<T, size_t>(conn, func);
}

/// Same as above, for callers that do not care about the byte count.
template <class T>
auto TcpCBSize(T &conn, VoidHandler const &func)
{
	// Build a SizeHandler explicitly so the overload above is selected.
	return TcpCBSize(conn, SizeHandler([func](size_t) { func(); }));
}

/// Tries to decode one frame from the first `len` bytes of `buffer`.
///
/// Frame layout as read here:
///   [4-byte head length][head][4-byte body length][body]
///
/// @param buffer        receive buffer; it is resized to the size needed for
///                      the next read (at least up to the body length field,
///                      then to the full frame once the body length is known).
/// @param len           number of valid bytes in `buffer`; callers pass a
///                      value <= buffer.size().
/// @param head          receives the parsed message head.
/// @param body_content  receives the raw body bytes when the frame is complete.
/// @return true when a complete frame was decoded, false when more data is needed.
/// @throws std::runtime_error on an oversized head, an unparsable head, or a
///         frame length the buffer cannot represent.
bool CheckData(std::vector<char> &buffer, const uint32_t len, BHMsgHead &head, std::string &body_content)
{
	const uint32_t kLenFieldSize = 4;
	const uint32_t kMaxHeadLen = 1024 * 4;

	if (len < kLenFieldSize) { return false; }

	const char *p = buffer.data();
	const uint32_t head_len = Get32(p);
	if (head_len > kMaxHeadLen) {
		throw std::runtime_error("unexpected tcp reply data.");
	}

	// Cannot overflow: head_len is bounded by kMaxHeadLen.
	const uint32_t before_body = kLenFieldSize + head_len + kLenFieldSize;
	if (before_body > len) {
		if (before_body > buffer.size()) {
			buffer.resize(before_body);
		}
		return false;
	}

	if (!head.ParseFromArray(p + kLenFieldSize, head_len)) {
		throw std::runtime_error("tcp recv invalid reply head.");
	}

	const uint32_t body_len = Get32(p + kLenFieldSize + head_len);
	// body_len comes from the peer. Compute the frame length in 64 bits so a
	// huge value cannot wrap around to a small one and slip past the
	// completeness check below (which would make assign() read out of bounds).
	const uint64_t total_len = static_cast<uint64_t>(before_body) + body_len;
	if (total_len > buffer.max_size()) {
		throw std::runtime_error("tcp data body too large.");
	}
	// NOTE(review): there is still no protocol-level cap on body_len; a peer
	// can request a very large allocation here. Consider enforcing a limit.
	buffer.resize(static_cast<size_t>(total_len));
	if (total_len > len) { return false; }

	// resize() may have moved the storage; do not reuse the old pointer.
	p = buffer.data();
	body_content.assign(p + before_body, body_len);
	return true;
}

} // namespace
namespace /// request ----------------------------------------------------------- void TcpRequest1::SendReply(BHMsgHead &head, std::string body_content) { if (reply_cb_) { reply_cb_(head, std::move(body_content)); } } void TcpRequest1::OnError(bserror_t ec) { // LOG_ERROR() << "tcp client error: " << ec << ", " << ec.message(); BHMsgHead head; std::string body_content; try { std::vector req(request_.begin(), request_.end()); if (CheckData(req, req.size(), head, body_content)) { if (head.type() == kMsgTypeRequestTopic) { SendReply(head, MakeReply(eError, std::to_string(ec.value()) + ',' + ec.message()).SerializeAsString()); } } } catch (std::exception &e) { } Close(); } void TcpRequest1::Start() { auto readReply = [this]() { // if (!reply_cb_) { return; } // no reply needed, maybe safe to close? recv_buffer_.resize(1000); recv_len_ = 0; socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); }; auto request = [this, readReply]() { async_write(socket_, Buffer(request_), TcpCBSize(*this, readReply)); }; socket_.async_connect(remote_, TcpCB(*this, request)); } void TcpRequest1::Close() { socket_.close(); } void TcpRequest1::OnRead(size_t size) { recv_len_ += size; BHMsgHead head; std::string body_content; try { if (CheckData(recv_buffer_, recv_len_, head, body_content)) { // got reply. 
Close(); SendReply(head, std::move(body_content)); } else { // not complete, read again socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); } } catch (std::exception &e) { LOG_ERROR() << e.what(); Close(); } } /// reply -------------------------------------------------------------- void TcpReply1::OnError(bserror_t ec) { Close(); } void TcpReply1::Close() { LOG_TRACE() << "server close."; socket_.close(); } void TcpReply1::Start() { recv_buffer_.resize(1000); socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); } void TcpReply1::OnRead(size_t size) { recv_len_ += size; BHMsgHead head; std::string body_content; bool recv_done = false; try { recv_done = CheckData(recv_buffer_, recv_len_, head, body_content); } catch (std::exception &e) { LOG_ERROR() << e.what(); Close(); return; } if (recv_done) { LOG_TRACE() << "tcp server recv request data, size: " << size; MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()}; auto self(shared_from_this()); auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { send_buffer_ = imsg.content(); async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); }; auto &scenter = *pscenter_; if (head.type() == kMsgTypePublish) { auto reply = MakeReply(eSuccess); auto rep_head = InitMsgHead(GetType(reply), scenter->id(), 0, head.msg_id()); send_buffer_ = MsgI::Serialize(rep_head, reply); async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); scenter->RemotePublish(head, body_content); return; } else if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) { return; } else { Close(); } } else { // not complete, read again LOG_TRACE() << "not complete, read again " << recv_buffer_.size(); socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); } };