From ae17d1439b35b55212c3a30712e0a60b1d6a99c0 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 30 六月 2021 11:15:53 +0800 Subject: [PATCH] support tcp pub/sub. --- box/center.cpp | 107 +++++++++++++++++++++++++++++++---------------------- 1 files changed, 62 insertions(+), 45 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index 3f565b1..e0abbb3 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -17,7 +17,10 @@ */ #include "center.h" #include "center_topic_node.h" +#include "io_service.h" #include "node_center.h" +#include "tcp_proxy.h" +#include "tcp_server.h" #include <chrono> using namespace std::chrono; @@ -30,8 +33,6 @@ namespace { -//TODO check proc_id - template <class Body, class OnMsg, class Replyer> inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer) { @@ -40,18 +41,6 @@ if (msg.ParseBody(body)) { replyer(onmsg(body)); } -} - -Handler Combine(const Handler &h1, const Handler &h2) -{ - return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head) { - return h1(socket, msg, head) || h2(socket, msg, head); - }; -} -template <class... H> -Handler Combine(const Handler &h0, const Handler &h1, const Handler &h2, const H &...rest) -{ - return Combine(Combine(h0, h1), h2, rest...); } #define CASE_ON_MSG_TYPE(MsgTag) \ @@ -65,7 +54,7 @@ return [&](auto &&rep_body) { auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id())); MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()}; - MsgI msg; + MsgI msg(socket.shm()); if (msg.Make(reply_head, rep_body)) { DEFER1(msg.Release();); center->SendAllocMsg(socket, remote, msg); @@ -73,7 +62,7 @@ }; } -bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr) +bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy) { // command auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool { @@ -87,13 +76,49 @@ auto onInit = [&](const int64_t request) { return center->OnNodeInit(socket, request); }; - BHCenterHandleInit(onInit); + BHCenterHandleInit(socket.shm(), onInit); center->OnTimer(); }; - auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { + auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; auto replyer = MakeReplyer(socket, head, center); + + if (!head.dest().ip().empty()) { // other host, proxy + auto valid = [&]() { return head.route_size() == 1; }; + if (!valid()) { return false; } + + if (head.type() == kMsgTypeRequestTopic) { + typedef MsgRequestTopicReply Reply; + Reply reply; + if (!center->CheckMsg(head, reply)) { + replyer(reply); + } else { + auto onResult = [¢er](BHMsgHead &head, std::string body_content) { + if (head.route_size() > 0) { + auto &back = head.route(head.route_size() - 1); + MQInfo dest = {back.mq_id(), back.abs_addr()}; + head.mutable_route()->RemoveLast(); + center->PassRemoteReplyToLocal(dest, head, std::move(body_content)); + } + }; + uint16_t port = head.dest().port(); + if (port == 0) { + port = kBHCenterPort; + } + if (!tcp_proxy.Request(head.dest().ip(), port, msg.content(), onResult)) { + replyer(MakeReply<Reply>(eError, "send request failed.")); + } else { + // success + } + } + return true; + } else { + // ignore other msgs for now. + } + return false; + } + switch (head.type()) { CASE_ON_MSG_TYPE(ProcInit); CASE_ON_MSG_TYPE(Register); @@ -106,31 +131,22 @@ default: return false; } }; - BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000); + BHCenter::Install("@center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000); auto OnBusIdle = [=](ShmSocket &socket) {}; auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; }; - auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { + auto OnPubSub = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; auto replyer = MakeReplyer(socket, head, center); auto OnPublish = [&]() { MsgPublish pub; - NodeCenter::Clients clients; - MsgCommonReply reply; - if (head.route_size() != 1 || !msg.ParseBody(pub)) { - return; - } else if (!center->FindClients(head, pub, clients, reply)) { + if (head.route_size() == 1 && msg.ParseBody(pub)) { + // replyer(center->Publish(head, pub.topic(), msg)); // dead lock? + auto reply(center->Publish(head, pub.topic(), msg)); replyer(reply); - } else { - replyer(MakeReply(eSuccess)); - if (clients.empty()) { return; } - for (auto &cli : clients) { - auto node = cli.weak_node_.lock(); - if (node) { - // should also make sure that mq is not killed before msg expires. - // it would be ok if (kill_time - offline_time) is longer than expire time. - socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); - } + auto hosts = center->FindRemoteSubClients(pub.topic()); + for (auto &host : hosts) { + tcp_proxy.Publish(host, kBHCenterPort, msg.content()); } } }; @@ -142,7 +158,7 @@ } }; - BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000); + BHCenter::Install("@center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000); return true; } @@ -165,16 +181,12 @@ BHCenter::BHCenter(Socket::Shm &shm) { - auto gc = [&](const MQId id) { - auto r = ShmSocket::Remove(shm, id); - if (r) { - LOG_DEBUG() << "remove mq " << id << " ok\n"; - } - }; - auto nsec = NodeTimeoutSec(); - auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. - AddCenter(center_ptr); + auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. + io_service_.reset(new IoService); + tcp_proxy_.reset(new TcpProxy(io_service_->io())); + + AddCenter(center_ptr, shm, *tcp_proxy_); for (auto &kv : Centers()) { auto &info = kv.second; @@ -182,7 +194,9 @@ } topic_node_.reset(new CenterTopicNode(center_ptr, shm)); + tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr)); } + BHCenter::~BHCenter() { Stop(); } bool BHCenter::Start() @@ -197,6 +211,9 @@ bool BHCenter::Stop() { + tcp_proxy_.reset(); + tcp_server_.reset(); + io_service_.reset(); topic_node_->Stop(); for (auto &kv : sockets_) { kv.second->Stop(); -- Gitblit v1.8.0