lichao
2021-05-27 026bbfaf2b5d73a26b8e2fa49158883ef64c211b
tcp server call center to send proxy requests.
2个文件已添加
14个文件已修改
609 ■■■■ 已修改文件
box/center.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.h 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/io_service.cpp 48 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/io_service.h 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/node_center.cpp 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/node_center.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_connection.cpp 228 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_connection.h 37 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_proxy.cpp 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_proxy.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_server.cpp 32 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_server.h 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h 47 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_socket.cpp 58 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_socket.h 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/tcp_test.cpp 29 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp
@@ -18,6 +18,7 @@
#include "center.h"
#include "center_topic_node.h"
#include "node_center.h"
#include "tcp_server.h"
#include <chrono>
using namespace std::chrono;
@@ -175,6 +176,7 @@
    }
    topic_node_.reset(new CenterTopicNode(center_ptr, shm));
    tcp_server_.reset(new TcpServer(kBHCenterPort, center_ptr));
}
BHCenter::~BHCenter() { Stop(); }
@@ -186,11 +188,13 @@
        sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_);
    }
    topic_node_->Start();
    tcp_server_->Start();
    return true;
}
bool BHCenter::Stop()
{
    tcp_server_->Stop();
    topic_node_->Stop();
    for (auto &kv : sockets_) {
        kv.second->Stop();
box/center.h
@@ -23,6 +23,7 @@
#include <map>
#include <memory>
class CenterTopicNode;
class TcpServer;
class BHCenter
{
@@ -53,6 +54,7 @@
    std::map<std::string, std::shared_ptr<ShmSocket>> sockets_;
    std::unique_ptr<CenterTopicNode> topic_node_;
    std::unique_ptr<TcpServer> tcp_server_;
};
#endif // end of include guard: CENTER_TM9OUQTG
box/io_service.cpp
New file
@@ -0,0 +1,48 @@
/*
 * =====================================================================================
 *
 *       Filename:  io_service.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年05月27日 13时25分18秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#include "io_service.h"
#include <chrono>
using namespace std::chrono_literals;
bool IoService::Start()
{
    Stop();
    bool cur = false;
    if (!run_.compare_exchange_strong(cur, true)) {
        return false;
    }
    auto proc = [this]() {
        while (run_) {
            io_.run_one_for(100ms);
        }
        OnStop();
    };
    std::thread(proc).swap(worker_);
    return true;
}
void IoService::Stop()
{
    bool cur = true;
    if (run_.compare_exchange_strong(cur, false)) {
        if (worker_.joinable()) {
            worker_.join();
        }
    }
}
box/io_service.h
New file
@@ -0,0 +1,42 @@
/*
 * =====================================================================================
 *
 *       Filename:  io_service.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年05月27日 13时25分37秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef IO_SERVICE_ODKKJG3D
#define IO_SERVICE_ODKKJG3D
#include <boost/asio.hpp>
#include <thread>
class IoService
{
public:
    IoService() :
        run_(false) {}
    bool Start();
    void Stop();
    typedef boost::asio::io_context io_service_t;
    io_service_t &io() { return io_; }
private:
    virtual void OnStop() {}
    io_service_t io_;
    std::thread worker_;
    std::atomic<bool> run_;
};
#endif // end of include guard: IO_SERVICE_ODKKJG3D
box/node_center.cpp
@@ -209,6 +209,29 @@
    RecordMsg(msg);
    return socket.Send(dest, msg);
}
bool NodeCenter::ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
{
    auto ssn = dest.id_ - (dest.id_ % 10);
    LOG_DEBUG() << "prox ssn " << ssn;
    auto pos = nodes_.find(ssn);
    if (pos == nodes_.end()) {
        LOG_ERROR() << "proxy msg, ssn not found.";
        return false;
    }
    auto &node = pos->second;
    if (!Valid(*node)) { return false; }
    ShmSocket &sender(DefaultSender(node->shm_));
    auto route = head.add_route();
    route->set_mq_id(sender.id());
    route->set_abs_addr(sender.AbsAddr());
    ShmMsg msg(node->shm_);
    if (!msg.Make(head, body_content)) { return false; }
    DEFER1(msg.Release(););
    RecordMsg(msg);
    return sender.Send(dest, msg, head.msg_id(), std::move(cb));
}
void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val)
{
box/node_center.h
@@ -122,6 +122,7 @@
    void RecordMsg(const MsgI &msg);
    bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg);
    bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg);
    bool ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
    void OnAlloc(ShmSocket &socket, const int64_t val);
    void OnFree(ShmSocket &socket, const int64_t val);
    bool OnCommand(ShmSocket &socket, const int64_t val);
box/tcp_connection.cpp
@@ -17,78 +17,198 @@
 */
#include "tcp_connection.h"
#include "log.h"
#include "msg.h"
#include "node_center.h"
#include "shm_socket.h"
namespace
{
template <class C>
auto Buffer(C &c) { return boost::asio::buffer(c.data(), c.size()); }
auto Buffer(C &c, size_t offset = 0) { return boost::asio::buffer(c.data() + offset, c.size() - offset); }
using boost::asio::async_read;
using boost::asio::async_write;
typedef std::function<void()> VoidHandler;
typedef std::function<void(size_t)> SizeHandler;
template <class T, class... Param>
auto TcpCallback(T &conn, std::function<void(Param...)> const &func)
{
    auto self(conn.shared_from_this());
    return [self, func](bserror_t ec, Param... size) {
        if (!ec) {
            func(size...);
        } else {
            self->OnError(ec);
        }
    };
}
template <class T>
auto TcpCB(T &conn, VoidHandler const &func) { return TcpCallback(conn, func); }
template <class T>
auto TcpCBSize(T &conn, SizeHandler const &func) { return TcpCallback(conn, func); }
template <class T>
auto TcpCBSize(T &conn, VoidHandler const &func)
{
    return TcpCBSize(conn, [func](size_t) { func(); });
}
bool CheckData(std::vector<char> &buffer, const uint32_t len, BHMsgHead &head, std::string &body_content)
{
    const char *p = buffer.data();
    LOG_DEBUG() << "msg len " << len;
    if (4 > len) { return false; }
    uint32_t head_len = Get32(p);
    LOG_DEBUG() << "head_len " << head_len;
    if (head_len > 1024 * 4) {
        throw std::runtime_error("unexpected tcp reply data.");
    }
    auto before_body = 4 + head_len + 4;
    if (before_body > len) {
        if (before_body > buffer.size()) {
            buffer.resize(before_body);
        }
        return false;
    }
    if (!head.ParseFromArray(p + 4, head_len)) {
        throw std::runtime_error("tcp recv invalid reply head.");
    }
    uint32_t body_len = Get32(p + 4 + head_len);
    buffer.resize(before_body + body_len);
    if (buffer.size() > len) { return false; }
    body_content.assign(p + before_body, body_len);
    return true;
}
} // namespace
TcpRequest1::TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request) :
    socket_(io), remote_(addr), request_(std::move(request)) {}
void TcpRequest1::Connect()
/// request -----------------------------------------------------------
void TcpRequest1::OnError(bserror_t ec)
{
    auto self = shared_from_this();
    socket_.async_connect(remote_, [this, self](bserror_t ec) {
        if (!ec) {
            SendRequest();
        } else {
            LOG_ERROR() << "connect error " << ec;
            Close();
        }
    });
    LOG_ERROR() << "tcp client error: " << ec;
    Close();
}
void TcpRequest1::Start()
{
    auto readReply = [this]() {
        recv_buffer_.resize(1000);
        recv_len_ = 0;
        socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
    };
    auto request = [this, readReply]() { async_write(socket_, Buffer(request_), TcpCBSize(*this, readReply)); };
    socket_.async_connect(remote_, TcpCB(*this, request));
}
void TcpRequest1::Close()
{
    LOG_DEBUG() << "client close";
    socket_.close();
}
void TcpRequest1::OnRead(size_t size)
{
    LOG_DEBUG() << "reply data: " << recv_buffer_.data() + recv_len_;
    recv_len_ += size;
    BHMsgHead head;
    std::string body_content;
    bool recv_done = false;
    try {
        recv_done = CheckData(recv_buffer_, recv_len_, head, body_content);
    } catch (std::exception &e) {
        LOG_ERROR() << e.what();
        Close();
        return;
    }
    if (recv_done) {
        // just pass to client, no check, client will check it anyway.
        LOG_DEBUG() << "route size: " << head.route_size();
        if (head.route_size() < 1) { return; }
        auto &back = head.route(head.route_size() - 1);
        MQInfo dest = {back.mq_id(), back.abs_addr()};
        head.mutable_route()->RemoveLast();
        LOG_DEBUG() << "tcp got reply, pass to shm: " << dest.id_ << ", " << dest.offset_;
        MsgRequestTopicReply reply;
        if (reply.ParseFromString(body_content)) {
            LOG_DEBUG() << "err msg: " << reply.errmsg().errstring();
            LOG_DEBUG() << "content : " << reply.data();
        }
        Close();
        return;
        shm_socket_.Send(dest, std::string(recv_buffer_.data(), recv_buffer_.size()));
    } else { // read again
        LOG_DEBUG() << "not complete, read again " << recv_buffer_.size();
        socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
    }
}
/// reply --------------------------------------------------------------
void TcpReply1::OnError(bserror_t ec) { Close(); }
void TcpReply1::Close()
{
    LOG_DEBUG() << "server close.";
    socket_.close();
}
void TcpRequest1::SendRequest()
{
    LOG_INFO() << "client sending request " << request_;
    auto self = shared_from_this();
    async_write(socket_, Buffer(request_), [this, self](bserror_t ec, size_t) {
        if (!ec) {
            ReadReply();
        } else {
            Close();
        }
    });
}
void TcpRequest1::ReadReply()
{
    buffer_.resize(1000);
    auto self = shared_from_this();
    socket_.async_read_some(Buffer(buffer_), [this, self](bserror_t ec, size_t size) {
        if (!ec) {
            printf("reply data: %s\n", buffer_.data());
        } else {
            Close();
        }
    });
}
TcpReply1::TcpReply1(tcp::socket sock) :
    socket_(std::move(sock)) {}
void TcpReply1::Start()
{
    LOG_INFO() << "server session reading...";
    recv_buffer_.resize(1000);
    auto self(shared_from_this());
    socket_.async_read_some(Buffer(recv_buffer_), [this, self](bserror_t ec, size_t size) {
        LOG_INFO() << "server read : " << recv_buffer_.data();
        // fake reply
        if (!ec) {
            send_buffer_ = std::string(recv_buffer_.data(), size) + " reply";
            async_write(socket_, Buffer(send_buffer_), [this, self](bserror_t ec, size_t size) {
                socket_.close();
            });
    socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
}
void TcpReply1::OnRead(size_t size)
{
    recv_len_ += size;
    BHMsgHead head;
    std::string body_content;
    bool recv_done = false;
    try {
        recv_done = CheckData(recv_buffer_, recv_len_, head, body_content);
    } catch (std::exception &e) {
        LOG_ERROR() << e.what();
        Close();
        return;
    }
    auto ParseBody = [&](auto &req) {
        const char *p = recv_buffer_.data();
        uint32_t size = Get32(p);
        p += 4;
        p += size;
        size = Get32(p);
        p += 4;
        return req.ParseFromArray(p, size);
    };
    if (recv_done) {
        LOG_DEBUG() << "request data: " << size;
        auto self(shared_from_this());
        MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()};
        if (remote.id_ && remote.offset_) {
            auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
                send_buffer_ = imsg.content();
                async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
            };
            auto &scenter = *pscenter_;
            if (!scenter->ProxyMsg(remote, head, body_content, onRecv)) {
                send_buffer_ = "fake reply";
                async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
            }
        } else {
            socket_.close();
            LOG_DEBUG() << "no address";
            send_buffer_ = "no address";
            async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
        }
    });
}
    } else { // read again
        LOG_DEBUG() << "not complete, read again " << recv_buffer_.size();
        socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
    }
};
box/tcp_connection.h
@@ -18,46 +18,61 @@
#ifndef TCP_CONNECTION_H373GIL5
#define TCP_CONNECTION_H373GIL5
#include "bh_util.h"
#include "node_center.h"
#include "tcp_common.h"
#include <functional>
#include <memory>
class ShmSocket;
class TcpRequest1 : public std::enable_shared_from_this<TcpRequest1>
{
public:
    static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request)
    static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ShmSocket &shm_socket)
    {
        std::make_shared<TcpRequest1>(io, addr, std::move(request))->Connect();
        std::make_shared<TcpRequest1>(io, addr, std::move(request), shm_socket)->Start();
    }
    TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request);
    TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ShmSocket &shm_socket) :
        socket_(io), shm_socket_(shm_socket), remote_(addr), request_(std::move(request)) {}
    void OnError(bserror_t ec);
