lichao
2021-05-28 9243710ca372de26823c2225c7b46b072458c671
tcp proxy requests, need more test.
19个文件已修改
536 ■■■■ 已修改文件
box/center.cpp 50 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.h 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center_topic_node.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/io_service.cpp 33 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/io_service.h 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/node_center.cpp 57 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/node_center.h 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_common.h 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_connection.cpp 91 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_connection.h 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_proxy.cpp 56 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_proxy.h 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_server.cpp 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_server.h 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.cpp 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_socket.cpp 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 54 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/tcp_test.cpp 61 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp
@@ -17,7 +17,9 @@
 */
#include "center.h"
#include "center_topic_node.h"
#include "io_service.h"
#include "node_center.h"
#include "tcp_proxy.h"
#include "tcp_server.h"
#include <chrono>
@@ -74,7 +76,7 @@
    };
}
bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm)
bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy)
{
    // command
    auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool {
@@ -92,9 +94,41 @@
        center->OnTimer();
    };
    auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
    auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
        auto &center = *center_ptr;
        auto replyer = MakeReplyer(socket, head, center);
        if (!head.dest().ip().empty()) { // other host, proxy
            auto valid = [&]() { return head.route_size() == 1; };
            if (!valid()) { return false; }
            if (head.type() == kMsgTypeRequestTopic) {
                typedef MsgRequestTopicReply Reply;
                Reply reply;
                if (!center->CheckMsg(head, reply)) {
                    replyer(reply);
                } else {
                    auto onResult = [&center](BHMsgHead &head, std::string body_content) {
                        if (head.route_size() > 0) {
                            auto &back = head.route(head.route_size() - 1);
                            MQInfo dest = {back.mq_id(), back.abs_addr()};
                            head.mutable_route()->RemoveLast();
                            center->PassRemoteReplyToLocal(dest, head, std::move(body_content));
                        }
                    };
                    if (!tcp_proxy.Request(head.dest().ip(), head.dest().port(), msg.content(), onResult)) {
                        replyer(MakeReply<Reply>(eError, "send request failed."));
                    } else {
                        // success
                    }
                }
                return true;
            } else {
                // ignore other msgs for now.
            }
            return false;
        }
        switch (head.type()) {
            CASE_ON_MSG_TYPE(ProcInit);
            CASE_ON_MSG_TYPE(Register);
@@ -168,7 +202,10 @@
{
    auto nsec = NodeTimeoutSec();
    auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
    AddCenter(center_ptr, shm);
    io_service_.reset(new IoService);
    tcp_proxy_.reset(new TcpProxy(io_service_->io()));
    AddCenter(center_ptr, shm, *tcp_proxy_);
    for (auto &kv : Centers()) {
        auto &info = kv.second;
@@ -176,7 +213,7 @@
    }
    topic_node_.reset(new CenterTopicNode(center_ptr, shm));
    tcp_server_.reset(new TcpServer(kBHCenterPort, center_ptr));
    tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr));
}
BHCenter::~BHCenter() { Stop(); }
@@ -188,13 +225,14 @@
        sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_);
    }
    topic_node_->Start();
    tcp_server_->Start();
    return true;
}
bool BHCenter::Stop()
{
    tcp_server_->Stop();
    tcp_proxy_.reset();
    tcp_server_.reset();
    io_service_.reset();
    topic_node_->Stop();
    for (auto &kv : sockets_) {
        kv.second->Stop();
box/center.h
@@ -24,6 +24,8 @@
#include <memory>
class CenterTopicNode;
class TcpServer;
class TcpProxy;
class IoService;
class BHCenter
{
@@ -54,7 +56,10 @@
    std::map<std::string, std::shared_ptr<ShmSocket>> sockets_;
    std::unique_ptr<CenterTopicNode> topic_node_;
    std::unique_ptr<IoService> io_service_;
    std::unique_ptr<TcpServer> tcp_server_;
    std::unique_ptr<TcpProxy> tcp_proxy_;
};
#endif // end of include guard: CENTER_TM9OUQTG
box/center_topic_node.cpp
@@ -106,7 +106,7 @@
            *reply.mutable_errmsg() = data.errmsg();
            reply.set_data(ToJson(data));
        } else {
            SetError(*reply.mutable_errmsg(), eInvalidInput, "not supported topic" + request.topic());
            SetError(*reply.mutable_errmsg(), eInvalidInput, "invalid topic: " + request.topic());
        }
        pnode_->ServerSendReply(src_info, reply);
    };
