From 9243710ca372de26823c2225c7b46b072458c671 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 28 五月 2021 17:18:33 +0800 Subject: [PATCH] tcp proxy requests, need more test. --- box/center.cpp | 50 ++++++++++++++++++++++++++++++++++++++++++++------ 1 files changed, 44 insertions(+), 6 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index 8d24315..0fdfa33 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -17,7 +17,9 @@ */ #include "center.h" #include "center_topic_node.h" +#include "io_service.h" #include "node_center.h" +#include "tcp_proxy.h" #include "tcp_server.h" #include <chrono> @@ -74,7 +76,7 @@ }; } -bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm) +bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy) { // command auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool { @@ -92,9 +94,41 @@ center->OnTimer(); }; - auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { + auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; auto replyer = MakeReplyer(socket, head, center); + + if (!head.dest().ip().empty()) { // other host, proxy + auto valid = [&]() { return head.route_size() == 1; }; + if (!valid()) { return false; } + + if (head.type() == kMsgTypeRequestTopic) { + typedef MsgRequestTopicReply Reply; + Reply reply; + if (!center->CheckMsg(head, reply)) { + replyer(reply); + } else { + auto onResult = [¢er](BHMsgHead &head, std::string body_content) { + if (head.route_size() > 0) { + auto &back = head.route(head.route_size() - 1); + MQInfo dest = {back.mq_id(), back.abs_addr()}; + head.mutable_route()->RemoveLast(); + center->PassRemoteReplyToLocal(dest, head, std::move(body_content)); + } + }; + if (!tcp_proxy.Request(head.dest().ip(), head.dest().port(), msg.content(), onResult)) { + replyer(MakeReply<Reply>(eError, "send request failed.")); + } else { + // success + } + } + return true; + } else { + // ignore other msgs for now. + } + return false; + } + switch (head.type()) { CASE_ON_MSG_TYPE(ProcInit); CASE_ON_MSG_TYPE(Register); @@ -168,7 +202,10 @@ { auto nsec = NodeTimeoutSec(); auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. - AddCenter(center_ptr, shm); + io_service_.reset(new IoService); + tcp_proxy_.reset(new TcpProxy(io_service_->io())); + + AddCenter(center_ptr, shm, *tcp_proxy_); for (auto &kv : Centers()) { auto &info = kv.second; @@ -176,7 +213,7 @@ } topic_node_.reset(new CenterTopicNode(center_ptr, shm)); - tcp_server_.reset(new TcpServer(kBHCenterPort, center_ptr)); + tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr)); } BHCenter::~BHCenter() { Stop(); } @@ -188,13 +225,14 @@ sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_); } topic_node_->Start(); - tcp_server_->Start(); return true; } bool BHCenter::Stop() { - tcp_server_->Stop(); + tcp_proxy_.reset(); + tcp_server_.reset(); + io_service_.reset(); topic_node_->Stop(); for (auto &kv : sockets_) { kv.second->Stop(); -- Gitblit v1.8.0