From 5d8aa35858eea622e0e8e4a1f111fd408c483a31 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 25 五月 2021 17:10:16 +0800
Subject: [PATCH] add some tcp code.

---
 box/tcp_connection.cpp |   94 +++++++++++
 utest/util.h           |    2 
 box/tcp_common.h       |   30 +++
 box/tcp_proxy.h        |   45 +++++
 box/tcp_server.h       |   40 +++++
 utest/tcp_test.cpp     |   51 ++++++
 box/tcp_proxy.cpp      |   79 +++++++++
 box/tcp_server.cpp     |   70 ++++++++
 box/tcp_connection.h   |   64 ++++++++
 9 files changed, 474 insertions(+), 1 deletions(-)

diff --git a/box/tcp_common.h b/box/tcp_common.h
new file mode 100644
index 0000000..8c8b7ec
--- /dev/null
+++ b/box/tcp_common.h
@@ -0,0 +1,30 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  tcp_common.h
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�05鏈�24鏃� 17鏃�24鍒�33绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#ifndef TCP_COMMON_8S8O7OV
+#define TCP_COMMON_8S8O7OV
+
+#include <boost/asio.hpp>
+#include <boost/uuid/string_generator.hpp>
+#include <boost/uuid/uuid.hpp>
+namespace ip = boost::asio::ip;
+using boost::asio::ip::tcp;
+typedef boost::system::error_code bserror_t;
+
+const boost::uuids::uuid kBHTcpServerTag = boost::uuids::string_generator()("e5bff527-6bf8-ee0d-cd28-b36594acee39");
+
+#endif // end of include guard: TCP_COMMON_8S8O7OV
diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp
new file mode 100644
index 0000000..8968741
--- /dev/null
+++ b/box/tcp_connection.cpp
@@ -0,0 +1,94 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  tcp_connection.cpp
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�05鏈�25鏃� 15鏃�34鍒�03绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#include "tcp_connection.h"
+#include "log.h"
+
+namespace
+{
+template <class C>
+auto Buffer(C &c) { return boost::asio::buffer(c.data(), c.size()); }
+using boost::asio::async_read;
+using boost::asio::async_write;
+
+} // namespace
+
+TcpRequest1::TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request) :
+    socket_(io), remote_(addr), request_(std::move(request)) {}
+void TcpRequest1::Connect()
+{
+	auto self = shared_from_this();
+	socket_.async_connect(remote_, [this, self](bserror_t ec) {
+		if (!ec) {
+			SendRequest();
+		} else {
+			LOG_ERROR() << "connect error " << ec;
+			Close();
+		}
+	});
+}
+void TcpRequest1::Close()
+{
+	socket_.close();
+}
+
+void TcpRequest1::SendRequest()
+{
+	LOG_INFO() << "client sending request " << request_;
+	auto self = shared_from_this();
+	async_write(socket_, Buffer(request_), [this, self](bserror_t ec, size_t) {
+		if (!ec) {
+			ReadReply();
+		} else {
+			Close();
+		}
+	});
+}
+void TcpRequest1::ReadReply()
+{
+	buffer_.resize(1000);
+	auto self = shared_from_this();
+	socket_.async_read_some(Buffer(buffer_), [this, self](bserror_t ec, size_t size) {
+		if (!ec) {
+			printf("reply data: %s\n", buffer_.data());
+		} else {
+			Close();
+		}
+	});
+}
+
+TcpReply1::TcpReply1(tcp::socket sock) :
+    socket_(std::move(sock)) {}
+
+void TcpReply1::Start()
+{
+	LOG_INFO() << "server session reading...";
+	recv_buffer_.resize(1000);
+	auto self(shared_from_this());
+	socket_.async_read_some(Buffer(recv_buffer_), [this, self](bserror_t ec, size_t size) {
+		LOG_INFO() << "server read : " << recv_buffer_.data();
+		// fake reply
+		if (!ec) {
+			send_buffer_ = std::string(recv_buffer_.data(), size) + " reply";
+			async_write(socket_, Buffer(send_buffer_), [this, self](bserror_t ec, size_t size) {
+				socket_.close();
+			});
+		} else {
+			socket_.close();
+		}
+	});
+}
\ No newline at end of file
diff --git a/box/tcp_connection.h b/box/tcp_connection.h
new file mode 100644
index 0000000..b3b8344
--- /dev/null
+++ b/box/tcp_connection.h
@@ -0,0 +1,64 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  tcp_connection.h
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�05鏈�25鏃� 15鏃�34鍒�12绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#ifndef TCP_CONNECTION_H373GIL5
+#define TCP_CONNECTION_H373GIL5
+
+#include "tcp_common.h"
+#include <functional>
+#include <memory>
+
+class TcpRequest1 : public std::enable_shared_from_this<TcpRequest1>
+{
+public:
+	static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request)
+	{
+		std::make_shared<TcpRequest1>(io, addr, std::move(request))->Connect();
+	}
+
+	TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request);
+
+private:
+	void Connect();
+	void Close();
+	void SendRequest();
+	void ReadReply();
+
+	tcp::socket socket_;
+	tcp::endpoint remote_;
+	std::string request_;
+	std::vector<char> buffer_;
+};
+
+class TcpReply1 : public std::enable_shared_from_this<TcpReply1>
+{
+public:
+	static void Create(tcp::socket sock)
+	{
+		std::make_shared<TcpReply1>(std::move(sock))->Start();
+	}
+
+	TcpReply1(tcp::socket sock);
+	void Start();
+
+private:
+	tcp::socket socket_;
+	std::vector<char> recv_buffer_;
+	std::string send_buffer_;
+};
+
+#endif // end of include guard: TCP_CONNECTION_H373GIL5
diff --git a/box/tcp_proxy.cpp b/box/tcp_proxy.cpp
new file mode 100644
index 0000000..2e95a3b
--- /dev/null
+++ b/box/tcp_proxy.cpp
@@ -0,0 +1,79 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  tcp_proxy.cpp
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�05鏈�19鏃� 15鏃�04鍒�15绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#include "tcp_proxy.h"
+#include "defs.h"
+#include "shm_socket.h"
+#include "tcp_connection.h"
+
+TcpProxy::TcpProxy() :
+    run_(false) {}
+
+TcpProxy::~TcpProxy() {}
+
+bool TcpProxy::Start(bhome_shm::SharedMemory &shm)
+{
+	Stop();
+	bool cur = false;
+	if (!run_.compare_exchange_strong(cur, true)) { return false; }
+
+	auto &mq = GetCenterInfo(shm)->mq_tcp_proxy_;
+	local_.reset(new ShmSocket(mq.offset_, shm, mq.id_));
+	auto localProc = [this](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
+		auto &dest = head.dest();
+		if (dest.ip().empty() || dest.port() == 0) { return; }
+		bool r = Send(dest.ip(), dest.port(), msg.content());
+		// TODO check send fail.
+	};
+	local_->Start(1, localProc);
+
+	auto proxyProc = [this]() {
+		while (run_) {
+			io_context_.run_one_for(std::chrono::milliseconds(100));
+		}
+	};
+	return true;
+}
+
+void TcpProxy::Stop()
+{
+	local_.reset();
+
+	bool cur = true;
+	if (run_.compare_exchange_strong(cur, false)) {
+		if (worker_.joinable()) {
+			worker_.join();
+		}
+	}
+}
+
+bool TcpProxy::Send(const std::string &ip, int port, std::string &&content)
+{
+	if (content.empty()) { return false; }
+
+	tcp::endpoint dest(ip::address::from_string(ip), port);
+	TcpRequest1::Create(io_context_, dest, std::move(content));
+
+	// char tag[sizeof(kBHTcpServerTag)] = {0};
+
+	// int n = read(sock, tag, sizeof(tag));
+	// if (n == sizeof(tag) && memcmp(tag, &kBHTcpServerTag, sizeof(tag)) == 0) {
+	// 	send(sock, content.data(), content.size(), 0);
+	// 	connections_[addr].io_info_.h_ = [this, sock](int events) { OnReply(sock); };
+	// 	// success
+	// }
+}
diff --git a/box/tcp_proxy.h b/box/tcp_proxy.h
new file mode 100644
index 0000000..8f2af91
--- /dev/null
+++ b/box/tcp_proxy.h
@@ -0,0 +1,45 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  tcp_proxy.h
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�05鏈�19鏃� 15鏃�04鍒�55绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#ifndef TCP_PROXY_E1YJ92U5
+#define TCP_PROXY_E1YJ92U5
+
+#include "shm.h"
+#include "tcp_common.h"
+#include <atomic>
+#include <thread>
+
+class ShmSocket;
+
+class TcpProxy
+{
+public:
+	TcpProxy();
+	~TcpProxy();
+	bool Start(bhome_shm::SharedMemory &shm);
+	void Stop();
+
+private:
+	bool Send(const std::string &ip, int port, std::string &&content);
+	std::unique_ptr<ShmSocket> local_;
+
+	boost::asio::io_context io_context_;
+	std::thread worker_;
+	std::atomic<bool> run_;
+};
+
+#endif // end of include guard: TCP_PROXY_E1YJ92U5
diff --git a/box/tcp_server.cpp b/box/tcp_server.cpp
new file mode 100644
index 0000000..e4e229c
--- /dev/null
+++ b/box/tcp_server.cpp
@@ -0,0 +1,70 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  tcp_server.cpp
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�05鏈�19鏃� 15鏃�05鍒�33绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+
+#include "tcp_server.h"
+#include "log.h"
+#include "tcp_connection.h"
+#include <chrono>
+
+using namespace std::chrono_literals;
+
+TcpServer::TcpServer(int port) :
+    run_(false), listener_(io_, tcp::endpoint(tcp::v6(), port))
+{
+	Accept();
+}
+
+TcpServer::~TcpServer() { Stop(); }
+
+bool TcpServer::Start()
+{
+	Stop();
+	bool cur = false;
+	if (run_.compare_exchange_strong(cur, true)) {
+		auto proc = [this]() {
+			while (run_) {
+				io_.run_one_for(100ms);
+			}
+		};
+		std::thread(proc).swap(worker_);
+	}
+}
+void TcpServer::Stop()
+{
+	bool cur = true;
+	if (run_.compare_exchange_strong(cur, false)) {
+		io_.post([this]() {
+			listener_.close();
+		});
+		std::this_thread::sleep_for(1s);
+		if (worker_.joinable()) {
+			worker_.join();
+		}
+	}
+}
+
+void TcpServer::Accept()
+{
+	listener_.async_accept([this](bserror_t ec, tcp::socket sock) {
+		if (!ec) {
+			LOG_INFO() << "server accept client";
+			TcpReply1::Create(std::move(sock));
+		}
+		Accept();
+	});
+}
\ No newline at end of file
diff --git a/box/tcp_server.h b/box/tcp_server.h
new file mode 100644
index 0000000..c06cddc
--- /dev/null
+++ b/box/tcp_server.h
@@ -0,0 +1,40 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  tcp_server.h
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�05鏈�19鏃� 15鏃�06鍒�01绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#ifndef TCP_SERVER_795VXR94
+#define TCP_SERVER_795VXR94
+
+#include "tcp_common.h"
+#include <thread>
+
+class TcpServer
+{
+public:
+	explicit TcpServer(int port);
+	~TcpServer();
+	bool Start();
+	void Stop();
+
+private:
+	void Accept();
+	std::thread worker_;
+	std::atomic<bool> run_;
+	boost::asio::io_context io_;
+	tcp::acceptor listener_;
+};
+
+#endif // end of include guard: TCP_SERVER_795VXR94
diff --git a/utest/tcp_test.cpp b/utest/tcp_test.cpp
new file mode 100644
index 0000000..ff31c3e
--- /dev/null
+++ b/utest/tcp_test.cpp
@@ -0,0 +1,51 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  tcp_test.cpp
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�05鏈�24鏃� 09鏃�40鍒�14绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+
+#include "tcp_connection.h"
+#include "tcp_server.h"
+#include "util.h"
+#include <sys/ioctl.h>
+
+//////////////////////
+
+template <class C, class V>
+void Erase(C &c, V &&v)
+{
+	c.erase(std::remove(c.begin(), c.end(), v), c.end());
+}
+
+BOOST_AUTO_TEST_CASE(TcpTest)
+{
+	const std::string bind_addr = "127.0.0.1";
+	const std::string connect_addr = "127.0.0.1";
+	const uint16_t port = 10000;
+
+	TcpServer server(port);
+	server.Start();
+
+	boost::asio::io_context io;
+
+	tcp::endpoint dest(ip::address::from_string(connect_addr), port);
+	for (int i = 0; i < 10; ++i) {
+		TcpRequest1::Create(io, dest, "client->server " + std::to_string(i));
+	}
+
+	io.run();
+
+	printf("TcpTest\n");
+}
diff --git a/utest/util.h b/utest/util.h
index 23463e2..53f747f 100644
--- a/utest/util.h
+++ b/utest/util.h
@@ -63,7 +63,7 @@
 public:
 	~ThreadManager() { WaitAll(); }
 	template <class T, class... P>
-	void Launch(T t, P... p) { threads_.emplace_back(t, p...); }
+	void Launch(T &&t, P &&...p) { threads_.emplace_back(std::forward<decltype(t)>(t), std::forward<decltype(p)>(p)...); }
 	void WaitAll()
 	{
 		for (auto &t : threads_) {

--
Gitblit v1.8.0