From 026bbfaf2b5d73a26b8e2fa49158883ef64c211b Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 27 五月 2021 13:51:26 +0800
Subject: [PATCH] tcp server call center to send proxy requests.
---
box/tcp_connection.cpp | 228 +++++++++++++++----
box/center.cpp | 4
src/msg.h | 47 +++
box/io_service.h | 42 +++
box/tcp_proxy.cpp | 20 -
box/center.h | 2
box/node_center.cpp | 23 ++
box/io_service.cpp | 48 ++++
src/shm_socket.cpp | 58 +++-
src/shm_socket.h | 20 -
box/tcp_proxy.h | 2
box/tcp_server.h | 16
utest/tcp_test.cpp | 29 ++
box/node_center.h | 1
box/tcp_server.cpp | 32 --
box/tcp_connection.h | 37 ++-
16 files changed, 449 insertions(+), 160 deletions(-)
diff --git a/box/center.cpp b/box/center.cpp
index c3a03e3..8d24315 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -18,6 +18,7 @@
#include "center.h"
#include "center_topic_node.h"
#include "node_center.h"
+#include "tcp_server.h"
#include <chrono>
using namespace std::chrono;
@@ -175,6 +176,7 @@
}
topic_node_.reset(new CenterTopicNode(center_ptr, shm));
+ tcp_server_.reset(new TcpServer(kBHCenterPort, center_ptr));
}
BHCenter::~BHCenter() { Stop(); }
@@ -186,11 +188,13 @@
sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_);
}
topic_node_->Start();
+ tcp_server_->Start();
return true;
}
bool BHCenter::Stop()
{
+ tcp_server_->Stop();
topic_node_->Stop();
for (auto &kv : sockets_) {
kv.second->Stop();
diff --git a/box/center.h b/box/center.h
index 6610277..c4aa1ac 100644
--- a/box/center.h
+++ b/box/center.h
@@ -23,6 +23,7 @@
#include <map>
#include <memory>
class CenterTopicNode;
+class TcpServer;
class BHCenter
{
@@ -53,6 +54,7 @@
std::map<std::string, std::shared_ptr<ShmSocket>> sockets_;
std::unique_ptr<CenterTopicNode> topic_node_;
+ std::unique_ptr<TcpServer> tcp_server_;
};
#endif // end of include guard: CENTER_TM9OUQTG
diff --git a/box/io_service.cpp b/box/io_service.cpp
new file mode 100644
index 0000000..1d531e0
--- /dev/null
+++ b/box/io_service.cpp
@@ -0,0 +1,48 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: io_service.cpp
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�05鏈�27鏃� 13鏃�25鍒�18绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#include "io_service.h"
+#include <chrono>
+using namespace std::chrono_literals;
+
+bool IoService::Start()
+{
+ Stop();
+ bool cur = false;
+ if (!run_.compare_exchange_strong(cur, true)) {
+ return false;
+ }
+
+ auto proc = [this]() {
+ while (run_) {
+ io_.run_one_for(100ms);
+ }
+ OnStop();
+ };
+ std::thread(proc).swap(worker_);
+ return true;
+}
+
+void IoService::Stop()
+{
+ bool cur = true;
+ if (run_.compare_exchange_strong(cur, false)) {
+ if (worker_.joinable()) {
+ worker_.join();
+ }
+ }
+}
diff --git a/box/io_service.h b/box/io_service.h
new file mode 100644
index 0000000..000facc
--- /dev/null
+++ b/box/io_service.h
@@ -0,0 +1,42 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: io_service.h
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�05鏈�27鏃� 13鏃�25鍒�37绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#ifndef IO_SERVICE_ODKKJG3D
+#define IO_SERVICE_ODKKJG3D
+
+#include <boost/asio.hpp>
+#include <thread>
+
+class IoService
+{
+public:
+ IoService() :
+ run_(false) {}
+ bool Start();
+ void Stop();
+
+ typedef boost::asio::io_context io_service_t;
+ io_service_t &io() { return io_; }
+
+private:
+ virtual void OnStop() {}
+ io_service_t io_;
+ std::thread worker_;
+ std::atomic<bool> run_;
+};
+
+#endif // end of include guard: IO_SERVICE_ODKKJG3D
diff --git a/box/node_center.cpp b/box/node_center.cpp
index cbaef0e..ff199b2 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -209,6 +209,29 @@
RecordMsg(msg);
return socket.Send(dest, msg);
}
+bool NodeCenter::ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
+{
+ auto ssn = dest.id_ - (dest.id_ % 10);
+ LOG_DEBUG() << "prox ssn " << ssn;
+ auto pos = nodes_.find(ssn);
+ if (pos == nodes_.end()) {
+ LOG_ERROR() << "proxy msg, ssn not found.";
+ return false;
+ }
+ auto &node = pos->second;
+ if (!Valid(*node)) { return false; }
+
+ ShmSocket &sender(DefaultSender(node->shm_));
+ auto route = head.add_route();
+ route->set_mq_id(sender.id());
+ route->set_abs_addr(sender.AbsAddr());
+
+ ShmMsg msg(node->shm_);
+ if (!msg.Make(head, body_content)) { return false; }
+ DEFER1(msg.Release(););
+ RecordMsg(msg);
+ return sender.Send(dest, msg, head.msg_id(), std::move(cb));
+}
void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val)
{
diff --git a/box/node_center.h b/box/node_center.h
index caaf054..1c79809 100644
--- a/box/node_center.h
+++ b/box/node_center.h
@@ -122,6 +122,7 @@
void RecordMsg(const MsgI &msg);
bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg);
bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg);
+ bool ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
void OnAlloc(ShmSocket &socket, const int64_t val);
void OnFree(ShmSocket &socket, const int64_t val);
bool OnCommand(ShmSocket &socket, const int64_t val);
diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp
index 8968741..22162e5 100644
--- a/box/tcp_connection.cpp
+++ b/box/tcp_connection.cpp
@@ -17,78 +17,198 @@
*/
#include "tcp_connection.h"
#include "log.h"
+#include "msg.h"
+#include "node_center.h"
+#include "shm_socket.h"
namespace
{
template <class C>
-auto Buffer(C &c) { return boost::asio::buffer(c.data(), c.size()); }
+auto Buffer(C &c, size_t offset = 0) { return boost::asio::buffer(c.data() + offset, c.size() - offset); }
using boost::asio::async_read;
using boost::asio::async_write;
+typedef std::function<void()> VoidHandler;
+typedef std::function<void(size_t)> SizeHandler;
+
+template <class T, class... Param>
+auto TcpCallback(T &conn, std::function<void(Param...)> const &func)
+{
+ auto self(conn.shared_from_this());
+ return [self, func](bserror_t ec, Param... size) {
+ if (!ec) {
+ func(size...);
+ } else {
+ self->OnError(ec);
+ }
+ };
+}
+
+template <class T>
+auto TcpCB(T &conn, VoidHandler const &func) { return TcpCallback(conn, func); }
+
+template <class T>
+auto TcpCBSize(T &conn, SizeHandler const &func) { return TcpCallback(conn, func); }
+
+template <class T>
+auto TcpCBSize(T &conn, VoidHandler const &func)
+{
+ return TcpCBSize(conn, [func](size_t) { func(); });
+}
+
+bool CheckData(std::vector<char> &buffer, const uint32_t len, BHMsgHead &head, std::string &body_content)
+{
+ const char *p = buffer.data();
+ LOG_DEBUG() << "msg len " << len;
+ if (4 > len) { return false; }
+ uint32_t head_len = Get32(p);
+ LOG_DEBUG() << "head_len " << head_len;
+ if (head_len > 1024 * 4) {
+ throw std::runtime_error("unexpected tcp reply data.");
+ }
+ auto before_body = 4 + head_len + 4;
+ if (before_body > len) {
+ if (before_body > buffer.size()) {
+ buffer.resize(before_body);
+ }
+ return false;
+ }
+ if (!head.ParseFromArray(p + 4, head_len)) {
+ throw std::runtime_error("tcp recv invalid reply head.");
+ }
+ uint32_t body_len = Get32(p + 4 + head_len);
+ buffer.resize(before_body + body_len);
+ if (buffer.size() > len) { return false; }
+ body_content.assign(p + before_body, body_len);
+ return true;
+}
+
} // namespace
-TcpRequest1::TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request) :
- socket_(io), remote_(addr), request_(std::move(request)) {}
-void TcpRequest1::Connect()
+/// request -----------------------------------------------------------
+
+void TcpRequest1::OnError(bserror_t ec)
{
- auto self = shared_from_this();
- socket_.async_connect(remote_, [this, self](bserror_t ec) {
- if (!ec) {
- SendRequest();
- } else {
- LOG_ERROR() << "connect error " << ec;
- Close();
- }
- });
+ LOG_ERROR() << "tcp client error: " << ec;
+ Close();
+}
+
+void TcpRequest1::Start()
+{
+ auto readReply = [this]() {
+ recv_buffer_.resize(1000);
+ recv_len_ = 0;
+ socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
+ };
+ auto request = [this, readReply]() { async_write(socket_, Buffer(request_), TcpCBSize(*this, readReply)); };
+
+ socket_.async_connect(remote_, TcpCB(*this, request));
}
void TcpRequest1::Close()
{
+ LOG_DEBUG() << "client close";
+ socket_.close();
+}
+void TcpRequest1::OnRead(size_t size)
+{
+ LOG_DEBUG() << "reply data: " << recv_buffer_.data() + recv_len_;
+
+ recv_len_ += size;
+ BHMsgHead head;
+ std::string body_content;
+ bool recv_done = false;
+ try {
+ recv_done = CheckData(recv_buffer_, recv_len_, head, body_content);
+ } catch (std::exception &e) {
+ LOG_ERROR() << e.what();
+ Close();
+ return;
+ }
+
+ if (recv_done) {
+ // just pass to client, no check, client will check it anyway.
+ LOG_DEBUG() << "route size: " << head.route_size();
+ if (head.route_size() < 1) { return; }
+ auto &back = head.route(head.route_size() - 1);
+ MQInfo dest = {back.mq_id(), back.abs_addr()};
+ head.mutable_route()->RemoveLast();
+
+ LOG_DEBUG() << "tcp got reply, pass to shm: " << dest.id_ << ", " << dest.offset_;
+ MsgRequestTopicReply reply;
+ if (reply.ParseFromString(body_content)) {
+ LOG_DEBUG() << "err msg: " << reply.errmsg().errstring();
+ LOG_DEBUG() << "content : " << reply.data();
+ }
+ Close();
+ return;
+ shm_socket_.Send(dest, std::string(recv_buffer_.data(), recv_buffer_.size()));
+ } else { // read again
+ LOG_DEBUG() << "not complete, read again " << recv_buffer_.size();
+ socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
+ }
+}
+
+/// reply --------------------------------------------------------------
+
+void TcpReply1::OnError(bserror_t ec) { Close(); }
+void TcpReply1::Close()
+{
+ LOG_DEBUG() << "server close.";
socket_.close();
}
-void TcpRequest1::SendRequest()
-{
- LOG_INFO() << "client sending request " << request_;
- auto self = shared_from_this();
- async_write(socket_, Buffer(request_), [this, self](bserror_t ec, size_t) {
- if (!ec) {
- ReadReply();
- } else {
- Close();
- }
- });
-}
-void TcpRequest1::ReadReply()
-{
- buffer_.resize(1000);
- auto self = shared_from_this();
- socket_.async_read_some(Buffer(buffer_), [this, self](bserror_t ec, size_t size) {
- if (!ec) {
- printf("reply data: %s\n", buffer_.data());
- } else {
- Close();
- }
- });
-}
-
-TcpReply1::TcpReply1(tcp::socket sock) :
- socket_(std::move(sock)) {}
-
void TcpReply1::Start()
{
- LOG_INFO() << "server session reading...";
recv_buffer_.resize(1000);
- auto self(shared_from_this());
- socket_.async_read_some(Buffer(recv_buffer_), [this, self](bserror_t ec, size_t size) {
- LOG_INFO() << "server read : " << recv_buffer_.data();
- // fake reply
- if (!ec) {
- send_buffer_ = std::string(recv_buffer_.data(), size) + " reply";
- async_write(socket_, Buffer(send_buffer_), [this, self](bserror_t ec, size_t size) {
- socket_.close();
- });
+ socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
+}
+
+void TcpReply1::OnRead(size_t size)
+{
+ recv_len_ += size;
+ BHMsgHead head;
+ std::string body_content;
+ bool recv_done = false;
+ try {
+ recv_done = CheckData(recv_buffer_, recv_len_, head, body_content);
+ } catch (std::exception &e) {
+ LOG_ERROR() << e.what();
+ Close();
+ return;
+ }
+
+ auto ParseBody = [&](auto &req) {
+ const char *p = recv_buffer_.data();
+ uint32_t size = Get32(p);
+ p += 4;
+ p += size;
+ size = Get32(p);
+ p += 4;
+ return req.ParseFromArray(p, size);
+ };
+
+ if (recv_done) {
+ LOG_DEBUG() << "request data: " << size;
+ auto self(shared_from_this());
+ MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()};
+ if (remote.id_ && remote.offset_) {
+ auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
+ send_buffer_ = imsg.content();
+ async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
+ };
+ auto &scenter = *pscenter_;
+ if (!scenter->ProxyMsg(remote, head, body_content, onRecv)) {
+ send_buffer_ = "fake reply";
+ async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
+ }
} else {
- socket_.close();
+ LOG_DEBUG() << "no address";
+ send_buffer_ = "no address";
+ async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
}
- });
-}
\ No newline at end of file
+
+ } else { // read again
+ LOG_DEBUG() << "not complete, read again " << recv_buffer_.size();
+ socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
+ }
+};
\ No newline at end of file
diff --git a/box/tcp_connection.h b/box/tcp_connection.h
index b3b8344..5aa93a4 100644
--- a/box/tcp_connection.h
+++ b/box/tcp_connection.h
@@ -18,46 +18,61 @@
#ifndef TCP_CONNECTION_H373GIL5
#define TCP_CONNECTION_H373GIL5
+#include "bh_util.h"
+#include "node_center.h"
#include "tcp_common.h"
#include <functional>
#include <memory>
+class ShmSocket;
class TcpRequest1 : public std::enable_shared_from_this<TcpRequest1>
{
public:
- static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request)
+ static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ShmSocket &shm_socket)
{
- std::make_shared<TcpRequest1>(io, addr, std::move(request))->Connect();
+ std::make_shared<TcpRequest1>(io, addr, std::move(request), shm_socket)->Start();
}
- TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request);
+ TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ShmSocket &shm_socket) :
+ socket_(io), shm_socket_(shm_socket), remote_(addr), request_(std::move(request)) {}
+ void OnError(bserror_t ec);
private:
- void Connect();
+ void Start();
void Close();
- void SendRequest();
- void ReadReply();
+ void OnRead(size_t size);
tcp::socket socket_;
+ ShmSocket &shm_socket_; // send reply
tcp::endpoint remote_;
std::string request_;
- std::vector<char> buffer_;
+ std::vector<char> recv_buffer_;
+ size_t recv_len_ = 0;
};
+class NodeCenter;
class TcpReply1 : public std::enable_shared_from_this<TcpReply1>
{
public:
- static void Create(tcp::socket sock)
+ typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr;
+ static void Create(tcp::socket sock, CenterPtr pscenter)
{
- std::make_shared<TcpReply1>(std::move(sock))->Start();
+ std::make_shared<TcpReply1>(std::move(sock), pscenter)->Start();
}
- TcpReply1(tcp::socket sock);
- void Start();
+ TcpReply1(tcp::socket sock, CenterPtr pscenter) :
+ socket_(std::move(sock)), pscenter_(pscenter) {}
+ void OnError(bserror_t ec);
private:
+ void Start();
+ void Close();
+ void OnRead(size_t size);
+
tcp::socket socket_;
+ CenterPtr pscenter_;
std::vector<char> recv_buffer_;
+ uint32_t recv_len_ = 0;
std::string send_buffer_;
};
diff --git a/box/tcp_proxy.cpp b/box/tcp_proxy.cpp
index 2e95a3b..298c6b6 100644
--- a/box/tcp_proxy.cpp
+++ b/box/tcp_proxy.cpp
@@ -36,8 +36,7 @@
auto localProc = [this](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
auto &dest = head.dest();
if (dest.ip().empty() || dest.port() == 0) { return; }
- bool r = Send(dest.ip(), dest.port(), msg.content());
- // TODO check send fail.
+ Request(dest.ip(), dest.port(), msg.content());
};
local_->Start(1, localProc);
@@ -46,34 +45,25 @@
io_context_.run_one_for(std::chrono::milliseconds(100));
}
};
+ std::thread(proxyProc).swap(worker_);
return true;
}
void TcpProxy::Stop()
{
- local_.reset();
-
bool cur = true;
if (run_.compare_exchange_strong(cur, false)) {
if (worker_.joinable()) {
worker_.join();
}
+ local_.reset();
}
}
-bool TcpProxy::Send(const std::string &ip, int port, std::string &&content)
+bool TcpProxy::Request(const std::string &ip, int port, std::string &&content)
{
if (content.empty()) { return false; }
tcp::endpoint dest(ip::address::from_string(ip), port);
- TcpRequest1::Create(io_context_, dest, std::move(content));
-
- // char tag[sizeof(kBHTcpServerTag)] = {0};
-
- // int n = read(sock, tag, sizeof(tag));
- // if (n == sizeof(tag) && memcmp(tag, &kBHTcpServerTag, sizeof(tag)) == 0) {
- // send(sock, content.data(), content.size(), 0);
- // connections_[addr].io_info_.h_ = [this, sock](int events) { OnReply(sock); };
- // // success
- // }
+ TcpRequest1::Create(io_context_, dest, std::move(content), *local_);
}
diff --git a/box/tcp_proxy.h b/box/tcp_proxy.h
index 8f2af91..9c74532 100644
--- a/box/tcp_proxy.h
+++ b/box/tcp_proxy.h
@@ -34,7 +34,7 @@
void Stop();
private:
- bool Send(const std::string &ip, int port, std::string &&content);
+ bool Request(const std::string &ip, int port, std::string &&content);
std::unique_ptr<ShmSocket> local_;
boost::asio::io_context io_context_;
diff --git a/box/tcp_server.cpp b/box/tcp_server.cpp
index e4e229c..ea23106 100644
--- a/box/tcp_server.cpp
+++ b/box/tcp_server.cpp
@@ -23,39 +23,17 @@
using namespace std::chrono_literals;
-TcpServer::TcpServer(int port) :
- run_(false), listener_(io_, tcp::endpoint(tcp::v6(), port))
+TcpServer::TcpServer(int port, CenterPtr pscenter) :
+ listener_(io(), tcp::endpoint(tcp::v6(), port)), pscenter_(pscenter)
{
Accept();
}
TcpServer::~TcpServer() { Stop(); }
-bool TcpServer::Start()
+void TcpServer::OnStop()
{
- Stop();
- bool cur = false;
- if (run_.compare_exchange_strong(cur, true)) {
- auto proc = [this]() {
- while (run_) {
- io_.run_one_for(100ms);
- }
- };
- std::thread(proc).swap(worker_);
- }
-}
-void TcpServer::Stop()
-{
- bool cur = true;
- if (run_.compare_exchange_strong(cur, false)) {
- io_.post([this]() {
- listener_.close();
- });
- std::this_thread::sleep_for(1s);
- if (worker_.joinable()) {
- worker_.join();
- }
- }
+ listener_.close();
}
void TcpServer::Accept()
@@ -63,7 +41,7 @@
listener_.async_accept([this](bserror_t ec, tcp::socket sock) {
if (!ec) {
LOG_INFO() << "server accept client";
- TcpReply1::Create(std::move(sock));
+ TcpReply1::Create(std::move(sock), pscenter_);
}
Accept();
});
diff --git a/box/tcp_server.h b/box/tcp_server.h
index c06cddc..2c9337c 100644
--- a/box/tcp_server.h
+++ b/box/tcp_server.h
@@ -18,23 +18,23 @@
#ifndef TCP_SERVER_795VXR94
#define TCP_SERVER_795VXR94
+#include "bh_util.h"
+#include "io_service.h"
#include "tcp_common.h"
-#include <thread>
-class TcpServer
+class NodeCenter;
+class TcpServer : public IoService
{
public:
- explicit TcpServer(int port);
+ typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr;
+ TcpServer(int port, CenterPtr pscenter);
~TcpServer();
- bool Start();
- void Stop();
private:
+ virtual void OnStop();
void Accept();
- std::thread worker_;
- std::atomic<bool> run_;
- boost::asio::io_context io_;
tcp::acceptor listener_;
+ CenterPtr pscenter_;
};
#endif // end of include guard: TCP_SERVER_795VXR94
diff --git a/src/msg.h b/src/msg.h
index 12922b5..f0ed840 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -75,12 +75,16 @@
};
OffsetType offset_;
SharedMemory *pshm_;
- void *Alloc(const size_t size)
+
+ void *Alloc(const size_t size, const void *src = nullptr)
{
void *p = shm().Alloc(sizeof(Meta) + size);
if (p) {
auto pmeta = new (p) Meta(size);
p = pmeta + 1;
+ if (src) {
+ memcpy(p, src, size);
+ }
}
return p;
}
@@ -108,16 +112,35 @@
}
return addr;
}
-
- void *Pack(const std::string &content)
+ void *Pack(const BHMsgHead &head, const uint32_t head_len, const std::string &body_content)
{
void *addr = get();
if (addr) {
- memcpy(addr, content.data(), content.size());
- meta()->size_ = content.size();
+ auto p = static_cast<char *>(addr);
+ auto Pack1 = [&p](uint32_t len, auto &&writer) {
+ Put32(p, len);
+ p += sizeof(len);
+ writer(p, len);
+ p += len;
+ };
+ Pack1(head_len, [&](void *p, int len) { head.SerializeToArray(p, len); });
+ Pack1(body_content.size(), [&](void *p, int len) { memcpy(p, body_content.data(), len); });
+ meta()->size_ = 4 + head_len + 4 + body_content.size();
}
return addr;
}
+
+ void *Pack(const void *src, const size_t size)
+ {
+ void *addr = get();
+ if (addr && src) {
+ memcpy(addr, src, size);
+ meta()->size_ = size;
+ }
+ return addr;
+ }
+
+ void *Pack(const std::string &content) { return Pack(content.data(), content.size()); }
bool Make(void *addr)
{
@@ -166,6 +189,13 @@
uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len;
return Make(size) && Pack(head, head_len, body, body_len);
}
+ inline bool Make(const BHMsgHead &head, const std::string &body_content)
+ {
+ uint32_t head_len = head.ByteSizeLong();
+ uint32_t size = sizeof(head_len) + head_len + sizeof(uint32_t) + body_content.size();
+ return Make(size) && Pack(head, head_len, body_content);
+ }
+
template <class Body>
inline bool Fill(const BHMsgHead &head, const Body &body)
{
@@ -175,8 +205,11 @@
return valid() && (meta()->capacity_ >= size) && Pack(head, head_len, body, body_len);
}
- inline bool Make(const std::string &content) { return Make(content.size()) && Pack(content); }
- inline bool Fill(const std::string &content) { return valid() && (meta()->capacity_ >= content.size()) && Pack(content); }
+ inline bool Make(const void *src, const size_t size) { return Make(Alloc(size, src)); }
+ inline bool Fill(const void *src, const size_t size) { return valid() && (meta()->capacity_ >= size) && Pack(src, size); }
+
+ inline bool Make(const std::string &content) { return Make(content.data(), content.size()); }
+ inline bool Fill(const std::string &content) { return Fill(content.data(), content.size()); }
inline bool Make(const size_t size) { return Make(Alloc(size)); }
diff --git a/src/shm_socket.cpp b/src/shm_socket.cpp
index 11824d7..d54168d 100644
--- a/src/shm_socket.cpp
+++ b/src/shm_socket.cpp
@@ -145,46 +145,66 @@
return false;
}
-bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb)
+bool ShmSocket::Send(const MQInfo &remote, std::string &&content)
{
size_t size = content.size();
- auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
+ auto OnResult = [content = std::move(content), remote, this](MsgI &msg) mutable {
if (!msg.Fill(content)) { return false; }
try {
- if (!cb) {
- Send(remote, msg);
- } else {
- per_msg_cbs_->Store(msg_id, std::move(cb));
- auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
- RecvCB cb_no_use;
- per_msg_cbs_->Pick(msg_id, cb_no_use);
- };
- Send(remote, msg, onExpireRemoveCB);
- }
+ SendImpl(remote, msg);
return true;
} catch (...) {
SetLastError(eError, "Send internal error.");
return false;
}
};
+
+ return RequestAlloc(size, OnResult);
+}
+
+bool ShmSocket::Send(const MQInfo &remote, const MsgI &msg, const std::string &msg_id, RecvCB &&cb)
+{
+ try {
+ per_msg_cbs_->Store(msg_id, std::move(cb));
+ auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
+ RecvCB cb_no_use;
+ per_msg_cbs_->Pick(msg_id, cb_no_use);
+ };
+ SendImpl(remote, msg, onExpireRemoveCB);
+ return true;
+ } catch (std::exception &e) {
+ SetLastError(eError, "Send internal error.");
+ return false;
+ }
+}
+
+bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb)
+{
+ size_t size = content.size();
+ auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
+ return msg.Fill(content) && Send(remote, msg, msg_id, std::move(cb));
+ };
+
+ return RequestAlloc(size, OnResult);
+}
+
+bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult)
+{ // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag
#if 0
// self alloc
MsgI msg(shm());
if (msg.Make(size)) {
DEFER1(msg.Release());
- return OnResult(msg);
+ return onResult(msg);
+ } else {
+ return false;
}
-#else
- // center alloc
- return RequestAlloc(size, OnResult);
#endif
-}
-bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult)
-{ // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag
// LOG_FUNCTION;
if (node_proc_index_ == -1 || socket_index_ == -1) {
+ LOG_ERROR() << "socket not inited.";
return false;
}
int id = (++alloc_id_) & MaskBits(28);
diff --git a/src/shm_socket.h b/src/shm_socket.h
index bf78e89..9dfdd6b 100644
--- a/src/shm_socket.h
+++ b/src/shm_socket.h
@@ -68,16 +68,16 @@
bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult);
+ bool Send(const MQInfo &remote, const MsgI &msg, const std::string &msg_id, RecvCB &&cb);
template <class Body>
- bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
- {
- return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb));
- }
- template <class... T>
- bool Send(const MQInfo &remote, const MsgI &imsg, T &&...t)
- {
- return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...);
- }
+ bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb) { return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb)); }
+ bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb);
+
+ template <class Body>
+ bool Send(const MQInfo &remote, BHMsgHead &head, Body &body) { return Send(remote, MsgI::Serialize(head, body)); }
+ bool Send(const MQInfo &remote, std::string &&content);
+ bool Send(const MQInfo &remote, const MsgI &imsg) { return SendImpl(remote, imsg); }
+
template <class... T>
bool Send(const MQInfo &remote, const int64_t cmd, T &&...t)
{
@@ -135,8 +135,6 @@
private:
bool StopNoLock();
bool RunningNoLock() { return !workers_.empty(); }
-
- bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb = RecvCB());
template <class... Rest>
bool SendImpl(Rest &&...rest)
diff --git a/utest/tcp_test.cpp b/utest/tcp_test.cpp
index ff31c3e..a838252 100644
--- a/utest/tcp_test.cpp
+++ b/utest/tcp_test.cpp
@@ -16,6 +16,8 @@
* =====================================================================================
*/
+#include "defs.h"
+#include "node_center.h"
#include "tcp_connection.h"
#include "tcp_server.h"
#include "util.h"
@@ -31,18 +33,31 @@
BOOST_AUTO_TEST_CASE(TcpTest)
{
- const std::string bind_addr = "127.0.0.1";
- const std::string connect_addr = "127.0.0.1";
- const uint16_t port = 10000;
+ SharedMemory &shm = TestShm();
- TcpServer server(port);
- server.Start();
+ const std::string connect_addr = "127.0.0.1";
+ const uint16_t port = kBHCenterPort;
boost::asio::io_context io;
tcp::endpoint dest(ip::address::from_string(connect_addr), port);
- for (int i = 0; i < 10; ++i) {
- TcpRequest1::Create(io, dest, "client->server " + std::to_string(i));
+ MsgRequestTopic req;
+ req.set_topic("#center_query_procs");
+ req.set_data("");
+ auto head = InitMsgHead(GetType(req), "#test_proc", 1000000);
+ auto route = head.add_route();
+ route->set_mq_id(12345);
+ route->set_abs_addr(67890);
+
+ head.mutable_dest()->set_ip(connect_addr);
+ head.mutable_dest()->set_port(port);
+ head.mutable_dest()->set_mq_id(1000011);
+ head.mutable_dest()->set_abs_addr(10296);
+
+ auto request(MsgI::Serialize(head, req));
+ for (int i = 0; i < 1; ++i) {
+ LOG_DEBUG() << "request size: " << request.size();
+ TcpRequest1::Create(io, dest, request, DefaultSender(BHomeShm()));
}
io.run();
--
Gitblit v1.8.0