From 5d8aa35858eea622e0e8e4a1f111fd408c483a31 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 25 五月 2021 17:10:16 +0800 Subject: [PATCH] add some tcp code. --- box/tcp_connection.cpp | 94 +++++++++++ utest/util.h | 2 box/tcp_common.h | 30 +++ box/tcp_proxy.h | 45 +++++ box/tcp_server.h | 40 +++++ utest/tcp_test.cpp | 51 ++++++ box/tcp_proxy.cpp | 79 +++++++++ box/tcp_server.cpp | 70 ++++++++ box/tcp_connection.h | 64 ++++++++ 9 files changed, 474 insertions(+), 1 deletions(-) diff --git a/box/tcp_common.h b/box/tcp_common.h new file mode 100644 index 0000000..8c8b7ec --- /dev/null +++ b/box/tcp_common.h @@ -0,0 +1,30 @@ +/* + * ===================================================================================== + * + * Filename: tcp_common.h + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�05鏈�24鏃� 17鏃�24鍒�33绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#ifndef TCP_COMMON_8S8O7OV +#define TCP_COMMON_8S8O7OV + +#include <boost/asio.hpp> +#include <boost/uuid/string_generator.hpp> +#include <boost/uuid/uuid.hpp> +namespace ip = boost::asio::ip; +using boost::asio::ip::tcp; +typedef boost::system::error_code bserror_t; + +const boost::uuids::uuid kBHTcpServerTag = boost::uuids::string_generator()("e5bff527-6bf8-ee0d-cd28-b36594acee39"); + +#endif // end of include guard: TCP_COMMON_8S8O7OV diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp new file mode 100644 index 0000000..8968741 --- /dev/null +++ b/box/tcp_connection.cpp @@ -0,0 +1,94 @@ +/* + * ===================================================================================== + * + * Filename: tcp_connection.cpp + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�05鏈�25鏃� 15鏃�34鍒�03绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#include "tcp_connection.h" +#include "log.h" + +namespace +{ +template <class C> +auto Buffer(C &c) { return boost::asio::buffer(c.data(), c.size()); } +using boost::asio::async_read; +using boost::asio::async_write; + +} // namespace + +TcpRequest1::TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request) : + socket_(io), remote_(addr), request_(std::move(request)) {} +void TcpRequest1::Connect() +{ + auto self = shared_from_this(); + socket_.async_connect(remote_, [this, self](bserror_t ec) { + if (!ec) { + SendRequest(); + } else { + LOG_ERROR() << "connect error " << ec; + Close(); + } + }); +} +void TcpRequest1::Close() +{ + socket_.close(); +} + +void TcpRequest1::SendRequest() +{ + LOG_INFO() << "client sending request " << request_; + auto self = shared_from_this(); + async_write(socket_, Buffer(request_), [this, self](bserror_t ec, size_t) { + if (!ec) { + ReadReply(); + } else { + Close(); + } + }); +} +void TcpRequest1::ReadReply() +{ + buffer_.resize(1000); + auto self = shared_from_this(); + socket_.async_read_some(Buffer(buffer_), [this, self](bserror_t ec, size_t size) { + if (!ec) { + printf("reply data: %s\n", buffer_.data()); + } else { + Close(); + } + }); +} + +TcpReply1::TcpReply1(tcp::socket sock) : + socket_(std::move(sock)) {} + +void TcpReply1::Start() +{ + LOG_INFO() << "server session reading..."; + recv_buffer_.resize(1000); + auto self(shared_from_this()); + socket_.async_read_some(Buffer(recv_buffer_), [this, self](bserror_t ec, size_t size) { + LOG_INFO() << "server read : " << recv_buffer_.data(); + // fake reply + if (!ec) { + send_buffer_ = std::string(recv_buffer_.data(), size) + " reply"; + async_write(socket_, Buffer(send_buffer_), [this, self](bserror_t ec, size_t size) { + socket_.close(); + }); + } else { + socket_.close(); + } + }); +} \ No newline at end of file diff --git a/box/tcp_connection.h b/box/tcp_connection.h new file mode 100644 index 0000000..b3b8344 --- /dev/null +++ b/box/tcp_connection.h @@ -0,0 +1,64 @@ +/* + * ===================================================================================== + * + * Filename: tcp_connection.h + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�05鏈�25鏃� 15鏃�34鍒�12绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#ifndef TCP_CONNECTION_H373GIL5 +#define TCP_CONNECTION_H373GIL5 + +#include "tcp_common.h" +#include <functional> +#include <memory> + +class TcpRequest1 : public std::enable_shared_from_this<TcpRequest1> +{ +public: + static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request) + { + std::make_shared<TcpRequest1>(io, addr, std::move(request))->Connect(); + } + + TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request); + +private: + void Connect(); + void Close(); + void SendRequest(); + void ReadReply(); + + tcp::socket socket_; + tcp::endpoint remote_; + std::string request_; + std::vector<char> buffer_; +}; + +class TcpReply1 : public std::enable_shared_from_this<TcpReply1> +{ +public: + static void Create(tcp::socket sock) + { + std::make_shared<TcpReply1>(std::move(sock))->Start(); + } + + TcpReply1(tcp::socket sock); + void Start(); + +private: + tcp::socket socket_; + std::vector<char> recv_buffer_; + std::string send_buffer_; +}; + +#endif // end of include guard: TCP_CONNECTION_H373GIL5 diff --git a/box/tcp_proxy.cpp b/box/tcp_proxy.cpp new file mode 100644 index 0000000..2e95a3b --- /dev/null +++ b/box/tcp_proxy.cpp @@ -0,0 +1,79 @@ +/* + * ===================================================================================== + * + * Filename: tcp_proxy.cpp + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�05鏈�19鏃� 15鏃�04鍒�15绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#include "tcp_proxy.h" +#include "defs.h" +#include "shm_socket.h" +#include "tcp_connection.h" + +TcpProxy::TcpProxy() : + run_(false) {} + +TcpProxy::~TcpProxy() {} + +bool TcpProxy::Start(bhome_shm::SharedMemory &shm) +{ + Stop(); + bool cur = false; + if (!run_.compare_exchange_strong(cur, true)) { return false; } + + auto &mq = GetCenterInfo(shm)->mq_tcp_proxy_; + local_.reset(new ShmSocket(mq.offset_, shm, mq.id_)); + auto localProc = [this](ShmSocket &sock, MsgI &msg, BHMsgHead &head) { + auto &dest = head.dest(); + if (dest.ip().empty() || dest.port() == 0) { return; } + bool r = Send(dest.ip(), dest.port(), msg.content()); + // TODO check send fail. + }; + local_->Start(1, localProc); + + auto proxyProc = [this]() { + while (run_) { + io_context_.run_one_for(std::chrono::milliseconds(100)); + } + }; + return true; +} + +void TcpProxy::Stop() +{ + local_.reset(); + + bool cur = true; + if (run_.compare_exchange_strong(cur, false)) { + if (worker_.joinable()) { + worker_.join(); + } + } +} + +bool TcpProxy::Send(const std::string &ip, int port, std::string &&content) +{ + if (content.empty()) { return false; } + + tcp::endpoint dest(ip::address::from_string(ip), port); + TcpRequest1::Create(io_context_, dest, std::move(content)); + + // char tag[sizeof(kBHTcpServerTag)] = {0}; + + // int n = read(sock, tag, sizeof(tag)); + // if (n == sizeof(tag) && memcmp(tag, &kBHTcpServerTag, sizeof(tag)) == 0) { + // send(sock, content.data(), content.size(), 0); + // connections_[addr].io_info_.h_ = [this, sock](int events) { OnReply(sock); }; + // // success + // } +} diff --git a/box/tcp_proxy.h b/box/tcp_proxy.h new file mode 100644 index 0000000..8f2af91 --- /dev/null +++ b/box/tcp_proxy.h @@ -0,0 +1,45 @@ +/* + * ===================================================================================== + * + * Filename: tcp_proxy.h + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�05鏈�19鏃� 15鏃�04鍒�55绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#ifndef TCP_PROXY_E1YJ92U5 +#define TCP_PROXY_E1YJ92U5 + +#include "shm.h" +#include "tcp_common.h" +#include <atomic> +#include <thread> + +class ShmSocket; + +class TcpProxy +{ +public: + TcpProxy(); + ~TcpProxy(); + bool Start(bhome_shm::SharedMemory &shm); + void Stop(); + +private: + bool Send(const std::string &ip, int port, std::string &&content); + std::unique_ptr<ShmSocket> local_; + + boost::asio::io_context io_context_; + std::thread worker_; + std::atomic<bool> run_; +}; + +#endif // end of include guard: TCP_PROXY_E1YJ92U5 diff --git a/box/tcp_server.cpp b/box/tcp_server.cpp new file mode 100644 index 0000000..e4e229c --- /dev/null +++ b/box/tcp_server.cpp @@ -0,0 +1,70 @@ +/* + * ===================================================================================== + * + * Filename: tcp_server.cpp + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�05鏈�19鏃� 15鏃�05鍒�33绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ + +#include "tcp_server.h" +#include "log.h" +#include "tcp_connection.h" +#include <chrono> + +using namespace std::chrono_literals; + +TcpServer::TcpServer(int port) : + run_(false), listener_(io_, tcp::endpoint(tcp::v6(), port)) +{ + Accept(); +} + +TcpServer::~TcpServer() { Stop(); } + +bool TcpServer::Start() +{ + Stop(); + bool cur = false; + if (run_.compare_exchange_strong(cur, true)) { + auto proc = [this]() { + while (run_) { + io_.run_one_for(100ms); + } + }; + std::thread(proc).swap(worker_); + } +} +void TcpServer::Stop() +{ + bool cur = true; + if (run_.compare_exchange_strong(cur, false)) { + io_.post([this]() { + listener_.close(); + }); + std::this_thread::sleep_for(1s); + if (worker_.joinable()) { + worker_.join(); + } + } +} + +void TcpServer::Accept() +{ + listener_.async_accept([this](bserror_t ec, tcp::socket sock) { + if (!ec) { + LOG_INFO() << "server accept client"; + TcpReply1::Create(std::move(sock)); + } + Accept(); + }); +} \ No newline at end of file diff --git a/box/tcp_server.h b/box/tcp_server.h new file mode 100644 index 0000000..c06cddc --- /dev/null +++ b/box/tcp_server.h @@ -0,0 +1,40 @@ +/* + * ===================================================================================== + * + * Filename: tcp_server.h + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�05鏈�19鏃� 15鏃�06鍒�01绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#ifndef TCP_SERVER_795VXR94 +#define TCP_SERVER_795VXR94 + +#include "tcp_common.h" +#include <thread> + +class TcpServer +{ +public: + explicit TcpServer(int port); + ~TcpServer(); + bool Start(); + void Stop(); + +private: + void Accept(); + std::thread worker_; + std::atomic<bool> run_; + boost::asio::io_context io_; + tcp::acceptor listener_; +}; + +#endif // end of include guard: TCP_SERVER_795VXR94 diff --git a/utest/tcp_test.cpp b/utest/tcp_test.cpp new file mode 100644 index 0000000..ff31c3e --- /dev/null +++ b/utest/tcp_test.cpp @@ -0,0 +1,51 @@ +/* + * ===================================================================================== + * + * Filename: tcp_test.cpp + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�05鏈�24鏃� 09鏃�40鍒�14绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ + +#include "tcp_connection.h" +#include "tcp_server.h" +#include "util.h" +#include <sys/ioctl.h> + +////////////////////// + +template <class C, class V> +void Erase(C &c, V &&v) +{ + c.erase(std::remove(c.begin(), c.end(), v), c.end()); +} + +BOOST_AUTO_TEST_CASE(TcpTest) +{ + const std::string bind_addr = "127.0.0.1"; + const std::string connect_addr = "127.0.0.1"; + const uint16_t port = 10000; + + TcpServer server(port); + server.Start(); + + boost::asio::io_context io; + + tcp::endpoint dest(ip::address::from_string(connect_addr), port); + for (int i = 0; i < 10; ++i) { + TcpRequest1::Create(io, dest, "client->server " + std::to_string(i)); + } + + io.run(); + + printf("TcpTest\n"); +} diff --git a/utest/util.h b/utest/util.h index 23463e2..53f747f 100644 --- a/utest/util.h +++ b/utest/util.h @@ -63,7 +63,7 @@ public: ~ThreadManager() { WaitAll(); } template <class T, class... P> - void Launch(T t, P... p) { threads_.emplace_back(t, p...); } + void Launch(T &&t, P &&...p) { threads_.emplace_back(std::forward<decltype(t)>(t), std::forward<decltype(p)>(p)...); } void WaitAll() { for (auto &t : threads_) { -- Gitblit v1.8.0