From 72851db66655912cb9c92300a80985fb9797d168 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 01 六月 2021 16:25:23 +0800 Subject: [PATCH] remove AtomicQueue, not used. --- box/center.cpp | 76 ++++++++++++++++++++++++++----------- 1 files changed, 53 insertions(+), 23 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index e745be8..0fdfa33 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -17,7 +17,10 @@ */ #include "center.h" #include "center_topic_node.h" +#include "io_service.h" #include "node_center.h" +#include "tcp_proxy.h" +#include "tcp_server.h" #include <chrono> using namespace std::chrono; @@ -65,7 +68,7 @@ return [&](auto &&rep_body) { auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id())); MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()}; - MsgI msg; + MsgI msg(socket.shm()); if (msg.Make(reply_head, rep_body)) { DEFER1(msg.Release();); center->SendAllocMsg(socket, remote, msg); @@ -73,7 +76,7 @@ }; } -bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr) +bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy) { // command auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool { @@ -87,13 +90,45 @@ auto onInit = [&](const int64_t request) { return center->OnNodeInit(socket, request); }; - BHCenterHandleInit(onInit); + BHCenterHandleInit(socket.shm(), onInit); center->OnTimer(); }; - auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { + auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; auto replyer = MakeReplyer(socket, head, center); + + if (!head.dest().ip().empty()) { // other host, proxy + auto valid = [&]() { return head.route_size() == 1; }; + if (!valid()) { return false; } + + if (head.type() == kMsgTypeRequestTopic) { + typedef MsgRequestTopicReply Reply; + Reply reply; + if (!center->CheckMsg(head, reply)) { + replyer(reply); + } else { + auto onResult = [¢er](BHMsgHead &head, std::string body_content) { + if (head.route_size() > 0) { + auto &back = head.route(head.route_size() - 1); + MQInfo dest = {back.mq_id(), back.abs_addr()}; + head.mutable_route()->RemoveLast(); + center->PassRemoteReplyToLocal(dest, head, std::move(body_content)); + } + }; + if (!tcp_proxy.Request(head.dest().ip(), head.dest().port(), msg.content(), onResult)) { + replyer(MakeReply<Reply>(eError, "send request failed.")); + } else { + // success + } + } + return true; + } else { + // ignore other msgs for now. + } + return false; + } + switch (head.type()) { CASE_ON_MSG_TYPE(ProcInit); CASE_ON_MSG_TYPE(Register); @@ -106,7 +141,7 @@ default: return false; } }; - BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000); + BHCenter::Install("@center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000); auto OnBusIdle = [=](ShmSocket &socket) {}; auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; }; @@ -124,20 +159,14 @@ } else { replyer(MakeReply(eSuccess)); if (clients.empty()) { return; } - - auto it = clients.begin(); - do { - auto &cli = *it; + for (auto &cli : clients) { auto node = cli.weak_node_.lock(); if (node) { // should also make sure that mq is not killed before msg expires. // it would be ok if (kill_time - offline_time) is longer than expire time. socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); - ++it; - } else { - it = clients.erase(it); } - } while (it != clients.end()); + } } }; switch (head.type()) { @@ -148,7 +177,7 @@ } }; - BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000); + BHCenter::Install("@center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000); return true; } @@ -171,16 +200,12 @@ BHCenter::BHCenter(Socket::Shm &shm) { - auto gc = [&](const MQId id) { - auto r = ShmSocket::Remove(shm, id); - if (r) { - LOG_DEBUG() << "remove mq " << id << " ok\n"; - } - }; - auto nsec = NodeTimeoutSec(); - auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. - AddCenter(center_ptr); + auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. + io_service_.reset(new IoService); + tcp_proxy_.reset(new TcpProxy(io_service_->io())); + + AddCenter(center_ptr, shm, *tcp_proxy_); for (auto &kv : Centers()) { auto &info = kv.second; @@ -188,7 +213,9 @@ } topic_node_.reset(new CenterTopicNode(center_ptr, shm)); + tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr)); } + BHCenter::~BHCenter() { Stop(); } bool BHCenter::Start() @@ -203,6 +230,9 @@ bool BHCenter::Stop() { + tcp_proxy_.reset(); + tcp_server_.reset(); + io_service_.reset(); topic_node_->Stop(); for (auto &kv : sockets_) { kv.second->Stop(); -- Gitblit v1.8.0