private:
    void Connect();
    void Start();
    void Close();
    void SendRequest();
    void ReadReply();
    void OnRead(size_t size);
    tcp::socket socket_;
    ShmSocket &shm_socket_; // send reply
    tcp::endpoint remote_;
    std::string request_;
    std::vector<char> buffer_;
    std::vector<char> recv_buffer_;
    size_t recv_len_ = 0;
};
class NodeCenter;
class TcpReply1 : public std::enable_shared_from_this<TcpReply1>
{
public:
    static void Create(tcp::socket sock)
    typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr;
    static void Create(tcp::socket sock, CenterPtr pscenter)
    {
        std::make_shared<TcpReply1>(std::move(sock))->Start();
        std::make_shared<TcpReply1>(std::move(sock), pscenter)->Start();
    }
    TcpReply1(tcp::socket sock);
    void Start();
    TcpReply1(tcp::socket sock, CenterPtr pscenter) :
        socket_(std::move(sock)), pscenter_(pscenter) {}
    void OnError(bserror_t ec);
private:
    void Start();
    void Close();
    void OnRead(size_t size);
    tcp::socket socket_;
    CenterPtr pscenter_;
    std::vector<char> recv_buffer_;
    uint32_t recv_len_ = 0;
    std::string send_buffer_;
};
box/tcp_proxy.cpp
@@ -36,8 +36,7 @@
    auto localProc = [this](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
        auto &dest = head.dest();
        if (dest.ip().empty() || dest.port() == 0) { return; }
        bool r = Send(dest.ip(), dest.port(), msg.content());
        // TODO check send fail.
        Request(dest.ip(), dest.port(), msg.content());
    };
    local_->Start(1, localProc);