box/io_service.cpp
@@ -16,33 +16,16 @@
 * =====================================================================================
 */
#include "io_service.h"
#include <chrono>
using namespace std::chrono_literals;
bool IoService::Start()
IoService::IoService() :
    guard_(io_.get_executor())
{
    Stop();
    bool cur = false;
    if (!run_.compare_exchange_strong(cur, true)) {
        return false;
    }
    auto proc = [this]() {
        while (run_) {
            io_.run_one_for(100ms);
        }
        OnStop();
    };
    std::thread(proc).swap(worker_);
    return true;
    std::thread([this]() { io_.run(); }).swap(worker_);
}
void IoService::Stop()
IoService::~IoService()
{
    bool cur = true;
    if (run_.compare_exchange_strong(cur, false)) {
        if (worker_.joinable()) {
            worker_.join();
        }
    }
    guard_.reset();
    io_.stop(); // normally not needed, but make sure run() exits.
    if (worker_.joinable())
        worker_.join();
}
box/io_service.h
@@ -24,19 +24,17 @@
class IoService
{
public:
    IoService() :
        run_(false) {}
    bool Start();
    void Stop();
    IoService();
    ~IoService();
    typedef boost::asio::io_context io_service_t;
    io_service_t &io() { return io_; }
private:
    virtual void OnStop() {}
    io_service_t io_;
    typedef boost::asio::executor_work_guard<io_service_t::executor_type> guard_t;
    guard_t guard_;
    std::thread worker_;
    std::atomic<bool> run_;
};
#endif // end of include guard: IO_SERVICE_ODKKJG3D
box/node_center.cpp
@@ -70,8 +70,9 @@
        return;
    }
    // LOG_FUNCTION;
    const size_t total = msgs_.size();
    time_to_clean_ = now + 1;
    int64_t limit = std::max(10000ul, msgs_.size() / 10);
    int64_t limit = std::max(10000ul, total / 10);
    int64_t n = 0;
    auto it = msgs_.begin();
    while (it != msgs_.end() && --limit > 0) {
@@ -82,16 +83,16 @@
            ++n;
        };
        int n = now - msg.timestamp();
        if (n < 10) {
        if (msg.Count() == 0) {
            Free();
        } else if (n > NodeTimeoutSec()) {
            Free();
        } else {
            ++it;
        } else if (msg.Count() == 0) {
            Free();
        } else if (n > 60) {
            Free();
        }
    }
    if (n > 0) {
        LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n;
        LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total;
    }
}
@@ -209,17 +210,25 @@
    RecordMsg(msg);
    return socket.Send(dest, msg);
}
bool NodeCenter::ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
NodeCenter::Node NodeCenter::GetNode(const MQId mq_id)
{
    auto ssn = dest.id_ - (dest.id_ % 10);
    LOG_DEBUG() << "prox ssn " << ssn;
    Node node;
    auto ssn = mq_id - (mq_id % 10);
    auto pos = nodes_.find(ssn);
    if (pos == nodes_.end()) {
        LOG_ERROR() << "proxy msg, ssn not found.";
    if (pos != nodes_.end()) {
        node = pos->second;
    }
    return node;
}
bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
{
    Node node(GetNode(dest.id_));
    if (!node || !Valid(*node)) {
        LOG_ERROR() << id() << " pass remote request, dest not found.";
        return false;
    }
    auto &node = pos->second;
    if (!Valid(*node)) { return false; }
    ShmSocket &sender(DefaultSender(node->shm_));
    auto route = head.add_route();
@@ -233,6 +242,26 @@
    return sender.Send(dest, msg, head.msg_id(), std::move(cb));
}
bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content)
{
    Node node(GetNode(dest.id_));
    if (!node) {
        LOG_ERROR() << id() << " pass remote reply , ssn not found.";
        return false;
    }
    auto offset = node->addrs_[dest.id_];
    if (offset != dest.offset_) {
        LOG_ERROR() << id() << " pass remote reply, dest address not match";
        return false;
    }
    ShmMsg msg(node->shm_);
    if (!msg.Make(head, body_content)) { return false; }
    DEFER1(msg.Release(););
    RecordMsg(msg);
    return DefaultSender(node->shm_).Send(dest, msg);
}
void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val)
{
    // LOG_FUNCTION;
box/node_center.h
@@ -122,7 +122,8 @@
    void RecordMsg(const MsgI &msg);
    bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg);
    bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg);
    bool ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
    bool PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
    bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content);
    void OnAlloc(ShmSocket &socket, const int64_t val);
    void OnFree(ShmSocket &socket, const int64_t val);
    bool OnCommand(ShmSocket &socket, const int64_t val);
