From 026bbfaf2b5d73a26b8e2fa49158883ef64c211b Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期四, 27 五月 2021 13:51:26 +0800 Subject: [PATCH] tcp server call center to send proxy requests. --- box/tcp_connection.cpp | 228 +++++++++++++++---- box/center.cpp | 4 src/msg.h | 47 +++ box/io_service.h | 42 +++ box/tcp_proxy.cpp | 20 - box/center.h | 2 box/node_center.cpp | 23 ++ box/io_service.cpp | 48 ++++ src/shm_socket.cpp | 58 +++- src/shm_socket.h | 20 - box/tcp_proxy.h | 2 box/tcp_server.h | 16 utest/tcp_test.cpp | 29 ++ box/node_center.h | 1 box/tcp_server.cpp | 32 -- box/tcp_connection.h | 37 ++- 16 files changed, 449 insertions(+), 160 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index c3a03e3..8d24315 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -18,6 +18,7 @@ #include "center.h" #include "center_topic_node.h" #include "node_center.h" +#include "tcp_server.h" #include <chrono> using namespace std::chrono; @@ -175,6 +176,7 @@ } topic_node_.reset(new CenterTopicNode(center_ptr, shm)); + tcp_server_.reset(new TcpServer(kBHCenterPort, center_ptr)); } BHCenter::~BHCenter() { Stop(); } @@ -186,11 +188,13 @@ sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_); } topic_node_->Start(); + tcp_server_->Start(); return true; } bool BHCenter::Stop() { + tcp_server_->Stop(); topic_node_->Stop(); for (auto &kv : sockets_) { kv.second->Stop(); diff --git a/box/center.h b/box/center.h index 6610277..c4aa1ac 100644 --- a/box/center.h +++ b/box/center.h @@ -23,6 +23,7 @@ #include <map> #include <memory> class CenterTopicNode; +class TcpServer; class BHCenter { @@ -53,6 +54,7 @@ std::map<std::string, std::shared_ptr<ShmSocket>> sockets_; std::unique_ptr<CenterTopicNode> topic_node_; + std::unique_ptr<TcpServer> tcp_server_; }; #endif // end of include guard: CENTER_TM9OUQTG diff --git a/box/io_service.cpp b/box/io_service.cpp new file mode 100644 index 0000000..1d531e0 --- /dev/null +++ b/box/io_service.cpp @@ -0,0 +1,48 @@ +/* + * ===================================================================================== + * + * Filename: io_service.cpp + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�05鏈�27鏃� 13鏃�25鍒�18绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#include "io_service.h" +#include <chrono> +using namespace std::chrono_literals; + +bool IoService::Start() +{ + Stop(); + bool cur = false; + if (!run_.compare_exchange_strong(cur, true)) { + return false; + } + + auto proc = [this]() { + while (run_) { + io_.run_one_for(100ms); + } + OnStop(); + }; + std::thread(proc).swap(worker_); + return true; +} + +void IoService::Stop() +{ + bool cur = true; + if (run_.compare_exchange_strong(cur, false)) { + if (worker_.joinable()) { + worker_.join(); + } + } +} diff --git a/box/io_service.h b/box/io_service.h new file mode 100644 index 0000000..000facc --- /dev/null +++ b/box/io_service.h @@ -0,0 +1,42 @@ +/* + * ===================================================================================== + * + * Filename: io_service.h + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�05鏈�27鏃� 13鏃�25鍒�37绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#ifndef IO_SERVICE_ODKKJG3D +#define IO_SERVICE_ODKKJG3D + +#include <boost/asio.hpp> +#include <thread> + +class IoService +{ +public: + IoService() : + run_(false) {} + bool Start(); + void Stop(); + + typedef boost::asio::io_context io_service_t; + io_service_t &io() { return io_; } + +private: + virtual void OnStop() {} + io_service_t io_; + std::thread worker_; + std::atomic<bool> run_; +}; + +#endif // end of include guard: IO_SERVICE_ODKKJG3D diff --git a/box/node_center.cpp b/box/node_center.cpp index cbaef0e..ff199b2 100644 --- a/box/node_center.cpp +++ b/box/node_center.cpp @@ -209,6 +209,29 @@ RecordMsg(msg); return socket.Send(dest, msg); } +bool NodeCenter::ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) +{ + auto ssn = dest.id_ - (dest.id_ % 10); + LOG_DEBUG() << "prox ssn " << ssn; + auto pos = nodes_.find(ssn); + if (pos == nodes_.end()) { + LOG_ERROR() << "proxy msg, ssn not found."; + return false; + } + auto &node = pos->second; + if (!Valid(*node)) { return false; } + + ShmSocket &sender(DefaultSender(node->shm_)); + auto route = head.add_route(); + route->set_mq_id(sender.id()); + route->set_abs_addr(sender.AbsAddr()); + + ShmMsg msg(node->shm_); + if (!msg.Make(head, body_content)) { return false; } + DEFER1(msg.Release();); + RecordMsg(msg); + return sender.Send(dest, msg, head.msg_id(), std::move(cb)); +} void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val) { diff --git a/box/node_center.h b/box/node_center.h index caaf054..1c79809 100644 --- a/box/node_center.h +++ b/box/node_center.h @@ -122,6 +122,7 @@ void RecordMsg(const MsgI &msg); bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg); bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg); + bool ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb); void OnAlloc(ShmSocket &socket, const int64_t val); void OnFree(ShmSocket &socket, const int64_t val); bool OnCommand(ShmSocket &socket, const int64_t val); diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp index 8968741..22162e5 100644 --- a/box/tcp_connection.cpp +++ b/box/tcp_connection.cpp @@ -17,78 +17,198 @@ */ #include "tcp_connection.h" #include "log.h" +#include "msg.h" +#include "node_center.h" +#include "shm_socket.h" namespace { template <class C> -auto Buffer(C &c) { return boost::asio::buffer(c.data(), c.size()); } +auto Buffer(C &c, size_t offset = 0) { return boost::asio::buffer(c.data() + offset, c.size() - offset); } using boost::asio::async_read; using boost::asio::async_write; +typedef std::function<void()> VoidHandler; +typedef std::function<void(size_t)> SizeHandler; + +template <class T, class... Param> +auto TcpCallback(T &conn, std::function<void(Param...)> const &func) +{ + auto self(conn.shared_from_this()); + return [self, func](bserror_t ec, Param... size) { + if (!ec) { + func(size...); + } else { + self->OnError(ec); + } + }; +} + +template <class T> +auto TcpCB(T &conn, VoidHandler const &func) { return TcpCallback(conn, func); } + +template <class T> +auto TcpCBSize(T &conn, SizeHandler const &func) { return TcpCallback(conn, func); } + +template <class T> +auto TcpCBSize(T &conn, VoidHandler const &func) +{ + return TcpCBSize(conn, [func](size_t) { func(); }); +} + +bool CheckData(std::vector<char> &buffer, const uint32_t len, BHMsgHead &head, std::string &body_content) +{ + const char *p = buffer.data(); + LOG_DEBUG() << "msg len " << len; + if (4 > len) { return false; } + uint32_t head_len = Get32(p); + LOG_DEBUG() << "head_len " << head_len; + if (head_len > 1024 * 4) { + throw std::runtime_error("unexpected tcp reply data."); + } + auto before_body = 4 + head_len + 4; + if (before_body > len) { + if (before_body > buffer.size()) { + buffer.resize(before_body); + } + return false; + } + if (!head.ParseFromArray(p + 4, head_len)) { + throw std::runtime_error("tcp recv invalid reply head."); + } + uint32_t body_len = Get32(p + 4 + head_len); + buffer.resize(before_body + body_len); + if (buffer.size() > len) { return false; } + body_content.assign(p + before_body, body_len); + return true; +} + } // namespace -TcpRequest1::TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request) : - socket_(io), remote_(addr), request_(std::move(request)) {} -void TcpRequest1::Connect() +/// request ----------------------------------------------------------- + +void TcpRequest1::OnError(bserror_t ec) { - auto self = shared_from_this(); - socket_.async_connect(remote_, [this, self](bserror_t ec) { - if (!ec) { - SendRequest(); - } else { - LOG_ERROR() << "connect error " << ec; - Close(); - } - }); + LOG_ERROR() << "tcp client error: " << ec; + Close(); +} + +void TcpRequest1::Start() +{ + auto readReply = [this]() { + recv_buffer_.resize(1000); + recv_len_ = 0; + socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); + }; + auto request = [this, readReply]() { async_write(socket_, Buffer(request_), TcpCBSize(*this, readReply)); }; + + socket_.async_connect(remote_, TcpCB(*this, request)); } void TcpRequest1::Close() { + LOG_DEBUG() << "client close"; + socket_.close(); +} +void TcpRequest1::OnRead(size_t size) +{ + LOG_DEBUG() << "reply data: " << recv_buffer_.data() + recv_len_; + + recv_len_ += size; + BHMsgHead head; + std::string body_content; + bool recv_done = false; + try { + recv_done = CheckData(recv_buffer_, recv_len_, head, body_content); + } catch (std::exception &e) { + LOG_ERROR() << e.what(); + Close(); + return; + } + + if (recv_done) { + // just pass to client, no check, client will check it anyway. + LOG_DEBUG() << "route size: " << head.route_size(); + if (head.route_size() < 1) { return; } + auto &back = head.route(head.route_size() - 1); + MQInfo dest = {back.mq_id(), back.abs_addr()}; + head.mutable_route()->RemoveLast(); + + LOG_DEBUG() << "tcp got reply, pass to shm: " << dest.id_ << ", " << dest.offset_; + MsgRequestTopicReply reply; + if (reply.ParseFromString(body_content)) { + LOG_DEBUG() << "err msg: " << reply.errmsg().errstring(); + LOG_DEBUG() << "content : " << reply.data(); + } + Close(); + return; + shm_socket_.Send(dest, std::string(recv_buffer_.data(), recv_buffer_.size())); + } else { // read again + LOG_DEBUG() << "not complete, read again " << recv_buffer_.size(); + socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); + } +} + +/// reply -------------------------------------------------------------- + +void TcpReply1::OnError(bserror_t ec) { Close(); } +void TcpReply1::Close() +{ + LOG_DEBUG() << "server close."; socket_.close(); } -void TcpRequest1::SendRequest() -{ - LOG_INFO() << "client sending request " << request_; - auto self = shared_from_this(); - async_write(socket_, Buffer(request_), [this, self](bserror_t ec, size_t) { - if (!ec) { - ReadReply(); - } else { - Close(); - } - }); -} -void TcpRequest1::ReadReply() -{ - buffer_.resize(1000); - auto self = shared_from_this(); - socket_.async_read_some(Buffer(buffer_), [this, self](bserror_t ec, size_t size) { - if (!ec) { - printf("reply data: %s\n", buffer_.data()); - } else { - Close(); - } - }); -} - -TcpReply1::TcpReply1(tcp::socket sock) : - socket_(std::move(sock)) {} - void TcpReply1::Start() { - LOG_INFO() << "server session reading..."; recv_buffer_.resize(1000); - auto self(shared_from_this()); - socket_.async_read_some(Buffer(recv_buffer_), [this, self](bserror_t ec, size_t size) { - LOG_INFO() << "server read : " << recv_buffer_.data(); - // fake reply - if (!ec) { - send_buffer_ = std::string(recv_buffer_.data(), size) + " reply"; - async_write(socket_, Buffer(send_buffer_), [this, self](bserror_t ec, size_t size) { - socket_.close(); - }); + socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); +} + +void TcpReply1::OnRead(size_t size) +{ + recv_len_ += size; + BHMsgHead head; + std::string body_content; + bool recv_done = false; + try { + recv_done = CheckData(recv_buffer_, recv_len_, head, body_content); + } catch (std::exception &e) { + LOG_ERROR() << e.what(); + Close(); + return; + } + + auto ParseBody = [&](auto &req) { + const char *p = recv_buffer_.data(); + uint32_t size = Get32(p); + p += 4; + p += size; + size = Get32(p); + p += 4; + return req.ParseFromArray(p, size); + }; + + if (recv_done) { + LOG_DEBUG() << "request data: " << size; + auto self(shared_from_this()); + MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()}; + if (remote.id_ && remote.offset_) { + auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { + send_buffer_ = imsg.content(); + async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); + }; + auto &scenter = *pscenter_; + if (!scenter->ProxyMsg(remote, head, body_content, onRecv)) { + send_buffer_ = "fake reply"; + async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); + } } else { - socket_.close(); + LOG_DEBUG() << "no address"; + send_buffer_ = "no address"; + async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); } - }); -} \ No newline at end of file + + } else { // read again + LOG_DEBUG() << "not complete, read again " << recv_buffer_.size(); + socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); + } +}; \ No newline at end of file diff --git a/box/tcp_connection.h b/box/tcp_connection.h index b3b8344..5aa93a4 100644 --- a/box/tcp_connection.h +++ b/box/tcp_connection.h @@ -18,46 +18,61 @@ #ifndef TCP_CONNECTION_H373GIL5 #define TCP_CONNECTION_H373GIL5 +#include "bh_util.h" +#include "node_center.h" #include "tcp_common.h" #include <functional> #include <memory> +class ShmSocket; class TcpRequest1 : public std::enable_shared_from_this<TcpRequest1> { public: - static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request) + static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ShmSocket &shm_socket) { - std::make_shared<TcpRequest1>(io, addr, std::move(request))->Connect(); + std::make_shared<TcpRequest1>(io, addr, std::move(request), shm_socket)->Start(); } - TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request); + TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ShmSocket &shm_socket) : + socket_(io), shm_socket_(shm_socket), remote_(addr), request_(std::move(request)) {} + void OnError(bserror_t ec); private: - void Connect(); + void Start(); void Close(); - void SendRequest(); - void ReadReply(); + void OnRead(size_t size); tcp::socket socket_; + ShmSocket &shm_socket_; // send reply tcp::endpoint remote_; std::string request_; - std::vector<char> buffer_; + std::vector<char> recv_buffer_; + size_t recv_len_ = 0; }; +class NodeCenter; class TcpReply1 : public std::enable_shared_from_this<TcpReply1> { public: - static void Create(tcp::socket sock) + typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr; + static void Create(tcp::socket sock, CenterPtr pscenter) { - std::make_shared<TcpReply1>(std::move(sock))->Start(); + std::make_shared<TcpReply1>(std::move(sock), pscenter)->Start(); } - TcpReply1(tcp::socket sock); - void Start(); + TcpReply1(tcp::socket sock, CenterPtr pscenter) : + socket_(std::move(sock)), pscenter_(pscenter) {} + void OnError(bserror_t ec); private: + void Start(); + void Close(); + void OnRead(size_t size); + tcp::socket socket_; + CenterPtr pscenter_; std::vector<char> recv_buffer_; + uint32_t recv_len_ = 0; std::string send_buffer_; }; diff --git a/box/tcp_proxy.cpp b/box/tcp_proxy.cpp index 2e95a3b..298c6b6 100644 --- a/box/tcp_proxy.cpp +++ b/box/tcp_proxy.cpp @@ -36,8 +36,7 @@ auto localProc = [this](ShmSocket &sock, MsgI &msg, BHMsgHead &head) { auto &dest = head.dest(); if (dest.ip().empty() || dest.port() == 0) { return; } - bool r = Send(dest.ip(), dest.port(), msg.content()); - // TODO check send fail. + Request(dest.ip(), dest.port(), msg.content()); }; local_->Start(1, localProc); @@ -46,34 +45,25 @@ io_context_.run_one_for(std::chrono::milliseconds(100)); } }; + std::thread(proxyProc).swap(worker_); return true; } void TcpProxy::Stop() { - local_.reset(); - bool cur = true; if (run_.compare_exchange_strong(cur, false)) { if (worker_.joinable()) { worker_.join(); } + local_.reset(); } } -bool TcpProxy::Send(const std::string &ip, int port, std::string &&content) +bool TcpProxy::Request(const std::string &ip, int port, std::string &&content) { if (content.empty()) { return false; } tcp::endpoint dest(ip::address::from_string(ip), port); - TcpRequest1::Create(io_context_, dest, std::move(content)); - - // char tag[sizeof(kBHTcpServerTag)] = {0}; - - // int n = read(sock, tag, sizeof(tag)); - // if (n == sizeof(tag) && memcmp(tag, &kBHTcpServerTag, sizeof(tag)) == 0) { - // send(sock, content.data(), content.size(), 0); - // connections_[addr].io_info_.h_ = [this, sock](int events) { OnReply(sock); }; - // // success - // } + TcpRequest1::Create(io_context_, dest, std::move(content), *local_); } diff --git a/box/tcp_proxy.h b/box/tcp_proxy.h index 8f2af91..9c74532 100644 --- a/box/tcp_proxy.h +++ b/box/tcp_proxy.h @@ -34,7 +34,7 @@ void Stop(); private: - bool Send(const std::string &ip, int port, std::string &&content); + bool Request(const std::string &ip, int port, std::string &&content); std::unique_ptr<ShmSocket> local_; boost::asio::io_context io_context_; diff --git a/box/tcp_server.cpp b/box/tcp_server.cpp index e4e229c..ea23106 100644 --- a/box/tcp_server.cpp +++ b/box/tcp_server.cpp @@ -23,39 +23,17 @@ using namespace std::chrono_literals; -TcpServer::TcpServer(int port) : - run_(false), listener_(io_, tcp::endpoint(tcp::v6(), port)) +TcpServer::TcpServer(int port, CenterPtr pscenter) : + listener_(io(), tcp::endpoint(tcp::v6(), port)), pscenter_(pscenter) { Accept(); } TcpServer::~TcpServer() { Stop(); } -bool TcpServer::Start() +void TcpServer::OnStop() { - Stop(); - bool cur = false; - if (run_.compare_exchange_strong(cur, true)) { - auto proc = [this]() { - while (run_) { - io_.run_one_for(100ms); - } - }; - std::thread(proc).swap(worker_); - } -} -void TcpServer::Stop() -{ - bool cur = true; - if (run_.compare_exchange_strong(cur, false)) { - io_.post([this]() { - listener_.close(); - }); - std::this_thread::sleep_for(1s); - if (worker_.joinable()) { - worker_.join(); - } - } + listener_.close(); } void TcpServer::Accept() @@ -63,7 +41,7 @@ listener_.async_accept([this](bserror_t ec, tcp::socket sock) { if (!ec) { LOG_INFO() << "server accept client"; - TcpReply1::Create(std::move(sock)); + TcpReply1::Create(std::move(sock), pscenter_); } Accept(); }); diff --git a/box/tcp_server.h b/box/tcp_server.h index c06cddc..2c9337c 100644 --- a/box/tcp_server.h +++ b/box/tcp_server.h @@ -18,23 +18,23 @@ #ifndef TCP_SERVER_795VXR94 #define TCP_SERVER_795VXR94 +#include "bh_util.h" +#include "io_service.h" #include "tcp_common.h" -#include <thread> -class TcpServer +class NodeCenter; +class TcpServer : public IoService { public: - explicit TcpServer(int port); + typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr; + TcpServer(int port, CenterPtr pscenter); ~TcpServer(); - bool Start(); - void Stop(); private: + virtual void OnStop(); void Accept(); - std::thread worker_; - std::atomic<bool> run_; - boost::asio::io_context io_; tcp::acceptor listener_; + CenterPtr pscenter_; }; #endif // end of include guard: TCP_SERVER_795VXR94 diff --git a/src/msg.h b/src/msg.h index 12922b5..f0ed840 100644 --- a/src/msg.h +++ b/src/msg.h @@ -75,12 +75,16 @@ }; OffsetType offset_; SharedMemory *pshm_; - void *Alloc(const size_t size) + + void *Alloc(const size_t size, const void *src = nullptr) { void *p = shm().Alloc(sizeof(Meta) + size); if (p) { auto pmeta = new (p) Meta(size); p = pmeta + 1; + if (src) { + memcpy(p, src, size); + } } return p; } @@ -108,16 +112,35 @@ } return addr; } - - void *Pack(const std::string &content) + void *Pack(const BHMsgHead &head, const uint32_t head_len, const std::string &body_content) { void *addr = get(); if (addr) { - memcpy(addr, content.data(), content.size()); - meta()->size_ = content.size(); + auto p = static_cast<char *>(addr); + auto Pack1 = [&p](uint32_t len, auto &&writer) { + Put32(p, len); + p += sizeof(len); + writer(p, len); + p += len; + }; + Pack1(head_len, [&](void *p, int len) { head.SerializeToArray(p, len); }); + Pack1(body_content.size(), [&](void *p, int len) { memcpy(p, body_content.data(), len); }); + meta()->size_ = 4 + head_len + 4 + body_content.size(); } return addr; } + + void *Pack(const void *src, const size_t size) + { + void *addr = get(); + if (addr && src) { + memcpy(addr, src, size); + meta()->size_ = size; + } + return addr; + } + + void *Pack(const std::string &content) { return Pack(content.data(), content.size()); } bool Make(void *addr) { @@ -166,6 +189,13 @@ uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len; return Make(size) && Pack(head, head_len, body, body_len); } + inline bool Make(const BHMsgHead &head, const std::string &body_content) + { + uint32_t head_len = head.ByteSizeLong(); + uint32_t size = sizeof(head_len) + head_len + sizeof(uint32_t) + body_content.size(); + return Make(size) && Pack(head, head_len, body_content); + } + template <class Body> inline bool Fill(const BHMsgHead &head, const Body &body) { @@ -175,8 +205,11 @@ return valid() && (meta()->capacity_ >= size) && Pack(head, head_len, body, body_len); } - inline bool Make(const std::string &content) { return Make(content.size()) && Pack(content); } - inline bool Fill(const std::string &content) { return valid() && (meta()->capacity_ >= content.size()) && Pack(content); } + inline bool Make(const void *src, const size_t size) { return Make(Alloc(size, src)); } + inline bool Fill(const void *src, const size_t size) { return valid() && (meta()->capacity_ >= size) && Pack(src, size); } + + inline bool Make(const std::string &content) { return Make(content.data(), content.size()); } + inline bool Fill(const std::string &content) { return Fill(content.data(), content.size()); } inline bool Make(const size_t size) { return Make(Alloc(size)); } diff --git a/src/shm_socket.cpp b/src/shm_socket.cpp index 11824d7..d54168d 100644 --- a/src/shm_socket.cpp +++ b/src/shm_socket.cpp @@ -145,46 +145,66 @@ return false; } -bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb) +bool ShmSocket::Send(const MQInfo &remote, std::string &&content) { size_t size = content.size(); - auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable { + auto OnResult = [content = std::move(content), remote, this](MsgI &msg) mutable { if (!msg.Fill(content)) { return false; } try { - if (!cb) { - Send(remote, msg); - } else { - per_msg_cbs_->Store(msg_id, std::move(cb)); - auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { - RecvCB cb_no_use; - per_msg_cbs_->Pick(msg_id, cb_no_use); - }; - Send(remote, msg, onExpireRemoveCB); - } + SendImpl(remote, msg); return true; } catch (...) { SetLastError(eError, "Send internal error."); return false; } }; + + return RequestAlloc(size, OnResult); +} + +bool ShmSocket::Send(const MQInfo &remote, const MsgI &msg, const std::string &msg_id, RecvCB &&cb) +{ + try { + per_msg_cbs_->Store(msg_id, std::move(cb)); + auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { + RecvCB cb_no_use; + per_msg_cbs_->Pick(msg_id, cb_no_use); + }; + SendImpl(remote, msg, onExpireRemoveCB); + return true; + } catch (std::exception &e) { + SetLastError(eError, "Send internal error."); + return false; + } +} + +bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb) +{ + size_t size = content.size(); + auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable { + return msg.Fill(content) && Send(remote, msg, msg_id, std::move(cb)); + }; + + return RequestAlloc(size, OnResult); +} + +bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult) +{ // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag #if 0 // self alloc MsgI msg(shm()); if (msg.Make(size)) { DEFER1(msg.Release()); - return OnResult(msg); + return onResult(msg); + } else { + return false; } -#else - // center alloc - return RequestAlloc(size, OnResult); #endif -} -bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult) -{ // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag // LOG_FUNCTION; if (node_proc_index_ == -1 || socket_index_ == -1) { + LOG_ERROR() << "socket not inited."; return false; } int id = (++alloc_id_) & MaskBits(28); diff --git a/src/shm_socket.h b/src/shm_socket.h index bf78e89..9dfdd6b 100644 --- a/src/shm_socket.h +++ b/src/shm_socket.h @@ -68,16 +68,16 @@ bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult); + bool Send(const MQInfo &remote, const MsgI &msg, const std::string &msg_id, RecvCB &&cb); template <class Body> - bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) - { - return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb)); - } - template <class... T> - bool Send(const MQInfo &remote, const MsgI &imsg, T &&...t) - { - return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...); - } + bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb) { return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb)); } + bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb); + + template <class Body> + bool Send(const MQInfo &remote, BHMsgHead &head, Body &body) { return Send(remote, MsgI::Serialize(head, body)); } + bool Send(const MQInfo &remote, std::string &&content); + bool Send(const MQInfo &remote, const MsgI &imsg) { return SendImpl(remote, imsg); } + template <class... T> bool Send(const MQInfo &remote, const int64_t cmd, T &&...t) { @@ -135,8 +135,6 @@ private: bool StopNoLock(); bool RunningNoLock() { return !workers_.empty(); } - - bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb = RecvCB()); template <class... Rest> bool SendImpl(Rest &&...rest) diff --git a/utest/tcp_test.cpp b/utest/tcp_test.cpp index ff31c3e..a838252 100644 --- a/utest/tcp_test.cpp +++ b/utest/tcp_test.cpp @@ -16,6 +16,8 @@ * ===================================================================================== */ +#include "defs.h" +#include "node_center.h" #include "tcp_connection.h" #include "tcp_server.h" #include "util.h" @@ -31,18 +33,31 @@ BOOST_AUTO_TEST_CASE(TcpTest) { - const std::string bind_addr = "127.0.0.1"; - const std::string connect_addr = "127.0.0.1"; - const uint16_t port = 10000; + SharedMemory &shm = TestShm(); - TcpServer server(port); - server.Start(); + const std::string connect_addr = "127.0.0.1"; + const uint16_t port = kBHCenterPort; boost::asio::io_context io; tcp::endpoint dest(ip::address::from_string(connect_addr), port); - for (int i = 0; i < 10; ++i) { - TcpRequest1::Create(io, dest, "client->server " + std::to_string(i)); + MsgRequestTopic req; + req.set_topic("#center_query_procs"); + req.set_data(""); + auto head = InitMsgHead(GetType(req), "#test_proc", 1000000); + auto route = head.add_route(); + route->set_mq_id(12345); + route->set_abs_addr(67890); + + head.mutable_dest()->set_ip(connect_addr); + head.mutable_dest()->set_port(port); + head.mutable_dest()->set_mq_id(1000011); + head.mutable_dest()->set_abs_addr(10296); + + auto request(MsgI::Serialize(head, req)); + for (int i = 0; i < 1; ++i) { + LOG_DEBUG() << "request size: " << request.size(); + TcpRequest1::Create(io, dest, request, DefaultSender(BHomeShm())); } io.run(); -- Gitblit v1.8.0