@@ -46,34 +45,25 @@
            io_context_.run_one_for(std::chrono::milliseconds(100));
        }
    };
    std::thread(proxyProc).swap(worker_);
    return true;
}
void TcpProxy::Stop()
{
    local_.reset();
    bool cur = true;
    if (run_.compare_exchange_strong(cur, false)) {
        if (worker_.joinable()) {
            worker_.join();
        }
        local_.reset();
    }
}
bool TcpProxy::Send(const std::string &ip, int port, std::string &&content)
bool TcpProxy::Request(const std::string &ip, int port, std::string &&content)
{
    if (content.empty()) { return false; }
    tcp::endpoint dest(ip::address::from_string(ip), port);
    TcpRequest1::Create(io_context_, dest, std::move(content));
    // char tag[sizeof(kBHTcpServerTag)] = {0};
    // int n = read(sock, tag, sizeof(tag));
    // if (n == sizeof(tag) && memcmp(tag, &kBHTcpServerTag, sizeof(tag)) == 0) {
    //     send(sock, content.data(), content.size(), 0);
    //     connections_[addr].io_info_.h_ = [this, sock](int events) { OnReply(sock); };
    //     // success
    // }
    TcpRequest1::Create(io_context_, dest, std::move(content), *local_);
}
box/tcp_proxy.h
@@ -34,7 +34,7 @@
    void Stop();
