From 993c556000a414011626770540678948f16eaa9e Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 02 六月 2021 17:40:50 +0800 Subject: [PATCH] center restart with new shm; set center node ssn. --- box/center.cpp | 99 +++++++++++++++++++++++++++++++------------------ 1 files changed, 63 insertions(+), 36 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index 0f36719..020d1bf 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -16,7 +16,11 @@ * ===================================================================================== */ #include "center.h" +#include "center_topic_node.h" +#include "io_service.h" #include "node_center.h" +#include "tcp_proxy.h" +#include "tcp_server.h" #include <chrono> using namespace std::chrono; @@ -41,18 +45,6 @@ } } -Handler Combine(const Handler &h1, const Handler &h2) -{ - return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head) { - return h1(socket, msg, head) || h2(socket, msg, head); - }; -} -template <class... H> -Handler Combine(const Handler &h0, const Handler &h1, const Handler &h2, const H &...rest) -{ - return Combine(Combine(h0, h1), h2, rest...); -} - #define CASE_ON_MSG_TYPE(MsgTag) \ case kMsgType##MsgTag: \ Dispatch<Msg##MsgTag>( \ @@ -64,7 +56,7 @@ return [&](auto &&rep_body) { auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id())); MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()}; - MsgI msg; + MsgI msg(socket.shm()); if (msg.Make(reply_head, rep_body)) { DEFER1(msg.Release();); center->SendAllocMsg(socket, remote, msg); @@ -72,7 +64,7 @@ }; } -bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr) +bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy) { // command auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool { @@ -86,13 +78,49 @@ auto onInit = [&](const int64_t request) { return center->OnNodeInit(socket, request); }; - BHCenterHandleInit(onInit); + BHCenterHandleInit(socket.shm(), onInit); center->OnTimer(); }; - auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { + auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; auto replyer = MakeReplyer(socket, head, center); + + if (!head.dest().ip().empty()) { // other host, proxy + auto valid = [&]() { return head.route_size() == 1; }; + if (!valid()) { return false; } + + if (head.type() == kMsgTypeRequestTopic) { + typedef MsgRequestTopicReply Reply; + Reply reply; + if (!center->CheckMsg(head, reply)) { + replyer(reply); + } else { + auto onResult = [¢er](BHMsgHead &head, std::string body_content) { + if (head.route_size() > 0) { + auto &back = head.route(head.route_size() - 1); + MQInfo dest = {back.mq_id(), back.abs_addr()}; + head.mutable_route()->RemoveLast(); + center->PassRemoteReplyToLocal(dest, head, std::move(body_content)); + } + }; + uint16_t port = head.dest().port(); + if (port == 0) { + port = kBHCenterPort; + } + if (!tcp_proxy.Request(head.dest().ip(), port, msg.content(), onResult)) { + replyer(MakeReply<Reply>(eError, "send request failed.")); + } else { + // success + } + } + return true; + } else { + // ignore other msgs for now. + } + return false; + } + switch (head.type()) { CASE_ON_MSG_TYPE(ProcInit); CASE_ON_MSG_TYPE(Register); @@ -105,7 +133,7 @@ default: return false; } }; - BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000); + BHCenter::Install("@center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000); auto OnBusIdle = [=](ShmSocket &socket) {}; auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; }; @@ -123,20 +151,14 @@ } else { replyer(MakeReply(eSuccess)); if (clients.empty()) { return; } - - auto it = clients.begin(); - do { - auto &cli = *it; + for (auto &cli : clients) { auto node = cli.weak_node_.lock(); if (node) { // should also make sure that mq is not killed before msg expires. // it would be ok if (kill_time - offline_time) is longer than expire time. socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); - ++it; - } else { - it = clients.erase(it); } - } while (it != clients.end()); + } } }; switch (head.type()) { @@ -147,7 +169,7 @@ } }; - BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000); + BHCenter::Install("@center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000); return true; } @@ -170,22 +192,23 @@ BHCenter::BHCenter(Socket::Shm &shm) { - auto gc = [&](const MQId id) { - auto r = ShmSocket::Remove(shm, id); - if (r) { - LOG_DEBUG() << "remove mq " << id << " ok\n"; - } - }; - auto nsec = NodeTimeoutSec(); - auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. - AddCenter(center_ptr); + auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. + io_service_.reset(new IoService); + tcp_proxy_.reset(new TcpProxy(io_service_->io())); + + AddCenter(center_ptr, shm, *tcp_proxy_); for (auto &kv : Centers()) { auto &info = kv.second; sockets_[info.name_] = std::make_shared<ShmSocket>(info.mq_.offset_, shm, info.mq_.id_); } + + topic_node_.reset(new CenterTopicNode(center_ptr, shm)); + tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr)); } + +BHCenter::~BHCenter() { Stop(); } bool BHCenter::Start() { @@ -193,12 +216,16 @@ auto &info = kv.second; sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_); } - + topic_node_->Start(); return true; } bool BHCenter::Stop() { + tcp_proxy_.reset(); + tcp_server_.reset(); + io_service_.reset(); + topic_node_->Stop(); for (auto &kv : sockets_) { kv.second->Stop(); } -- Gitblit v1.8.0