/*
 * =====================================================================================
 *
 *       Filename:  tcp_connection.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021-05-25 15:34:03
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
|
#include "tcp_connection.h"
|
#include "log.h"
|
#include "msg.h"
|
#include "node_center.h"
|
#include "proto.h"
|
#include "shm_socket.h"
|
|
namespace
|
{
|
template <class C>
|
auto Buffer(C &c, size_t offset = 0) { return boost::asio::buffer(c.data() + offset, c.size() - offset); }
|
// Pull the asio composed operations into this translation unit.
using boost::asio::async_read;
using boost::asio::async_write;

// Completion callbacks used by the helpers below: one that ignores the
// transferred byte count and one that receives it.
using VoidHandler = std::function<void()>;
using SizeHandler = std::function<void(size_t)>;
|
|
template <class T, class... Param>
|
auto TcpCallback(T &conn, std::function<void(Param...)> const &func)
|
{
|
auto self(conn.shared_from_this());
|
return [self, func](bserror_t ec, Param... size) {
|
if (!ec) {
|
func(size...);
|
} else {
|
self->OnError(ec);
|
}
|
};
|
}
|
|
template <class T>
|
auto TcpCB(T &conn, VoidHandler const &func) { return TcpCallback(conn, func); }
|
|
template <class T>
|
auto TcpCBSize(T &conn, SizeHandler const &func) { return TcpCallback(conn, func); }
|
|
template <class T>
|
auto TcpCBSize(T &conn, VoidHandler const &func)
|
{
|
return TcpCBSize(conn, [func](size_t) { func(); });
|
}
|
|
bool CheckData(std::vector<char> &buffer, const uint32_t len, BHMsgHead &head, std::string &body_content)
|
{
|
const char *p = buffer.data();
|
if (4 > len) { return false; }
|
uint32_t head_len = Get32(p);
|
if (head_len > 1024 * 4) {
|
throw std::runtime_error("unexpected tcp data head.");
|
}
|
auto before_body = 4 + head_len + 4;
|
if (before_body > len) {
|
if (before_body > buffer.size()) {
|
buffer.resize(before_body);
|
}
|
return false;
|
}
|
if (!head.ParseFromArray(p + 4, head_len)) {
|
throw std::runtime_error("tcp recv invalid reply head.");
|
}
|
uint32_t body_len = Get32(p + 4 + head_len);
|
buffer.resize(before_body + body_len);
|
if (buffer.size() > len) { return false; }
|
body_content.assign(p + before_body, body_len);
|
return true;
|
}
|
|
} // namespace
|
|
/// request -----------------------------------------------------------
|
|
// Hand a reply (head + serialized body) to the user callback, if one is set.
// Without a callback the reply is dropped.
void TcpRequest1::SendReply(BHMsgHead &head, std::string body_content)
{
	if (!reply_cb_) { return; }
	reply_cb_(head, std::move(body_content));
}
|
|
// Error path of the client connection.
// If the stored request parses as a complete message of type
// kMsgTypeRequestTopic, the caller is answered with an eError reply built
// from the error code ("<value>,<message>"). The socket is closed in every
// case.
void TcpRequest1::OnError(bserror_t ec)
{
	// LOG_ERROR() << "tcp client error: " << ec << ", " << ec.message();
	BHMsgHead head;
	std::string body_content;
	try {
		// CheckData may resize the buffer it is given, so parse a scratch
		// copy and leave request_ untouched.
		std::vector<char> req(request_.begin(), request_.end());
		if (CheckData(req, req.size(), head, body_content)) {
			if (head.type() == kMsgTypeRequestTopic) {
				SendReply(head, MakeReply<MsgRequestTopicReply>(eError, std::to_string(ec.value()) + ',' + ec.message()).SerializeAsString());
			}
		}
	} catch (const std::exception &e) {
		// Failing to build the error reply must not prevent the close below,
		// but it should not disappear silently either.
		LOG_ERROR() << e.what();
	}
	Close();
}
|
|
void TcpRequest1::Start()
|
{
|
auto readReply = [this]() {
|
// if (!reply_cb_) { return; } // no reply needed, maybe safe to close?
|
recv_buffer_.resize(1000);
|
recv_len_ = 0;
|
socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
|
};
|
auto request = [this, readReply]() { async_write(socket_, Buffer(request_), TcpCBSize(*this, readReply)); };
|
|
socket_.async_connect(remote_, TcpCB(*this, request));
|
}
|
// Close the client socket.
void TcpRequest1::Close()
{
	socket_.close();
}
|
void TcpRequest1::OnRead(size_t size)
|
{
|
recv_len_ += size;
|
BHMsgHead head;
|
std::string body_content;
|
try {
|
if (CheckData(recv_buffer_, recv_len_, head, body_content)) { // got reply.
|
Close();
|
SendReply(head, std::move(body_content));
|
} else { // not complete, read again
|
socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
|
}
|
} catch (std::exception &e) {
|
LOG_ERROR() << e.what();
|
Close();
|
}
|
}
|
|
/// reply --------------------------------------------------------------
|
|
// Any asio error on the server-side connection just closes it; the error
// code itself is not inspected.
void TcpReply1::OnError(bserror_t ec)
{
	Close();
}
|
void TcpReply1::Close()
|
{
|
LOG_TRACE() << "server close.";
|
socket_.close();
|
}
|
|
void TcpReply1::Start()
|
{
|
recv_buffer_.resize(1000);
|
socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
|
}
|
|
void TcpReply1::OnRead(size_t size)
|
{
|
recv_len_ += size;
|
BHMsgHead head;
|
std::string body_content;
|
bool recv_done = false;
|
try {
|
recv_done = CheckData(recv_buffer_, recv_len_, head, body_content);
|
} catch (std::exception &e) {
|
LOG_ERROR() << e.what();
|
Close();
|
return;
|
}
|
|
if (recv_done) {
|
LOG_TRACE() << "tcp server recv request data, size: " << size;
|
MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()};
|
auto self(shared_from_this());
|
auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
|
send_buffer_ = imsg.content();
|
async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
|
};
|
|
auto &scenter = *pscenter_;
|
if (head.type() == kMsgTypePublish) {
|
auto reply = MakeReply(eSuccess);
|
auto rep_head = InitMsgHead(GetType(reply), scenter->id(), 0, head.msg_id());
|
send_buffer_ = MsgI::Serialize(rep_head, reply);
|
async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
|
|
scenter->RemotePublish(head, body_content);
|
return;
|
} else if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) {
|
return;
|
} else {
|
Close();
|
}
|
} else { // not complete, read again
|
LOG_TRACE() << "not complete, read again " << recv_buffer_.size();
|
socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
|
}
|
};
|