private:
    bool Send(const std::string &ip, int port, std::string &&content);
    bool Request(const std::string &ip, int port, std::string &&content);
    std::unique_ptr<ShmSocket> local_;
    boost::asio::io_context io_context_;
box/tcp_server.cpp
@@ -23,39 +23,17 @@
using namespace std::chrono_literals;
TcpServer::TcpServer(int port) :
    run_(false), listener_(io_, tcp::endpoint(tcp::v6(), port))
TcpServer::TcpServer(int port, CenterPtr pscenter) :
    listener_(io(), tcp::endpoint(tcp::v6(), port)), pscenter_(pscenter)
{
    Accept();
}
TcpServer::~TcpServer() { Stop(); }
bool TcpServer::Start()
void TcpServer::OnStop()
{
    Stop();
    bool cur = false;
    if (run_.compare_exchange_strong(cur, true)) {
        auto proc = [this]() {
            while (run_) {
                io_.run_one_for(100ms);
            }
        };
        std::thread(proc).swap(worker_);
    }
}
void TcpServer::Stop()
{
    bool cur = true;
    if (run_.compare_exchange_strong(cur, false)) {
        io_.post([this]() {
            listener_.close();
        });
        std::this_thread::sleep_for(1s);
        if (worker_.joinable()) {
            worker_.join();
        }
    }
    listener_.close();
}
void TcpServer::Accept()
@@ -63,7 +41,7 @@
    listener_.async_accept([this](bserror_t ec, tcp::socket sock) {
        if (!ec) {
            LOG_INFO() << "server accept client";
            TcpReply1::Create(std::move(sock));
            TcpReply1::Create(std::move(sock), pscenter_);
        }
        Accept();
    });