@@ -159,6 +160,14 @@
    {
        return HandleMsg<MsgCommonReply, Func>(head, op);
    }
    template <class Reply>
    bool CheckMsg(const BHMsgHead &head, Reply &reply)
    {
        bool r = false;
        auto onOk = [&](Node) { r = true; return MakeReply<Reply>(eSuccess); };
        reply = HandleMsg<Reply>(head, onOk);
        return r;
    }
    MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg);
    MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg);
@@ -184,6 +193,8 @@
        return node && Valid(*node);
    }
    void RemoveNode(Node &node);
    Node GetNode(const MQId mq);
    std::string id_; // center proc id;
    std::unordered_map<Topic, Clients> service_map_;
box/tcp_common.h
@@ -21,10 +21,18 @@
#include <boost/asio.hpp>
#include <boost/uuid/string_generator.hpp>
#include <boost/uuid/uuid.hpp>
#include <functional>
#include <string>
namespace ip = boost::asio::ip;
using boost::asio::ip::tcp;
typedef boost::system::error_code bserror_t;
const boost::uuids::uuid kBHTcpServerTag = boost::uuids::string_generator()("e5bff527-6bf8-ee0d-cd28-b36594acee39");
namespace bhome_msg
{
class BHMsgHead;
}
typedef std::function<void(bhome_msg::BHMsgHead &head, std::string body_content)> ReplyCB;
#endif // end of include guard: TCP_COMMON_8S8O7OV
box/tcp_connection.cpp
@@ -19,6 +19,7 @@
#include "log.h"
#include "msg.h"
#include "node_center.h"
#include "proto.h"
#include "shm_socket.h"
namespace
@@ -59,10 +60,8 @@
bool CheckData(std::vector<char> &buffer, const uint32_t len, BHMsgHead &head, std::string &body_content)
{
    const char *p = buffer.data();
    LOG_DEBUG() << "msg len " << len;
    if (4 > len) { return false; }
    uint32_t head_len = Get32(p);
    LOG_DEBUG() << "head_len " << head_len;
    if (head_len > 1024 * 4) {
        throw std::runtime_error("unexpected tcp reply data.");
    }
@@ -87,15 +86,34 @@
/// request -----------------------------------------------------------
void TcpRequest1::SendReply(BHMsgHead &head, std::string body_content)
{
    if (reply_cb_) {
        reply_cb_(head, std::move(body_content));
    }
}
void TcpRequest1::OnError(bserror_t ec)
{
    LOG_ERROR() << "tcp client error: " << ec;
    // LOG_ERROR() << "tcp client error: " << ec << ", " << ec.message();
    BHMsgHead head;
    std::string body_content;
    try {
        std::vector<char> req(request_.begin(), request_.end());
        if (CheckData(req, req.size(), head, body_content)) {
            if (head.type() == kMsgTypeRequestTopic) {
                SendReply(head, MakeReply<MsgRequestTopicReply>(eError, std::to_string(ec.value()) + ',' + ec.message()).SerializeAsString());
            }
        }
    } catch (std::exception &e) {
    }
    Close();
}
void TcpRequest1::Start()
{
    auto readReply = [this]() {
        // if (!reply_cb_) { return; } // no reply needed, maybe safe to close?
        recv_buffer_.resize(1000);
        recv_len_ = 0;
        socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
@@ -104,47 +122,22 @@
    socket_.async_connect(remote_, TcpCB(*this, request));
}
void TcpRequest1::Close()
{
    LOG_DEBUG() << "client close";
    socket_.close();
}
void TcpRequest1::Close() { socket_.close(); }
void TcpRequest1::OnRead(size_t size)
{
    LOG_DEBUG() << "reply data: " << recv_buffer_.data() + recv_len_;
    recv_len_ += size;
    BHMsgHead head;
    std::string body_content;
    bool recv_done = false;
    try {
        recv_done = CheckData(recv_buffer_, recv_len_, head, body_content);
        if (CheckData(recv_buffer_, recv_len_, head, body_content)) { // got reply.
            Close();
            SendReply(head, std::move(body_content));
        } else { // not complete, read again
            socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
        }
    } catch (std::exception &e) {
        LOG_ERROR() << e.what();
        Close();
        return;
    }
    if (recv_done) {
        // just pass to client, no check, client will check it anyway.
        LOG_DEBUG() << "route size: " << head.route_size();
        if (head.route_size() < 1) { return; }
        auto &back = head.route(head.route_size() - 1);
        MQInfo dest = {back.mq_id(), back.abs_addr()};
        head.mutable_route()->RemoveLast();
        LOG_DEBUG() << "tcp got reply, pass to shm: " << dest.id_ << ", " << dest.offset_;
        MsgRequestTopicReply reply;
        if (reply.ParseFromString(body_content)) {
            LOG_DEBUG() << "err msg: " << reply.errmsg().errstring();
            LOG_DEBUG() << "content : " << reply.data();
        }
        Close();
        return;
        shm_socket_.Send(dest, std::string(recv_buffer_.data(), recv_buffer_.size()));
    } else { // read again
        LOG_DEBUG() << "not complete, read again " << recv_buffer_.size();
        socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
    }
}
@@ -153,7 +146,7 @@
void TcpReply1::OnError(bserror_t ec) { Close(); }
void TcpReply1::Close()
{
    LOG_DEBUG() << "server close.";
    LOG_TRACE() << "server close.";
    socket_.close();
}
@@ -177,38 +170,26 @@
        return;
    }
    auto ParseBody = [&](auto &req) {
        const char *p = recv_buffer_.data();
        uint32_t size = Get32(p);
        p += 4;
        p += size;
        size = Get32(p);
        p += 4;
        return req.ParseFromArray(p, size);
    };
    if (recv_done) {
        LOG_DEBUG() << "request data: " << size;
        auto self(shared_from_this());
        LOG_TRACE() << "tcp server recv request data, size: " << size;
        MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()};
        if (remote.id_ && remote.offset_) {
            auto self(shared_from_this());
            auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
                send_buffer_ = imsg.content();
                async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
            };
            auto &scenter = *pscenter_;
            if (!scenter->ProxyMsg(remote, head, body_content, onRecv)) {
                send_buffer_ = "fake reply";
                async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
            if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) {
                return;
            }
        } else {
            LOG_DEBUG() << "no address";
            send_buffer_ = "no address";
            async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
        }
        Close();
    } else { // read again
        LOG_DEBUG() << "not complete, read again " << recv_buffer_.size();
    } else { // not complete, read again
        LOG_TRACE() << "not complete, read again " << recv_buffer_.size();
        socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
    }
};
box/tcp_connection.h
@@ -25,25 +25,28 @@
#include <memory>
class ShmSocket;
class NodeCenter;
typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr;
class TcpRequest1 : public std::enable_shared_from_this<TcpRequest1>
{
public:
    static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ShmSocket &shm_socket)
    static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ReplyCB const &cb)
    {
        std::make_shared<TcpRequest1>(io, addr, std::move(request), shm_socket)->Start();
        std::make_shared<TcpRequest1>(io, addr, std::move(request), cb)->Start();
    }
    TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ShmSocket &shm_socket) :
        socket_(io), shm_socket_(shm_socket), remote_(addr), request_(std::move(request)) {}
    TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ReplyCB const &cb) :
        socket_(io), reply_cb_(cb), remote_(addr), request_(std::move(request)) {}
    void OnError(bserror_t ec);
