From 5d8aa35858eea622e0e8e4a1f111fd408c483a31 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 25 五月 2021 17:10:16 +0800
Subject: [PATCH] add some tcp code.
---
box/tcp_connection.cpp | 94 +++++++++++
utest/util.h | 2
box/tcp_common.h | 30 +++
box/tcp_proxy.h | 45 +++++
box/tcp_server.h | 40 +++++
utest/tcp_test.cpp | 51 ++++++
box/tcp_proxy.cpp | 79 +++++++++
box/tcp_server.cpp | 70 ++++++++
box/tcp_connection.h | 64 ++++++++
9 files changed, 474 insertions(+), 1 deletions(-)
diff --git a/box/tcp_common.h b/box/tcp_common.h
new file mode 100644
index 0000000..8c8b7ec
--- /dev/null
+++ b/box/tcp_common.h
@@ -0,0 +1,30 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: tcp_common.h
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�05鏈�24鏃� 17鏃�24鍒�33绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#ifndef TCP_COMMON_8S8O7OV
+#define TCP_COMMON_8S8O7OV
+
+#include <boost/asio.hpp>
+#include <boost/uuid/string_generator.hpp>
+#include <boost/uuid/uuid.hpp>
+namespace ip = boost::asio::ip;
+using boost::asio::ip::tcp;
+typedef boost::system::error_code bserror_t;
+
+const boost::uuids::uuid kBHTcpServerTag = boost::uuids::string_generator()("e5bff527-6bf8-ee0d-cd28-b36594acee39");
+
+#endif // end of include guard: TCP_COMMON_8S8O7OV
diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp
new file mode 100644
index 0000000..8968741
--- /dev/null
+++ b/box/tcp_connection.cpp
@@ -0,0 +1,94 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: tcp_connection.cpp
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�05鏈�25鏃� 15鏃�34鍒�03绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#include "tcp_connection.h"
+#include "log.h"
+
+namespace
+{
+template <class C>
+auto Buffer(C &c) { return boost::asio::buffer(c.data(), c.size()); }
+using boost::asio::async_read;
+using boost::asio::async_write;
+
+} // namespace
+
+TcpRequest1::TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request) :
+ socket_(io), remote_(addr), request_(std::move(request)) {}
+void TcpRequest1::Connect()
+{
+ auto self = shared_from_this();
+ socket_.async_connect(remote_, [this, self](bserror_t ec) {
+ if (!ec) {
+ SendRequest();
+ } else {
+ LOG_ERROR() << "connect error " << ec;
+ Close();
+ }
+ });
+}
+void TcpRequest1::Close()
+{
+ socket_.close();
+}
+
+void TcpRequest1::SendRequest()
+{
+ LOG_INFO() << "client sending request " << request_;
+ auto self = shared_from_this();
+ async_write(socket_, Buffer(request_), [this, self](bserror_t ec, size_t) {
+ if (!ec) {
+ ReadReply();
+ } else {
+ Close();
+ }
+ });
+}
+void TcpRequest1::ReadReply()
+{
+ buffer_.resize(1000);
+ auto self = shared_from_this();
+ socket_.async_read_some(Buffer(buffer_), [this, self](bserror_t ec, size_t size) {
+ if (!ec) {
+ printf("reply data: %s\n", buffer_.data());
+ } else {
+ Close();
+ }
+ });
+}
+
+TcpReply1::TcpReply1(tcp::socket sock) :
+ socket_(std::move(sock)) {}
+
+void TcpReply1::Start()
+{
+ LOG_INFO() << "server session reading...";
+ recv_buffer_.resize(1000);
+ auto self(shared_from_this());
+ socket_.async_read_some(Buffer(recv_buffer_), [this, self](bserror_t ec, size_t size) {
+ LOG_INFO() << "server read : " << recv_buffer_.data();
+ // fake reply
+ if (!ec) {
+ send_buffer_ = std::string(recv_buffer_.data(), size) + " reply";
+ async_write(socket_, Buffer(send_buffer_), [this, self](bserror_t ec, size_t size) {
+ socket_.close();
+ });
+ } else {
+ socket_.close();
+ }
+ });
+}
\ No newline at end of file
diff --git a/box/tcp_connection.h b/box/tcp_connection.h
new file mode 100644
index 0000000..b3b8344
--- /dev/null
+++ b/box/tcp_connection.h
@@ -0,0 +1,64 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: tcp_connection.h
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�05鏈�25鏃� 15鏃�34鍒�12绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#ifndef TCP_CONNECTION_H373GIL5
+#define TCP_CONNECTION_H373GIL5
+
+#include "tcp_common.h"
+#include <functional>
+#include <memory>
+
+class TcpRequest1 : public std::enable_shared_from_this<TcpRequest1>
+{
+public:
+ static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request)
+ {
+ std::make_shared<TcpRequest1>(io, addr, std::move(request))->Connect();
+ }
+
+ TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request);
+
+private:
+ void Connect();
+ void Close();
+ void SendRequest();
+ void ReadReply();
+
+ tcp::socket socket_;
+ tcp::endpoint remote_;
+ std::string request_;
+ std::vector<char> buffer_;
+};
+
+class TcpReply1 : public std::enable_shared_from_this<TcpReply1>
+{
+public:
+ static void Create(tcp::socket sock)
+ {
+ std::make_shared<TcpReply1>(std::move(sock))->Start();
+ }
+
+ TcpReply1(tcp::socket sock);
+ void Start();
+
+private:
+ tcp::socket socket_;
+ std::vector<char> recv_buffer_;
+ std::string send_buffer_;
+};
+
+#endif // end of include guard: TCP_CONNECTION_H373GIL5
diff --git a/box/tcp_proxy.cpp b/box/tcp_proxy.cpp
new file mode 100644
index 0000000..2e95a3b
--- /dev/null
+++ b/box/tcp_proxy.cpp
@@ -0,0 +1,79 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: tcp_proxy.cpp
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�05鏈�19鏃� 15鏃�04鍒�15绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#include "tcp_proxy.h"
+#include "defs.h"
+#include "shm_socket.h"
+#include "tcp_connection.h"
+
+TcpProxy::TcpProxy() :
+ run_(false) {}
+
+TcpProxy::~TcpProxy() {}
+
+bool TcpProxy::Start(bhome_shm::SharedMemory &shm)
+{
+ Stop();
+ bool cur = false;
+ if (!run_.compare_exchange_strong(cur, true)) { return false; }
+
+ auto &mq = GetCenterInfo(shm)->mq_tcp_proxy_;
+ local_.reset(new ShmSocket(mq.offset_, shm, mq.id_));
+ auto localProc = [this](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
+ auto &dest = head.dest();
+ if (dest.ip().empty() || dest.port() == 0) { return; }
+ bool r = Send(dest.ip(), dest.port(), msg.content());
+ // TODO check send fail.
+ };
+ local_->Start(1, localProc);
+
+ auto proxyProc = [this]() {
+ while (run_) {
+ io_context_.run_one_for(std::chrono::milliseconds(100));
+ }
+ };
+ return true;
+}
+
+void TcpProxy::Stop()
+{
+ local_.reset();
+
+ bool cur = true;
+ if (run_.compare_exchange_strong(cur, false)) {
+ if (worker_.joinable()) {
+ worker_.join();
+ }
+ }
+}
+
+bool TcpProxy::Send(const std::string &ip, int port, std::string &&content)
+{
+ if (content.empty()) { return false; }
+
+ tcp::endpoint dest(ip::address::from_string(ip), port);
+ TcpRequest1::Create(io_context_, dest, std::move(content));
+
+ // char tag[sizeof(kBHTcpServerTag)] = {0};
+
+ // int n = read(sock, tag, sizeof(tag));
+ // if (n == sizeof(tag) && memcmp(tag, &kBHTcpServerTag, sizeof(tag)) == 0) {
+ // send(sock, content.data(), content.size(), 0);
+ // connections_[addr].io_info_.h_ = [this, sock](int events) { OnReply(sock); };
+ // // success
+ // }
+}
diff --git a/box/tcp_proxy.h b/box/tcp_proxy.h
new file mode 100644
index 0000000..8f2af91
--- /dev/null
+++ b/box/tcp_proxy.h
@@ -0,0 +1,45 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: tcp_proxy.h
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�05鏈�19鏃� 15鏃�04鍒�55绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#ifndef TCP_PROXY_E1YJ92U5
+#define TCP_PROXY_E1YJ92U5
+
+#include "shm.h"
+#include "tcp_common.h"
+#include <atomic>
+#include <thread>
+
+class ShmSocket;
+
+class TcpProxy
+{
+public:
+ TcpProxy();
+ ~TcpProxy();
+ bool Start(bhome_shm::SharedMemory &shm);
+ void Stop();
+
+private:
+ bool Send(const std::string &ip, int port, std::string &&content);
+ std::unique_ptr<ShmSocket> local_;
+
+ boost::asio::io_context io_context_;
+ std::thread worker_;
+ std::atomic<bool> run_;
+};
+
+#endif // end of include guard: TCP_PROXY_E1YJ92U5
diff --git a/box/tcp_server.cpp b/box/tcp_server.cpp
new file mode 100644
index 0000000..e4e229c
--- /dev/null
+++ b/box/tcp_server.cpp
@@ -0,0 +1,70 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: tcp_server.cpp
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�05鏈�19鏃� 15鏃�05鍒�33绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+
+#include "tcp_server.h"
+#include "log.h"
+#include "tcp_connection.h"
+#include <chrono>
+
+using namespace std::chrono_literals;
+
+TcpServer::TcpServer(int port) :
+ run_(false), listener_(io_, tcp::endpoint(tcp::v6(), port))
+{
+ Accept();
+}
+
+TcpServer::~TcpServer() { Stop(); }
+
+bool TcpServer::Start()
+{
+ Stop();
+ bool cur = false;
+ if (run_.compare_exchange_strong(cur, true)) {
+ auto proc = [this]() {
+ while (run_) {
+ io_.run_one_for(100ms);
+ }
+ };
+ std::thread(proc).swap(worker_);
+ }
+}
+void TcpServer::Stop()
+{
+ bool cur = true;
+ if (run_.compare_exchange_strong(cur, false)) {
+ io_.post([this]() {
+ listener_.close();
+ });
+ std::this_thread::sleep_for(1s);
+ if (worker_.joinable()) {
+ worker_.join();
+ }
+ }
+}
+
+void TcpServer::Accept()
+{
+ listener_.async_accept([this](bserror_t ec, tcp::socket sock) {
+ if (!ec) {
+ LOG_INFO() << "server accept client";
+ TcpReply1::Create(std::move(sock));
+ }
+ Accept();
+ });
+}
\ No newline at end of file
diff --git a/box/tcp_server.h b/box/tcp_server.h
new file mode 100644
index 0000000..c06cddc
--- /dev/null
+++ b/box/tcp_server.h
@@ -0,0 +1,40 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: tcp_server.h
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�05鏈�19鏃� 15鏃�06鍒�01绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#ifndef TCP_SERVER_795VXR94
+#define TCP_SERVER_795VXR94
+
+#include "tcp_common.h"
+#include <thread>
+
+class TcpServer
+{
+public:
+ explicit TcpServer(int port);
+ ~TcpServer();
+ bool Start();
+ void Stop();
+
+private:
+ void Accept();
+ std::thread worker_;
+ std::atomic<bool> run_;
+ boost::asio::io_context io_;
+ tcp::acceptor listener_;
+};
+
+#endif // end of include guard: TCP_SERVER_795VXR94
diff --git a/utest/tcp_test.cpp b/utest/tcp_test.cpp
new file mode 100644
index 0000000..ff31c3e
--- /dev/null
+++ b/utest/tcp_test.cpp
@@ -0,0 +1,51 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: tcp_test.cpp
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�05鏈�24鏃� 09鏃�40鍒�14绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+
+#include "tcp_connection.h"
+#include "tcp_server.h"
+#include "util.h"
+#include <sys/ioctl.h>
+
+//////////////////////
+
+template <class C, class V>
+void Erase(C &c, V &&v)
+{
+ c.erase(std::remove(c.begin(), c.end(), v), c.end());
+}
+
+BOOST_AUTO_TEST_CASE(TcpTest)
+{
+ const std::string bind_addr = "127.0.0.1";
+ const std::string connect_addr = "127.0.0.1";
+ const uint16_t port = 10000;
+
+ TcpServer server(port);
+ server.Start();
+
+ boost::asio::io_context io;
+
+ tcp::endpoint dest(ip::address::from_string(connect_addr), port);
+ for (int i = 0; i < 10; ++i) {
+ TcpRequest1::Create(io, dest, "client->server " + std::to_string(i));
+ }
+
+ io.run();
+
+ printf("TcpTest\n");
+}
diff --git a/utest/util.h b/utest/util.h
index 23463e2..53f747f 100644
--- a/utest/util.h
+++ b/utest/util.h
@@ -63,7 +63,7 @@
public:
~ThreadManager() { WaitAll(); }
template <class T, class... P>
- void Launch(T t, P... p) { threads_.emplace_back(t, p...); }
+ void Launch(T &&t, P &&...p) { threads_.emplace_back(std::forward<decltype(t)>(t), std::forward<decltype(p)>(p)...); }
void WaitAll()
{
for (auto &t : threads_) {
--
Gitblit v1.8.0