From 9243710ca372de26823c2225c7b46b072458c671 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 28 五月 2021 17:18:33 +0800 Subject: [PATCH] tcp proxy requests, need more test. --- box/tcp_connection.cpp | 91 ++++------ box/center_topic_node.cpp | 2 box/center.cpp | 50 ++++ box/io_service.h | 10 box/tcp_proxy.cpp | 56 +----- box/center.h | 5 box/node_center.cpp | 57 ++++- src/topic_node.cpp | 54 +++-- src/msg.cpp | 3 box/io_service.cpp | 33 -- src/shm_socket.cpp | 3 utest/api_test.cpp | 17 + box/tcp_common.h | 8 box/tcp_proxy.h | 24 +- box/tcp_server.h | 10 utest/tcp_test.cpp | 61 ++++-- box/node_center.h | 13 + box/tcp_server.cpp | 23 - box/tcp_connection.h | 16 + 19 files changed, 294 insertions(+), 242 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index 8d24315..0fdfa33 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -17,7 +17,9 @@ */ #include "center.h" #include "center_topic_node.h" +#include "io_service.h" #include "node_center.h" +#include "tcp_proxy.h" #include "tcp_server.h" #include <chrono> @@ -74,7 +76,7 @@ }; } -bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm) +bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy) { // command auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool { @@ -92,9 +94,41 @@ center->OnTimer(); }; - auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { + auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; auto replyer = MakeReplyer(socket, head, center); + + if (!head.dest().ip().empty()) { // other host, proxy + auto valid = [&]() { return head.route_size() == 1; }; + if (!valid()) { return false; } + + if (head.type() == kMsgTypeRequestTopic) { + typedef MsgRequestTopicReply Reply; + Reply reply; + if (!center->CheckMsg(head, reply)) { + replyer(reply); + } else { + auto onResult = [¢er](BHMsgHead &head, std::string body_content) { + if (head.route_size() > 0) { + auto &back = head.route(head.route_size() - 1); + MQInfo dest = {back.mq_id(), back.abs_addr()}; + head.mutable_route()->RemoveLast(); + center->PassRemoteReplyToLocal(dest, head, std::move(body_content)); + } + }; + if (!tcp_proxy.Request(head.dest().ip(), head.dest().port(), msg.content(), onResult)) { + replyer(MakeReply<Reply>(eError, "send request failed.")); + } else { + // success + } + } + return true; + } else { + // ignore other msgs for now. + } + return false; + } + switch (head.type()) { CASE_ON_MSG_TYPE(ProcInit); CASE_ON_MSG_TYPE(Register); @@ -168,7 +202,10 @@ { auto nsec = NodeTimeoutSec(); auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. - AddCenter(center_ptr, shm); + io_service_.reset(new IoService); + tcp_proxy_.reset(new TcpProxy(io_service_->io())); + + AddCenter(center_ptr, shm, *tcp_proxy_); for (auto &kv : Centers()) { auto &info = kv.second; @@ -176,7 +213,7 @@ } topic_node_.reset(new CenterTopicNode(center_ptr, shm)); - tcp_server_.reset(new TcpServer(kBHCenterPort, center_ptr)); + tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr)); } BHCenter::~BHCenter() { Stop(); } @@ -188,13 +225,14 @@ sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_); } topic_node_->Start(); - tcp_server_->Start(); return true; } bool BHCenter::Stop() { - tcp_server_->Stop(); + tcp_proxy_.reset(); + tcp_server_.reset(); + io_service_.reset(); topic_node_->Stop(); for (auto &kv : sockets_) { kv.second->Stop(); diff --git a/box/center.h b/box/center.h index c4aa1ac..8850db4 100644 --- a/box/center.h +++ b/box/center.h @@ -24,6 +24,8 @@ #include <memory> class CenterTopicNode; class TcpServer; +class TcpProxy; +class IoService; class BHCenter { @@ -54,7 +56,10 @@ std::map<std::string, std::shared_ptr<ShmSocket>> sockets_; std::unique_ptr<CenterTopicNode> topic_node_; + + std::unique_ptr<IoService> io_service_; std::unique_ptr<TcpServer> tcp_server_; + std::unique_ptr<TcpProxy> tcp_proxy_; }; #endif // end of include guard: CENTER_TM9OUQTG diff --git a/box/center_topic_node.cpp b/box/center_topic_node.cpp index 5c8df7a..749f4e6 100644 --- a/box/center_topic_node.cpp +++ b/box/center_topic_node.cpp @@ -106,7 +106,7 @@ *reply.mutable_errmsg() = data.errmsg(); reply.set_data(ToJson(data)); } else { - SetError(*reply.mutable_errmsg(), eInvalidInput, "not supported topic" + request.topic()); + SetError(*reply.mutable_errmsg(), eInvalidInput, "invalid topic: " + request.topic()); } pnode_->ServerSendReply(src_info, reply); }; diff --git a/box/io_service.cpp b/box/io_service.cpp index 1d531e0..5640d50 100644 --- a/box/io_service.cpp +++ b/box/io_service.cpp @@ -16,33 +16,16 @@ * ===================================================================================== */ #include "io_service.h" -#include <chrono> -using namespace std::chrono_literals; -bool IoService::Start() +IoService::IoService() : + guard_(io_.get_executor()) { - Stop(); - bool cur = false; - if (!run_.compare_exchange_strong(cur, true)) { - return false; - } - - auto proc = [this]() { - while (run_) { - io_.run_one_for(100ms); - } - OnStop(); - }; - std::thread(proc).swap(worker_); - return true; + std::thread([this]() { io_.run(); }).swap(worker_); } - -void IoService::Stop() +IoService::~IoService() { - bool cur = true; - if (run_.compare_exchange_strong(cur, false)) { - if (worker_.joinable()) { - worker_.join(); - } - } + guard_.reset(); + io_.stop(); // normally not needed, but make sure run() exits. + if (worker_.joinable()) + worker_.join(); } diff --git a/box/io_service.h b/box/io_service.h index 000facc..f492e71 100644 --- a/box/io_service.h +++ b/box/io_service.h @@ -24,19 +24,17 @@ class IoService { public: - IoService() : - run_(false) {} - bool Start(); - void Stop(); + IoService(); + ~IoService(); typedef boost::asio::io_context io_service_t; io_service_t &io() { return io_; } private: - virtual void OnStop() {} io_service_t io_; + typedef boost::asio::executor_work_guard<io_service_t::executor_type> guard_t; + guard_t guard_; std::thread worker_; - std::atomic<bool> run_; }; #endif // end of include guard: IO_SERVICE_ODKKJG3D diff --git a/box/node_center.cpp b/box/node_center.cpp index ff199b2..662b2c0 100644 --- a/box/node_center.cpp +++ b/box/node_center.cpp @@ -70,8 +70,9 @@ return; } // LOG_FUNCTION; + const size_t total = msgs_.size(); time_to_clean_ = now + 1; - int64_t limit = std::max(10000ul, msgs_.size() / 10); + int64_t limit = std::max(10000ul, total / 10); int64_t n = 0; auto it = msgs_.begin(); while (it != msgs_.end() && --limit > 0) { @@ -82,16 +83,16 @@ ++n; }; int n = now - msg.timestamp(); - if (n < 10) { + if (msg.Count() == 0) { + Free(); + } else if (n > NodeTimeoutSec()) { + Free(); + } else { ++it; - } else if (msg.Count() == 0) { - Free(); - } else if (n > 60) { - Free(); } } if (n > 0) { - LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n; + LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total; } } @@ -209,17 +210,25 @@ RecordMsg(msg); return socket.Send(dest, msg); } -bool NodeCenter::ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) + +NodeCenter::Node NodeCenter::GetNode(const MQId mq_id) { - auto ssn = dest.id_ - (dest.id_ % 10); - LOG_DEBUG() << "prox ssn " << ssn; + Node node; + auto ssn = mq_id - (mq_id % 10); auto pos = nodes_.find(ssn); - if (pos == nodes_.end()) { - LOG_ERROR() << "proxy msg, ssn not found."; + if (pos != nodes_.end()) { + node = pos->second; + } + return node; +} + +bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) +{ + Node node(GetNode(dest.id_)); + if (!node || !Valid(*node)) { + LOG_ERROR() << id() << " pass remote request, dest not found."; return false; } - auto &node = pos->second; - if (!Valid(*node)) { return false; } ShmSocket &sender(DefaultSender(node->shm_)); auto route = head.add_route(); @@ -233,6 +242,26 @@ return sender.Send(dest, msg, head.msg_id(), std::move(cb)); } +bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content) +{ + Node node(GetNode(dest.id_)); + if (!node) { + LOG_ERROR() << id() << " pass remote reply , ssn not found."; + return false; + } + auto offset = node->addrs_[dest.id_]; + if (offset != dest.offset_) { + LOG_ERROR() << id() << " pass remote reply, dest address not match"; + return false; + } + + ShmMsg msg(node->shm_); + if (!msg.Make(head, body_content)) { return false; } + DEFER1(msg.Release();); + RecordMsg(msg); + return DefaultSender(node->shm_).Send(dest, msg); +} + void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val) { // LOG_FUNCTION; diff --git a/box/node_center.h b/box/node_center.h index 1c79809..8bc2cf8 100644 --- a/box/node_center.h +++ b/box/node_center.h @@ -122,7 +122,8 @@ void RecordMsg(const MsgI &msg); bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg); bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg); - bool ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb); + bool PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb); + bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content); void OnAlloc(ShmSocket &socket, const int64_t val); void OnFree(ShmSocket &socket, const int64_t val); bool OnCommand(ShmSocket &socket, const int64_t val); @@ -159,6 +160,14 @@ { return HandleMsg<MsgCommonReply, Func>(head, op); } + template <class Reply> + bool CheckMsg(const BHMsgHead &head, Reply &reply) + { + bool r = false; + auto onOk = [&](Node) { r = true; return MakeReply<Reply>(eSuccess); }; + reply = HandleMsg<Reply>(head, onOk); + return r; + } MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg); MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg); @@ -184,6 +193,8 @@ return node && Valid(*node); } void RemoveNode(Node &node); + Node GetNode(const MQId mq); + std::string id_; // center proc id; std::unordered_map<Topic, Clients> service_map_; diff --git a/box/tcp_common.h b/box/tcp_common.h index 8c8b7ec..3d2b133 100644 --- a/box/tcp_common.h +++ b/box/tcp_common.h @@ -21,10 +21,18 @@ #include <boost/asio.hpp> #include <boost/uuid/string_generator.hpp> #include <boost/uuid/uuid.hpp> +#include <functional> +#include <string> + namespace ip = boost::asio::ip; using boost::asio::ip::tcp; typedef boost::system::error_code bserror_t; const boost::uuids::uuid kBHTcpServerTag = boost::uuids::string_generator()("e5bff527-6bf8-ee0d-cd28-b36594acee39"); +namespace bhome_msg +{ +class BHMsgHead; +} +typedef std::function<void(bhome_msg::BHMsgHead &head, std::string body_content)> ReplyCB; #endif // end of include guard: TCP_COMMON_8S8O7OV diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp index 22162e5..02001bb 100644 --- a/box/tcp_connection.cpp +++ b/box/tcp_connection.cpp @@ -19,6 +19,7 @@ #include "log.h" #include "msg.h" #include "node_center.h" +#include "proto.h" #include "shm_socket.h" namespace @@ -59,10 +60,8 @@ bool CheckData(std::vector<char> &buffer, const uint32_t len, BHMsgHead &head, std::string &body_content) { const char *p = buffer.data(); - LOG_DEBUG() << "msg len " << len; if (4 > len) { return false; } uint32_t head_len = Get32(p); - LOG_DEBUG() << "head_len " << head_len; if (head_len > 1024 * 4) { throw std::runtime_error("unexpected tcp reply data."); } @@ -87,15 +86,34 @@ /// request ----------------------------------------------------------- +void TcpRequest1::SendReply(BHMsgHead &head, std::string body_content) +{ + if (reply_cb_) { + reply_cb_(head, std::move(body_content)); + } +} + void TcpRequest1::OnError(bserror_t ec) { - LOG_ERROR() << "tcp client error: " << ec; + // LOG_ERROR() << "tcp client error: " << ec << ", " << ec.message(); + BHMsgHead head; + std::string body_content; + try { + std::vector<char> req(request_.begin(), request_.end()); + if (CheckData(req, req.size(), head, body_content)) { + if (head.type() == kMsgTypeRequestTopic) { + SendReply(head, MakeReply<MsgRequestTopicReply>(eError, std::to_string(ec.value()) + ',' + ec.message()).SerializeAsString()); + } + } + } catch (std::exception &e) { + } Close(); } void TcpRequest1::Start() { auto readReply = [this]() { + // if (!reply_cb_) { return; } // no reply needed, maybe safe to close? recv_buffer_.resize(1000); recv_len_ = 0; socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); @@ -104,47 +122,22 @@ socket_.async_connect(remote_, TcpCB(*this, request)); } -void TcpRequest1::Close() -{ - LOG_DEBUG() << "client close"; - socket_.close(); -} +void TcpRequest1::Close() { socket_.close(); } void TcpRequest1::OnRead(size_t size) { - LOG_DEBUG() << "reply data: " << recv_buffer_.data() + recv_len_; - recv_len_ += size; BHMsgHead head; std::string body_content; - bool recv_done = false; try { - recv_done = CheckData(recv_buffer_, recv_len_, head, body_content); + if (CheckData(recv_buffer_, recv_len_, head, body_content)) { // got reply. + Close(); + SendReply(head, std::move(body_content)); + } else { // not complete, read again + socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); + } } catch (std::exception &e) { LOG_ERROR() << e.what(); Close(); - return; - } - - if (recv_done) { - // just pass to client, no check, client will check it anyway. - LOG_DEBUG() << "route size: " << head.route_size(); - if (head.route_size() < 1) { return; } - auto &back = head.route(head.route_size() - 1); - MQInfo dest = {back.mq_id(), back.abs_addr()}; - head.mutable_route()->RemoveLast(); - - LOG_DEBUG() << "tcp got reply, pass to shm: " << dest.id_ << ", " << dest.offset_; - MsgRequestTopicReply reply; - if (reply.ParseFromString(body_content)) { - LOG_DEBUG() << "err msg: " << reply.errmsg().errstring(); - LOG_DEBUG() << "content : " << reply.data(); - } - Close(); - return; - shm_socket_.Send(dest, std::string(recv_buffer_.data(), recv_buffer_.size())); - } else { // read again - LOG_DEBUG() << "not complete, read again " << recv_buffer_.size(); - socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); } } @@ -153,7 +146,7 @@ void TcpReply1::OnError(bserror_t ec) { Close(); } void TcpReply1::Close() { - LOG_DEBUG() << "server close."; + LOG_TRACE() << "server close."; socket_.close(); } @@ -177,38 +170,26 @@ return; } - auto ParseBody = [&](auto &req) { - const char *p = recv_buffer_.data(); - uint32_t size = Get32(p); - p += 4; - p += size; - size = Get32(p); - p += 4; - return req.ParseFromArray(p, size); - }; - if (recv_done) { - LOG_DEBUG() << "request data: " << size; - auto self(shared_from_this()); + LOG_TRACE() << "tcp server recv request data, size: " << size; MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()}; if (remote.id_ && remote.offset_) { + auto self(shared_from_this()); auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { send_buffer_ = imsg.content(); async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); }; auto &scenter = *pscenter_; - if (!scenter->ProxyMsg(remote, head, body_content, onRecv)) { - send_buffer_ = "fake reply"; - async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); + if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) { + return; } } else { LOG_DEBUG() << "no address"; - send_buffer_ = "no address"; - async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); } + Close(); - } else { // read again - LOG_DEBUG() << "not complete, read again " << recv_buffer_.size(); + } else { // not complete, read again + LOG_TRACE() << "not complete, read again " << recv_buffer_.size(); socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); } }; \ No newline at end of file diff --git a/box/tcp_connection.h b/box/tcp_connection.h index 5aa93a4..6e1d0ad 100644 --- a/box/tcp_connection.h +++ b/box/tcp_connection.h @@ -25,25 +25,28 @@ #include <memory> class ShmSocket; +class NodeCenter; +typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr; + class TcpRequest1 : public std::enable_shared_from_this<TcpRequest1> { public: - static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ShmSocket &shm_socket) + static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ReplyCB const &cb) { - std::make_shared<TcpRequest1>(io, addr, std::move(request), shm_socket)->Start(); + std::make_shared<TcpRequest1>(io, addr, std::move(request), cb)->Start(); } - - TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ShmSocket &shm_socket) : - socket_(io), shm_socket_(shm_socket), remote_(addr), request_(std::move(request)) {} + TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ReplyCB const &cb) : + socket_(io), reply_cb_(cb), remote_(addr), request_(std::move(request)) {} void OnError(bserror_t ec); private: void Start(); void Close(); void OnRead(size_t size); + void SendReply(BHMsgHead &head, std::string body_content); tcp::socket socket_; - ShmSocket &shm_socket_; // send reply + ReplyCB reply_cb_; tcp::endpoint remote_; std::string request_; std::vector<char> recv_buffer_; @@ -54,7 +57,6 @@ class TcpReply1 : public std::enable_shared_from_this<TcpReply1> { public: - typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr; static void Create(tcp::socket sock, CenterPtr pscenter) { std::make_shared<TcpReply1>(std::move(sock), pscenter)->Start(); diff --git a/box/tcp_proxy.cpp b/box/tcp_proxy.cpp index 298c6b6..b4ec497 100644 --- a/box/tcp_proxy.cpp +++ b/box/tcp_proxy.cpp @@ -16,54 +16,18 @@ * ===================================================================================== */ #include "tcp_proxy.h" -#include "defs.h" -#include "shm_socket.h" #include "tcp_connection.h" -TcpProxy::TcpProxy() : - run_(false) {} - -TcpProxy::~TcpProxy() {} - -bool TcpProxy::Start(bhome_shm::SharedMemory &shm) -{ - Stop(); - bool cur = false; - if (!run_.compare_exchange_strong(cur, true)) { return false; } - - auto &mq = GetCenterInfo(shm)->mq_tcp_proxy_; - local_.reset(new ShmSocket(mq.offset_, shm, mq.id_)); - auto localProc = [this](ShmSocket &sock, MsgI &msg, BHMsgHead &head) { - auto &dest = head.dest(); - if (dest.ip().empty() || dest.port() == 0) { return; } - Request(dest.ip(), dest.port(), msg.content()); - }; - local_->Start(1, localProc); - - auto proxyProc = [this]() { - while (run_) { - io_context_.run_one_for(std::chrono::milliseconds(100)); - } - }; - std::thread(proxyProc).swap(worker_); - return true; -} - -void TcpProxy::Stop() -{ - bool cur = true; - if (run_.compare_exchange_strong(cur, false)) { - if (worker_.joinable()) { - worker_.join(); - } - local_.reset(); - } -} - -bool TcpProxy::Request(const std::string &ip, int port, std::string &&content) +bool TcpProxy::Request(const std::string &ip, int port, std::string &&content, ReplyCB const &cb) { if (content.empty()) { return false; } - - tcp::endpoint dest(ip::address::from_string(ip), port); - TcpRequest1::Create(io_context_, dest, std::move(content), *local_); + try { + tcp::endpoint dest(ip::address::from_string(ip), port); + TcpRequest1::Create(io_, dest, std::move(content), cb); + LOG_TRACE() << "tcp request start " << ip << ':' << port; + return true; + } catch (std::exception &e) { + LOG_ERROR() << "proxy request exception: " << e.what(); + return false; + } } diff --git a/box/tcp_proxy.h b/box/tcp_proxy.h index 9c74532..69c3f03 100644 --- a/box/tcp_proxy.h +++ b/box/tcp_proxy.h @@ -18,28 +18,24 @@ #ifndef TCP_PROXY_E1YJ92U5 #define TCP_PROXY_E1YJ92U5 -#include "shm.h" +#include "bh_util.h" +#include "io_service.h" #include "tcp_common.h" -#include <atomic> -#include <thread> +#include <memory> -class ShmSocket; +class NodeCenter; +typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr; class TcpProxy { public: - TcpProxy(); - ~TcpProxy(); - bool Start(bhome_shm::SharedMemory &shm); - void Stop(); + typedef IoService::io_service_t io_service_t; + TcpProxy(io_service_t &io) : + io_(io) {} + bool Request(const std::string &ip, int port, std::string &&content, ReplyCB const &cb); private: - bool Request(const std::string &ip, int port, std::string &&content); - std::unique_ptr<ShmSocket> local_; - - boost::asio::io_context io_context_; - std::thread worker_; - std::atomic<bool> run_; + io_service_t &io_; }; #endif // end of include guard: TCP_PROXY_E1YJ92U5 diff --git a/box/tcp_server.cpp b/box/tcp_server.cpp index ea23106..5cd8743 100644 --- a/box/tcp_server.cpp +++ b/box/tcp_server.cpp @@ -23,26 +23,19 @@ using namespace std::chrono_literals; -TcpServer::TcpServer(int port, CenterPtr pscenter) : - listener_(io(), tcp::endpoint(tcp::v6(), port)), pscenter_(pscenter) -{ - Accept(); -} - -TcpServer::~TcpServer() { Stop(); } - -void TcpServer::OnStop() -{ - listener_.close(); -} - void TcpServer::Accept() { listener_.async_accept([this](bserror_t ec, tcp::socket sock) { if (!ec) { - LOG_INFO() << "server accept client"; + LOG_TRACE() << "server accept client"; TcpReply1::Create(std::move(sock), pscenter_); + Accept(); + } else { + // this is already destructed by now. + if (ec.value() != ECANCELED) { + LOG_WARNING() << "tcp server accept error: " << ec; + Accept(); + } } - Accept(); }); } \ No newline at end of file diff --git a/box/tcp_server.h b/box/tcp_server.h index 2c9337c..4698196 100644 --- a/box/tcp_server.h +++ b/box/tcp_server.h @@ -23,15 +23,17 @@ #include "tcp_common.h" class NodeCenter; -class TcpServer : public IoService +class TcpServer { public: + typedef IoService::io_service_t io_service_t; typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr; - TcpServer(int port, CenterPtr pscenter); - ~TcpServer(); + TcpServer(io_service_t &io, int port, CenterPtr pscenter) : + io_(io), listener_(io_, tcp::endpoint(tcp::v6(), port)), pscenter_(pscenter) { Accept(); } + ~TcpServer() { listener_.close(); } private: - virtual void OnStop(); + io_service_t &io_; void Accept(); tcp::acceptor listener_; CenterPtr pscenter_; diff --git a/src/msg.cpp b/src/msg.cpp index 40a7b0d..3546424 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -37,7 +37,8 @@ Free(); } } else if (n < 0) { - LOG_FATAL() << "error double release data."; + // ns_log::GetTrace(); + LOG_FATAL() << "double release msg."; throw std::runtime_error("double release msg."); } return n; diff --git a/src/shm_socket.cpp b/src/shm_socket.cpp index d54168d..2bdccc2 100644 --- a/src/shm_socket.cpp +++ b/src/shm_socket.cpp @@ -171,8 +171,7 @@ RecvCB cb_no_use; per_msg_cbs_->Pick(msg_id, cb_no_use); }; - SendImpl(remote, msg, onExpireRemoveCB); - return true; + return SendImpl(remote, msg, onExpireRemoveCB); } catch (std::exception &e) { SetLastError(eError, "Send internal error."); return false; diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 3c38121..b21f7ef 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -497,9 +497,10 @@ out_msg_id = msg_id; - auto SendTo = [this, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) { + auto SendTo = [this, remote_addr, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) { auto &sock = SockClient(); BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id)); + *head.mutable_dest() = remote_addr; AddRoute(head, sock); head.set_topic(req.topic()); @@ -519,8 +520,12 @@ }; try { - BHAddress addr; - return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb); + if (remote_addr.ip().empty()) { + BHAddress addr; + return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb); + } else { + return SendTo(CenterAddr(), req, cb); + } } catch (...) { SetLastError(eError, "internal error."); return false; @@ -536,25 +541,34 @@ try { auto &sock = SockClient(); - - BHAddress addr; - if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { - LOG_TRACE() << "node: " << SockNode().id() << ", topic dest: " << addr.mq_id(); - BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn())); - AddRoute(head, sock); - head.set_topic(request.topic()); - - MsgI reply_msg(shm()); - DEFER1(reply_msg.Release();); - BHMsgHead reply_head; - - if (sock.SendAndRecv({addr.mq_id(), addr.abs_addr()}, head, request, reply_msg, reply_head, timeout_ms) && - reply_head.type() == kMsgTypeRequestTopicReply && - reply_msg.ParseBody(out_reply)) { - reply_head.mutable_proc_id()->swap(out_proc_id); - return true; + MQInfo dest; + if (!remote_addr.ip().empty()) { + dest = CenterAddr(); + } else { + BHAddress addr; + if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { + dest.offset_ = addr.abs_addr(); + dest.id_ = addr.mq_id(); + } else { + return false; } } + + BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn())); + *head.mutable_dest() = remote_addr; + AddRoute(head, sock); + head.set_topic(request.topic()); + + MsgI reply_msg(shm()); + DEFER1(reply_msg.Release();); + BHMsgHead reply_head; + + if (sock.SendAndRecv(dest, head, request, reply_msg, reply_head, timeout_ms) && + reply_head.type() == kMsgTypeRequestTopicReply && + reply_msg.ParseBody(out_reply)) { + reply_head.mutable_proc_id()->swap(out_proc_id); + return true; + } } catch (...) { SetLastError(eError, __func__ + std::string(" internal errer.")); } diff --git a/utest/api_test.cpp b/utest/api_test.cpp index 3d842bf..bddcbf7 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -206,7 +206,7 @@ } printf("\n"); }; - { + if (0) { // query procs std::string dest(BHAddress().SerializeAsString()); MsgQueryProc query; @@ -224,14 +224,21 @@ // printf("register topic : %s\n", r ? "ok" : "failed"); // Sleep(1s); } - { + for (int i = 0; i < 3; ++i) { // query procs with normal topic request MsgRequestTopic req; req.set_topic("#center_query_procs"); // req.set_data("{\"proc_id\":\"#center.node\"}"); std::string s(req.SerializeAsString()); // Sleep(10ms, false); - std::string dest(BHAddress().SerializeAsString()); + BHAddress host; + printf("query with ip set\n"); + host.set_ip("127.0.0.1"); + host.set_port(kBHCenterPort); + host.set_mq_id(1000011); + host.set_abs_addr(10296); + + std::string dest(host.SerializeAsString()); void *proc_id = 0; int proc_id_len = 0; DEFER1(BHFree(proc_id, proc_id_len);); @@ -247,7 +254,7 @@ } else { MsgRequestTopicReply ret; ret.ParseFromArray(reply, reply_len); - printf("topic query proc : %s\n", ret.data().c_str()); + printf("\ntopic query proc : %s\n", ret.data().c_str()); // MsgQueryProcReply result; // if (result.ParseFromArray(ret.data().data(), ret.data().size()) && IsSuccess(result.errmsg().errcode())) { // PrintProcs(result); @@ -325,7 +332,7 @@ for (int i = 0; i < 1; ++i) { MsgPublish pub; pub.set_topic(topic_ + std::to_string(i)); - pub.set_data("pub_data_" + std::string(1024 * 1, 'a')); + pub.set_data("pub_data_" + std::string(104 * 1, 'a')); std::string s(pub.SerializeAsString()); BHPublish(s.data(), s.size(), 0); // Sleep(1s); diff --git a/utest/tcp_test.cpp b/utest/tcp_test.cpp index a838252..86a0897 100644 --- a/utest/tcp_test.cpp +++ b/utest/tcp_test.cpp @@ -19,6 +19,7 @@ #include "defs.h" #include "node_center.h" #include "tcp_connection.h" +#include "tcp_proxy.h" #include "tcp_server.h" #include "util.h" #include <sys/ioctl.h> @@ -33,34 +34,54 @@ BOOST_AUTO_TEST_CASE(TcpTest) { - SharedMemory &shm = TestShm(); - const std::string connect_addr = "127.0.0.1"; const uint16_t port = kBHCenterPort; - boost::asio::io_context io; + IoService io; tcp::endpoint dest(ip::address::from_string(connect_addr), port); - MsgRequestTopic req; - req.set_topic("#center_query_procs"); - req.set_data(""); - auto head = InitMsgHead(GetType(req), "#test_proc", 1000000); - auto route = head.add_route(); - route->set_mq_id(12345); - route->set_abs_addr(67890); - head.mutable_dest()->set_ip(connect_addr); - head.mutable_dest()->set_port(port); - head.mutable_dest()->set_mq_id(1000011); - head.mutable_dest()->set_abs_addr(10296); + auto NewRequest = [&]() { + MsgRequestTopic req; + req.set_topic("#center_query_procs"); + req.set_data(""); + auto head = InitMsgHead(GetType(req), "#test_proc", 1000000); + auto route = head.add_route(); + route->set_mq_id(12345); + route->set_abs_addr(67890); - auto request(MsgI::Serialize(head, req)); - for (int i = 0; i < 1; ++i) { - LOG_DEBUG() << "request size: " << request.size(); - TcpRequest1::Create(io, dest, request, DefaultSender(BHomeShm())); + head.mutable_dest()->set_ip(connect_addr); + head.mutable_dest()->set_port(port); + head.mutable_dest()->set_mq_id(1000011); + head.mutable_dest()->set_abs_addr(10296); + + return (MsgI::Serialize(head, req)); + }; + auto onReply = [](BHMsgHead &head, std::string body_content) { + static int n = 0; + printf("reply %d: ", ++n); + MsgRequestTopicReply reply; + if (reply.ParseFromString(body_content)) { + if (IsSuccess(reply.errmsg().errcode())) { + printf("\ncontent: %s\n", reply.data().c_str()); + } else { + printf("error: %s\n", reply.errmsg().errstring().c_str()); + } + } else { + printf("parse error\n"); + } + }; + for (int i = 0; i < 100; ++i) { + auto request = NewRequest(); + TcpRequest1::Create(io.io(), dest, request, onReply); } - - io.run(); + Sleep(2s); + printf("-------------------------------------------------------\n"); + for (int i = 0; i < 3; ++i) { + auto request = NewRequest(); + TcpRequest1::Create(io.io(), dest, request, onReply); + } + Sleep(2s); printf("TcpTest\n"); } -- Gitblit v1.8.0