private:
    void Start();
    void Close();
    void OnRead(size_t size);
    void SendReply(BHMsgHead &head, std::string body_content);
    tcp::socket socket_;
    ShmSocket &shm_socket_; // send reply
    ReplyCB reply_cb_;
    tcp::endpoint remote_;
    std::string request_;
    std::vector<char> recv_buffer_;
@@ -54,7 +57,6 @@
class TcpReply1 : public std::enable_shared_from_this<TcpReply1>
{
public:
    typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr;
    static void Create(tcp::socket sock, CenterPtr pscenter)
    {
        std::make_shared<TcpReply1>(std::move(sock), pscenter)->Start();
box/tcp_proxy.cpp
@@ -16,54 +16,18 @@
 * =====================================================================================
 */
#include "tcp_proxy.h"
#include "defs.h"
#include "shm_socket.h"
#include "tcp_connection.h"
TcpProxy::TcpProxy() :
    run_(false) {}
TcpProxy::~TcpProxy() {}
bool TcpProxy::Start(bhome_shm::SharedMemory &shm)
{
    Stop();
    bool cur = false;
    if (!run_.compare_exchange_strong(cur, true)) { return false; }
    auto &mq = GetCenterInfo(shm)->mq_tcp_proxy_;
    local_.reset(new ShmSocket(mq.offset_, shm, mq.id_));
    auto localProc = [this](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
        auto &dest = head.dest();
        if (dest.ip().empty() || dest.port() == 0) { return; }
        Request(dest.ip(), dest.port(), msg.content());
    };
    local_->Start(1, localProc);
    auto proxyProc = [this]() {
        while (run_) {
            io_context_.run_one_for(std::chrono::milliseconds(100));
        }
    };
    std::thread(proxyProc).swap(worker_);
    return true;
}
void TcpProxy::Stop()
{
    bool cur = true;
    if (run_.compare_exchange_strong(cur, false)) {
        if (worker_.joinable()) {
            worker_.join();
        }
        local_.reset();
    }
}
bool TcpProxy::Request(const std::string &ip, int port, std::string &&content)
bool TcpProxy::Request(const std::string &ip, int port, std::string &&content, ReplyCB const &cb)
{
    if (content.empty()) { return false; }
    tcp::endpoint dest(ip::address::from_string(ip), port);
    TcpRequest1::Create(io_context_, dest, std::move(content), *local_);
    try {
        tcp::endpoint dest(ip::address::from_string(ip), port);
        TcpRequest1::Create(io_, dest, std::move(content), cb);
        LOG_TRACE() << "tcp request start " << ip << ':' << port;
        return true;
    } catch (std::exception &e) {
        LOG_ERROR() << "proxy request exception: " << e.what();
        return false;
    }
}
box/tcp_proxy.h
@@ -18,28 +18,24 @@
#ifndef TCP_PROXY_E1YJ92U5
#define TCP_PROXY_E1YJ92U5
#include "shm.h"
#include "bh_util.h"
#include "io_service.h"
#include "tcp_common.h"
#include <atomic>
#include <thread>
#include <memory>
class ShmSocket;
class NodeCenter;
typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr;
class TcpProxy
{
public:
    TcpProxy();
    ~TcpProxy();
    bool Start(bhome_shm::SharedMemory &shm);
    void Stop();
    typedef IoService::io_service_t io_service_t;
    TcpProxy(io_service_t &io) :
        io_(io) {}
    bool Request(const std::string &ip, int port, std::string &&content, ReplyCB const &cb);
private:
    bool Request(const std::string &ip, int port, std::string &&content);
    std::unique_ptr<ShmSocket> local_;
    boost::asio::io_context io_context_;
    std::thread worker_;
    std::atomic<bool> run_;
    io_service_t &io_;
};
#endif // end of include guard: TCP_PROXY_E1YJ92U5
box/tcp_server.cpp
@@ -23,26 +23,19 @@
using namespace std::chrono_literals;
TcpServer::TcpServer(int port, CenterPtr pscenter) :
    listener_(io(), tcp::endpoint(tcp::v6(), port)), pscenter_(pscenter)
{
    Accept();
}
TcpServer::~TcpServer() { Stop(); }
void TcpServer::OnStop()
{
    listener_.close();
}
void TcpServer::Accept()
{
    listener_.async_accept([this](bserror_t ec, tcp::socket sock) {
        if (!ec) {
            LOG_INFO() << "server accept client";
            LOG_TRACE() << "server accept client";
            TcpReply1::Create(std::move(sock), pscenter_);
            Accept();
        } else {
            // this is already destructed by now.
            if (ec.value() != ECANCELED) {
                LOG_WARNING() << "tcp server accept error: " << ec;
                Accept();
            }
        }
        Accept();
    });
}
box/tcp_server.h
@@ -23,15 +23,17 @@
#include "tcp_common.h"
class NodeCenter;
class TcpServer : public IoService
class TcpServer
{
public:
    typedef IoService::io_service_t io_service_t;
    typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr;
    TcpServer(int port, CenterPtr pscenter);
    ~TcpServer();
    TcpServer(io_service_t &io, int port, CenterPtr pscenter) :
        io_(io), listener_(io_, tcp::endpoint(tcp::v6(), port)), pscenter_(pscenter) { Accept(); }
    ~TcpServer() { listener_.close(); }
private:
    virtual void OnStop();
    io_service_t &io_;
    void Accept();
    tcp::acceptor listener_;
    CenterPtr pscenter_;
src/msg.cpp
@@ -37,7 +37,8 @@
            Free();
        }
    } else if (n < 0) {
        LOG_FATAL() << "error double release data.";
        // ns_log::GetTrace();
        LOG_FATAL() << "double release msg.";
        throw std::runtime_error("double release msg.");
    }
    return n;
src/shm_socket.cpp
@@ -171,8 +171,7 @@
            RecvCB cb_no_use;
            per_msg_cbs_->Pick(msg_id, cb_no_use);
        };
        SendImpl(remote, msg, onExpireRemoveCB);
        return true;
        return SendImpl(remote, msg, onExpireRemoveCB);
    } catch (std::exception &e) {
        SetLastError(eError, "Send internal error.");
        return false;
src/topic_node.cpp
@@ -497,9 +497,10 @@
    out_msg_id = msg_id;
    auto SendTo = [this, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) {
    auto SendTo = [this, remote_addr, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) {
        auto &sock = SockClient();
        BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id));
        *head.mutable_dest() = remote_addr;
        AddRoute(head, sock);
        head.set_topic(req.topic());
@@ -519,8 +520,12 @@
    };
    try {
        BHAddress addr;
        return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb);
        if (remote_addr.ip().empty()) {
            BHAddress addr;
            return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb);
        } else {
            return SendTo(CenterAddr(), req, cb);
        }
    } catch (...) {
        SetLastError(eError, "internal error.");
        return false;
@@ -536,25 +541,34 @@
    try {
        auto &sock = SockClient();
        BHAddress addr;
        if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
            LOG_TRACE() << "node: " << SockNode().id() << ", topic dest: " << addr.mq_id();
            BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
            AddRoute(head, sock);
            head.set_topic(request.topic());
            MsgI reply_msg(shm());
            DEFER1(reply_msg.Release(););
            BHMsgHead reply_head;
            if (sock.SendAndRecv({addr.mq_id(), addr.abs_addr()}, head, request, reply_msg, reply_head, timeout_ms) &&
                reply_head.type() == kMsgTypeRequestTopicReply &&
                reply_msg.ParseBody(out_reply)) {
                reply_head.mutable_proc_id()->swap(out_proc_id);
                return true;
        MQInfo dest;
        if (!remote_addr.ip().empty()) {
            dest = CenterAddr();
        } else {
            BHAddress addr;
            if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
                dest.offset_ = addr.abs_addr();
                dest.id_ = addr.mq_id();
            } else {
                return false;
            }
        }
        BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
        *head.mutable_dest() = remote_addr;
        AddRoute(head, sock);
        head.set_topic(request.topic());
        MsgI reply_msg(shm());
        DEFER1(reply_msg.Release(););
        BHMsgHead reply_head;
        if (sock.SendAndRecv(dest, head, request, reply_msg, reply_head, timeout_ms) &&
            reply_head.type() == kMsgTypeRequestTopicReply &&
            reply_msg.ParseBody(out_reply)) {
            reply_head.mutable_proc_id()->swap(out_proc_id);
            return true;
        }
    } catch (...) {
        SetLastError(eError, __func__ + std::string(" internal errer."));
    }
utest/api_test.cpp
@@ -206,7 +206,7 @@
        }
        printf("\n");
    };
    {
    if (0) {
        // query procs
        std::string dest(BHAddress().SerializeAsString());
        MsgQueryProc query;
@@ -224,14 +224,21 @@
        // printf("register topic : %s\n", r ? "ok" : "failed");
        // Sleep(1s);
    }
    {
    for (int i = 0; i < 3; ++i) {
        // query procs with normal topic request
        MsgRequestTopic req;
        req.set_topic("#center_query_procs");
        // req.set_data("{\"proc_id\":\"#center.node\"}");
        std::string s(req.SerializeAsString());
        // Sleep(10ms, false);
        std::string dest(BHAddress().SerializeAsString());
        BHAddress host;
        printf("query with ip set\n");
        host.set_ip("127.0.0.1");
        host.set_port(kBHCenterPort);
        host.set_mq_id(1000011);
        host.set_abs_addr(10296);
        std::string dest(host.SerializeAsString());
        void *proc_id = 0;
        int proc_id_len = 0;
        DEFER1(BHFree(proc_id, proc_id_len););
@@ -247,7 +254,7 @@
        } else {
            MsgRequestTopicReply ret;
            ret.ParseFromArray(reply, reply_len);
            printf("topic query proc : %s\n", ret.data().c_str());
            printf("\ntopic query proc : %s\n", ret.data().c_str());
            // MsgQueryProcReply result;
            // if (result.ParseFromArray(ret.data().data(), ret.data().size()) && IsSuccess(result.errmsg().errcode())) {
            //     PrintProcs(result);
@@ -325,7 +332,7 @@
        for (int i = 0; i < 1; ++i) {
            MsgPublish pub;
            pub.set_topic(topic_ + std::to_string(i));
            pub.set_data("pub_data_" + std::string(1024 * 1, 'a'));
            pub.set_data("pub_data_" + std::string(104 * 1, 'a'));
            std::string s(pub.SerializeAsString());
            BHPublish(s.data(), s.size(), 0);
            // Sleep(1s);
utest/tcp_test.cpp
@@ -19,6 +19,7 @@
#include "defs.h"
#include "node_center.h"
#include "tcp_connection.h"
#include "tcp_proxy.h"
#include "tcp_server.h"
#include "util.h"
#include <sys/ioctl.h>
@@ -33,34 +34,54 @@
BOOST_AUTO_TEST_CASE(TcpTest)
{
    SharedMemory &shm = TestShm();
    const std::string connect_addr = "127.0.0.1";
    const uint16_t port = kBHCenterPort;
    boost::asio::io_context io;
    IoService io;
    tcp::endpoint dest(ip::address::from_string(connect_addr), port);
    MsgRequestTopic req;
    req.set_topic("#center_query_procs");
    req.set_data("");
    auto head = InitMsgHead(GetType(req), "#test_proc", 1000000);
    auto route = head.add_route();
    route->set_mq_id(12345);
    route->set_abs_addr(67890);
    head.mutable_dest()->set_ip(connect_addr);
    head.mutable_dest()->set_port(port);
    head.mutable_dest()->set_mq_id(1000011);
    head.mutable_dest()->set_abs_addr(10296);
    auto NewRequest = [&]() {
        MsgRequestTopic req;
        req.set_topic("#center_query_procs");
        req.set_data("");
        auto head = InitMsgHead(GetType(req), "#test_proc", 1000000);
        auto route = head.add_route();
        route->set_mq_id(12345);
        route->set_abs_addr(67890);
    auto request(MsgI::Serialize(head, req));
    for (int i = 0; i < 1; ++i) {
        LOG_DEBUG() << "request size: " << request.size();
        TcpRequest1::Create(io, dest, request, DefaultSender(BHomeShm()));
        head.mutable_dest()->set_ip(connect_addr);
        head.mutable_dest()->set_port(port);
        head.mutable_dest()->set_mq_id(1000011);
        head.mutable_dest()->set_abs_addr(10296);
        return (MsgI::Serialize(head, req));
    };
    auto onReply = [](BHMsgHead &head, std::string body_content) {
        static int n = 0;
        printf("reply %d: ", ++n);
        MsgRequestTopicReply reply;
        if (reply.ParseFromString(body_content)) {
            if (IsSuccess(reply.errmsg().errcode())) {
                printf("\ncontent: %s\n", reply.data().c_str());
            } else {
                printf("error: %s\n", reply.errmsg().errstring().c_str());
            }
        } else {
            printf("parse error\n");
        }
    };
    for (int i = 0; i < 100; ++i) {
        auto request = NewRequest();
        TcpRequest1::Create(io.io(), dest, request, onReply);
    }
    io.run();
    Sleep(2s);
    printf("-------------------------------------------------------\n");
    for (int i = 0; i < 3; ++i) {
        auto request = NewRequest();
        TcpRequest1::Create(io.io(), dest, request, onReply);
    }
    Sleep(2s);
    printf("TcpTest\n");
}