box/tcp_server.h
@@ -18,23 +18,23 @@
#ifndef TCP_SERVER_795VXR94
#define TCP_SERVER_795VXR94
#include "bh_util.h"
#include "io_service.h"
#include "tcp_common.h"
#include <thread>
class TcpServer
class NodeCenter;
class TcpServer : public IoService
{
public:
    explicit TcpServer(int port);
    typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr;
    TcpServer(int port, CenterPtr pscenter);
    ~TcpServer();
    bool Start();
    void Stop();
private:
    virtual void OnStop();
    void Accept();
    std::thread worker_;
    std::atomic<bool> run_;
    boost::asio::io_context io_;
    tcp::acceptor listener_;
    CenterPtr pscenter_;
};
#endif // end of include guard: TCP_SERVER_795VXR94
src/msg.h
@@ -75,12 +75,16 @@
    };
    OffsetType offset_;
    SharedMemory *pshm_;
    void *Alloc(const size_t size)
    void *Alloc(const size_t size, const void *src = nullptr)
    {
        void *p = shm().Alloc(sizeof(Meta) + size);
        if (p) {
            auto pmeta = new (p) Meta(size);
            p = pmeta + 1;
            if (src) {
                memcpy(p, src, size);
            }
        }
        return p;
    }
@@ -108,16 +112,35 @@
        }
        return addr;
    }
    void *Pack(const std::string &content)
    void *Pack(const BHMsgHead &head, const uint32_t head_len, const std::string &body_content)
    {
        void *addr = get();
        if (addr) {
            memcpy(addr, content.data(), content.size());
            meta()->size_ = content.size();
            auto p = static_cast<char *>(addr);
            auto Pack1 = [&p](uint32_t len, auto &&writer) {
                Put32(p, len);
                p += sizeof(len);
                writer(p, len);
                p += len;
            };
            Pack1(head_len, [&](void *p, int len) { head.SerializeToArray(p, len); });
            Pack1(body_content.size(), [&](void *p, int len) { memcpy(p, body_content.data(), len); });
            meta()->size_ = 4 + head_len + 4 + body_content.size();
        }
        return addr;
    }
    void *Pack(const void *src, const size_t size)
    {
        void *addr = get();
        if (addr && src) {
            memcpy(addr, src, size);
            meta()->size_ = size;
        }
        return addr;
    }
    void *Pack(const std::string &content) { return Pack(content.data(), content.size()); }
    bool Make(void *addr)
    {
@@ -166,6 +189,13 @@
        uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len;
        return Make(size) && Pack(head, head_len, body, body_len);
    }
    inline bool Make(const BHMsgHead &head, const std::string &body_content)
    {
        uint32_t head_len = head.ByteSizeLong();
        uint32_t size = sizeof(head_len) + head_len + sizeof(uint32_t) + body_content.size();
        return Make(size) && Pack(head, head_len, body_content);
    }
    template <class Body>
    inline bool Fill(const BHMsgHead &head, const Body &body)
    {
@@ -175,8 +205,11 @@
        return valid() && (meta()->capacity_ >= size) && Pack(head, head_len, body, body_len);
    }
    inline bool Make(const std::string &content) { return Make(content.size()) && Pack(content); }
    inline bool Fill(const std::string &content) { return valid() && (meta()->capacity_ >= content.size()) && Pack(content); }
    inline bool Make(const void *src, const size_t size) { return Make(Alloc(size, src)); }
    inline bool Fill(const void *src, const size_t size) { return valid() && (meta()->capacity_ >= size) && Pack(src, size); }
    inline bool Make(const std::string &content) { return Make(content.data(), content.size()); }
    inline bool Fill(const std::string &content) { return Fill(content.data(), content.size()); }
    inline bool Make(const size_t size) { return Make(Alloc(size)); }
