lichao
2021-05-25 5d8aa35858eea622e0e8e4a1f111fd408c483a31
add some tcp code.
8个文件已添加
1个文件已修改
475 ■■■■■ 已修改文件
box/tcp_common.h 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_connection.cpp 94 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_connection.h 64 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_proxy.cpp 79 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_proxy.h 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_server.cpp 70 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_server.h 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/tcp_test.cpp 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/util.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_common.h
New file
@@ -0,0 +1,30 @@
/*
 * =====================================================================================
 *
 *       Filename:  tcp_common.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年05月24日 17时24分33秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef TCP_COMMON_8S8O7OV
#define TCP_COMMON_8S8O7OV
#include <boost/asio.hpp>
#include <boost/uuid/string_generator.hpp>
#include <boost/uuid/uuid.hpp>
namespace ip = boost::asio::ip;
using boost::asio::ip::tcp;
typedef boost::system::error_code bserror_t;
const boost::uuids::uuid kBHTcpServerTag = boost::uuids::string_generator()("e5bff527-6bf8-ee0d-cd28-b36594acee39");
#endif // end of include guard: TCP_COMMON_8S8O7OV
box/tcp_connection.cpp
New file
@@ -0,0 +1,94 @@
/*
 * =====================================================================================
 *
 *       Filename:  tcp_connection.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年05月25日 15时34分03秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#include "tcp_connection.h"
#include "log.h"
namespace
{
template <class C>
auto Buffer(C &c) { return boost::asio::buffer(c.data(), c.size()); }
using boost::asio::async_read;
using boost::asio::async_write;
} // namespace
TcpRequest1::TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request) :
    socket_(io), remote_(addr), request_(std::move(request)) {}
void TcpRequest1::Connect()
{
    auto self = shared_from_this();
    socket_.async_connect(remote_, [this, self](bserror_t ec) {
        if (!ec) {
            SendRequest();
        } else {
            LOG_ERROR() << "connect error " << ec;
            Close();
        }
    });
}
void TcpRequest1::Close()
{
    socket_.close();
}
void TcpRequest1::SendRequest()
{
    LOG_INFO() << "client sending request " << request_;
    auto self = shared_from_this();
    async_write(socket_, Buffer(request_), [this, self](bserror_t ec, size_t) {
        if (!ec) {
            ReadReply();
        } else {
            Close();
        }
    });
}
void TcpRequest1::ReadReply()
{
    buffer_.resize(1000);
    auto self = shared_from_this();
    socket_.async_read_some(Buffer(buffer_), [this, self](bserror_t ec, size_t size) {
        if (!ec) {
            printf("reply data: %s\n", buffer_.data());
        } else {
            Close();
        }
    });
}
TcpReply1::TcpReply1(tcp::socket sock) :
    socket_(std::move(sock)) {}
void TcpReply1::Start()
{
    LOG_INFO() << "server session reading...";
    recv_buffer_.resize(1000);
    auto self(shared_from_this());
    socket_.async_read_some(Buffer(recv_buffer_), [this, self](bserror_t ec, size_t size) {
        LOG_INFO() << "server read : " << recv_buffer_.data();
        // fake reply
        if (!ec) {
            send_buffer_ = std::string(recv_buffer_.data(), size) + " reply";
            async_write(socket_, Buffer(send_buffer_), [this, self](bserror_t ec, size_t size) {
                socket_.close();
            });
        } else {
            socket_.close();
        }
    });
}
box/tcp_connection.h
New file
@@ -0,0 +1,64 @@
/*
 * =====================================================================================
 *
 *       Filename:  tcp_connection.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年05月25日 15时34分12秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef TCP_CONNECTION_H373GIL5
#define TCP_CONNECTION_H373GIL5
#include "tcp_common.h"
#include <functional>
#include <memory>
class TcpRequest1 : public std::enable_shared_from_this<TcpRequest1>
{
public:
    static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request)
    {
        std::make_shared<TcpRequest1>(io, addr, std::move(request))->Connect();
    }
    TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request);
private:
    void Connect();
    void Close();
    void SendRequest();
    void ReadReply();
    tcp::socket socket_;
    tcp::endpoint remote_;
    std::string request_;
    std::vector<char> buffer_;
};
class TcpReply1 : public std::enable_shared_from_this<TcpReply1>
{
public:
    static void Create(tcp::socket sock)
    {
        std::make_shared<TcpReply1>(std::move(sock))->Start();
    }
    TcpReply1(tcp::socket sock);
    void Start();
private:
    tcp::socket socket_;
    std::vector<char> recv_buffer_;
    std::string send_buffer_;
};
#endif // end of include guard: TCP_CONNECTION_H373GIL5
box/tcp_proxy.cpp
New file
@@ -0,0 +1,79 @@
/*
 * =====================================================================================
 *
 *       Filename:  tcp_proxy.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年05月19日 15时04分15秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#include "tcp_proxy.h"
#include "defs.h"
#include "shm_socket.h"
#include "tcp_connection.h"
TcpProxy::TcpProxy() :
    run_(false) {}
TcpProxy::~TcpProxy() {}
bool TcpProxy::Start(bhome_shm::SharedMemory &shm)
{
    Stop();
    bool cur = false;
    if (!run_.compare_exchange_strong(cur, true)) { return false; }
    auto &mq = GetCenterInfo(shm)->mq_tcp_proxy_;
    local_.reset(new ShmSocket(mq.offset_, shm, mq.id_));
    auto localProc = [this](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
        auto &dest = head.dest();
        if (dest.ip().empty() || dest.port() == 0) { return; }
        bool r = Send(dest.ip(), dest.port(), msg.content());
        // TODO check send fail.
    };
    local_->Start(1, localProc);
    auto proxyProc = [this]() {
        while (run_) {
            io_context_.run_one_for(std::chrono::milliseconds(100));
        }
    };
    return true;
}
void TcpProxy::Stop()
{
    local_.reset();
    bool cur = true;
    if (run_.compare_exchange_strong(cur, false)) {
        if (worker_.joinable()) {
            worker_.join();
        }
    }
}
bool TcpProxy::Send(const std::string &ip, int port, std::string &&content)
{
    if (content.empty()) { return false; }
    tcp::endpoint dest(ip::address::from_string(ip), port);
    TcpRequest1::Create(io_context_, dest, std::move(content));
    // char tag[sizeof(kBHTcpServerTag)] = {0};
    // int n = read(sock, tag, sizeof(tag));
    // if (n == sizeof(tag) && memcmp(tag, &kBHTcpServerTag, sizeof(tag)) == 0) {
    //     send(sock, content.data(), content.size(), 0);
    //     connections_[addr].io_info_.h_ = [this, sock](int events) { OnReply(sock); };
    //     // success
    // }
}
box/tcp_proxy.h
New file
@@ -0,0 +1,45 @@
/*
 * =====================================================================================
 *
 *       Filename:  tcp_proxy.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年05月19日 15时04分55秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef TCP_PROXY_E1YJ92U5
#define TCP_PROXY_E1YJ92U5
#include "shm.h"
#include "tcp_common.h"
#include <atomic>
#include <thread>
class ShmSocket;
class TcpProxy
{
public:
    TcpProxy();
    ~TcpProxy();
    bool Start(bhome_shm::SharedMemory &shm);
    void Stop();
private:
    bool Send(const std::string &ip, int port, std::string &&content);
    std::unique_ptr<ShmSocket> local_;
    boost::asio::io_context io_context_;
    std::thread worker_;
    std::atomic<bool> run_;
};
#endif // end of include guard: TCP_PROXY_E1YJ92U5
box/tcp_server.cpp
New file
@@ -0,0 +1,70 @@
/*
 * =====================================================================================
 *
 *       Filename:  tcp_server.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年05月19日 15时05分33秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#include "tcp_server.h"
#include "log.h"
#include "tcp_connection.h"
#include <chrono>
using namespace std::chrono_literals;
TcpServer::TcpServer(int port) :
    run_(false), listener_(io_, tcp::endpoint(tcp::v6(), port))
{
    Accept();
}
TcpServer::~TcpServer() { Stop(); }
bool TcpServer::Start()
{
    Stop();
    bool cur = false;
    if (run_.compare_exchange_strong(cur, true)) {
        auto proc = [this]() {
            while (run_) {
                io_.run_one_for(100ms);
            }
        };
        std::thread(proc).swap(worker_);
    }
}
void TcpServer::Stop()
{
    bool cur = true;
    if (run_.compare_exchange_strong(cur, false)) {
        io_.post([this]() {
            listener_.close();
        });
        std::this_thread::sleep_for(1s);
        if (worker_.joinable()) {
            worker_.join();
        }
    }
}
void TcpServer::Accept()
{
    listener_.async_accept([this](bserror_t ec, tcp::socket sock) {
        if (!ec) {
            LOG_INFO() << "server accept client";
            TcpReply1::Create(std::move(sock));
        }
        Accept();
    });
}
box/tcp_server.h
New file
@@ -0,0 +1,40 @@
/*
 * =====================================================================================
 *
 *       Filename:  tcp_server.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年05月19日 15时06分01秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef TCP_SERVER_795VXR94
#define TCP_SERVER_795VXR94
#include "tcp_common.h"
#include <thread>
class TcpServer
{
public:
    explicit TcpServer(int port);
    ~TcpServer();
    bool Start();
    void Stop();
private:
    void Accept();
    std::thread worker_;
    std::atomic<bool> run_;
    boost::asio::io_context io_;
    tcp::acceptor listener_;
};
#endif // end of include guard: TCP_SERVER_795VXR94
utest/tcp_test.cpp
New file
@@ -0,0 +1,51 @@
/*
 * =====================================================================================
 *
 *       Filename:  tcp_test.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年05月24日 09时40分14秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#include "tcp_connection.h"
#include "tcp_server.h"
#include "util.h"
#include <sys/ioctl.h>
//////////////////////
template <class C, class V>
void Erase(C &c, V &&v)
{
    c.erase(std::remove(c.begin(), c.end(), v), c.end());
}
BOOST_AUTO_TEST_CASE(TcpTest)
{
    const std::string bind_addr = "127.0.0.1";
    const std::string connect_addr = "127.0.0.1";
    const uint16_t port = 10000;
    TcpServer server(port);
    server.Start();
    boost::asio::io_context io;
    tcp::endpoint dest(ip::address::from_string(connect_addr), port);
    for (int i = 0; i < 10; ++i) {
        TcpRequest1::Create(io, dest, "client->server " + std::to_string(i));
    }
    io.run();
    printf("TcpTest\n");
}
utest/util.h
@@ -63,7 +63,7 @@
public:
    ~ThreadManager() { WaitAll(); }
    template <class T, class... P>
    void Launch(T t, P... p) { threads_.emplace_back(t, p...); }
    void Launch(T &&t, P &&...p) { threads_.emplace_back(std::forward<decltype(t)>(t), std::forward<decltype(p)>(p)...); }
    void WaitAll()
    {
        for (auto &t : threads_) {