src/shm_socket.cpp
@@ -145,46 +145,66 @@
    return false;
}
bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb)
bool ShmSocket::Send(const MQInfo &remote, std::string &&content)
{
    size_t size = content.size();
    auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
    auto OnResult = [content = std::move(content), remote, this](MsgI &msg) mutable {
        if (!msg.Fill(content)) { return false; }
        try {
            if (!cb) {
                Send(remote, msg);
            } else {
                per_msg_cbs_->Store(msg_id, std::move(cb));
                auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
                    RecvCB cb_no_use;
                    per_msg_cbs_->Pick(msg_id, cb_no_use);
                };
                Send(remote, msg, onExpireRemoveCB);
            }
            SendImpl(remote, msg);
            return true;
        } catch (...) {
            SetLastError(eError, "Send internal error.");
            return false;
        }
    };
    return RequestAlloc(size, OnResult);
}
bool ShmSocket::Send(const MQInfo &remote, const MsgI &msg, const std::string &msg_id, RecvCB &&cb)
{
    try {
        per_msg_cbs_->Store(msg_id, std::move(cb));
        auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
            RecvCB cb_no_use;
            per_msg_cbs_->Pick(msg_id, cb_no_use);
        };
        SendImpl(remote, msg, onExpireRemoveCB);
        return true;
    } catch (std::exception &e) {
        SetLastError(eError, "Send internal error.");
        return false;
    }
}
bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb)
{
    size_t size = content.size();
    auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
        return msg.Fill(content) && Send(remote, msg, msg_id, std::move(cb));
    };
    return RequestAlloc(size, OnResult);
}
bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult)
{ // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag
#if 0
    // self alloc
    MsgI msg(shm());
    if (msg.Make(size)) {
        DEFER1(msg.Release());
        return OnResult(msg);
        return onResult(msg);
    } else {
        return false;
    }
#else
    // center alloc
    return RequestAlloc(size, OnResult);
#endif
}
bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult)
{ // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag
    // LOG_FUNCTION;
    if (node_proc_index_ == -1 || socket_index_ == -1) {
        LOG_ERROR() << "socket not inited.";
        return false;
    }
    int id = (++alloc_id_) & MaskBits(28);
src/shm_socket.h
@@ -68,16 +68,16 @@
    bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult);
    bool Send(const MQInfo &remote, const MsgI &msg, const std::string &msg_id, RecvCB &&cb);
    template <class Body>
    bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
    {
        return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb));
    }
    template <class... T>
    bool Send(const MQInfo &remote, const MsgI &imsg, T &&...t)
    {
        return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...);
    }
    bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb) { return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb)); }
    bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb);
    template <class Body>
    bool Send(const MQInfo &remote, BHMsgHead &head, Body &body) { return Send(remote, MsgI::Serialize(head, body)); }
    bool Send(const MQInfo &remote, std::string &&content);
    bool Send(const MQInfo &remote, const MsgI &imsg) { return SendImpl(remote, imsg); }
    template <class... T>
    bool Send(const MQInfo &remote, const int64_t cmd, T &&...t)
    {
@@ -135,8 +135,6 @@
private:
    bool StopNoLock();
    bool RunningNoLock() { return !workers_.empty(); }
    bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb = RecvCB());
    template <class... Rest>
    bool SendImpl(Rest &&...rest)
utest/tcp_test.cpp
@@ -16,6 +16,8 @@
 * =====================================================================================
 */
#include "defs.h"
#include "node_center.h"
#include "tcp_connection.h"
#include "tcp_server.h"
#include "util.h"
@@ -31,18 +33,31 @@
BOOST_AUTO_TEST_CASE(TcpTest)
{
    const std::string bind_addr = "127.0.0.1";
    const std::string connect_addr = "127.0.0.1";
    const uint16_t port = 10000;
    SharedMemory &shm = TestShm();
    TcpServer server(port);
    server.Start();
    const std::string connect_addr = "127.0.0.1";
    const uint16_t port = kBHCenterPort;
    boost::asio::io_context io;
    tcp::endpoint dest(ip::address::from_string(connect_addr), port);
    for (int i = 0; i < 10; ++i) {
        TcpRequest1::Create(io, dest, "client->server " + std::to_string(i));
    MsgRequestTopic req;
    req.set_topic("#center_query_procs");
    req.set_data("");
    auto head = InitMsgHead(GetType(req), "#test_proc", 1000000);
    auto route = head.add_route();
    route->set_mq_id(12345);
    route->set_abs_addr(67890);
    head.mutable_dest()->set_ip(connect_addr);
    head.mutable_dest()->set_port(port);
    head.mutable_dest()->set_mq_id(1000011);
    head.mutable_dest()->set_abs_addr(10296);
    auto request(MsgI::Serialize(head, req));
    for (int i = 0; i < 1; ++i) {
        LOG_DEBUG() << "request size: " << request.size();
        TcpRequest1::Create(io, dest, request, DefaultSender(BHomeShm()));
    }
    io.run();