| | |
| | | */ |
| | | #include "center.h" |
| | | #include "center_topic_node.h" |
| | | #include "io_service.h" |
| | | #include "node_center.h" |
| | | #include "tcp_proxy.h" |
| | | #include "tcp_server.h" |
| | | #include <chrono> |
| | | |
| | | using namespace std::chrono; |
| | |
| | | } |
| | | } |
| | | |
| | | Handler Combine(const Handler &h1, const Handler &h2) |
| | | { |
| | | return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head) { |
| | | return h1(socket, msg, head) || h2(socket, msg, head); |
| | | }; |
| | | } |
| | | template <class... H> |
| | | Handler Combine(const Handler &h0, const Handler &h1, const Handler &h2, const H &...rest) |
| | | { |
| | | return Combine(Combine(h0, h1), h2, rest...); |
| | | } |
| | | |
| | | #define CASE_ON_MSG_TYPE(MsgTag) \ |
| | | case kMsgType##MsgTag: \ |
| | | Dispatch<Msg##MsgTag>( \ |
| | |
| | | return [&](auto &&rep_body) { |
| | | auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id())); |
| | | MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()}; |
| | | MsgI msg; |
| | | MsgI msg(socket.shm()); |
| | | if (msg.Make(reply_head, rep_body)) { |
| | | DEFER1(msg.Release();); |
| | | center->SendAllocMsg(socket, remote, msg); |
| | |
| | | }; |
| | | } |
| | | |
| | | bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr) |
| | | bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy) |
| | | { |
| | | // command |
| | | auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool { |
| | |
| | | auto onInit = [&](const int64_t request) { |
| | | return center->OnNodeInit(socket, request); |
| | | }; |
| | | BHCenterHandleInit(onInit); |
| | | BHCenterHandleInit(socket.shm(), onInit); |
| | | center->OnTimer(); |
| | | }; |
| | | |
| | | auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto ¢er = *center_ptr; |
| | | auto replyer = MakeReplyer(socket, head, center); |
| | | |
| | | if (!head.dest().ip().empty()) { // other host, proxy |
| | | auto valid = [&]() { return head.route_size() == 1; }; |
| | | if (!valid()) { return false; } |
| | | |
| | | if (head.type() == kMsgTypeRequestTopic) { |
| | | typedef MsgRequestTopicReply Reply; |
| | | Reply reply; |
| | | if (!center->CheckMsg(head, reply)) { |
| | | replyer(reply); |
| | | } else { |
| | | auto onResult = [¢er](BHMsgHead &head, std::string body_content) { |
| | | if (head.route_size() > 0) { |
| | | auto &back = head.route(head.route_size() - 1); |
| | | MQInfo dest = {back.mq_id(), back.abs_addr()}; |
| | | head.mutable_route()->RemoveLast(); |
| | | center->PassRemoteReplyToLocal(dest, head, std::move(body_content)); |
| | | } |
| | | }; |
| | | uint16_t port = head.dest().port(); |
| | | if (port == 0) { |
| | | port = kBHCenterPort; |
| | | } |
| | | if (!tcp_proxy.Request(head.dest().ip(), port, msg.content(), onResult)) { |
| | | replyer(MakeReply<Reply>(eError, "send request failed.")); |
| | | } else { |
| | | // success |
| | | } |
| | | } |
| | | return true; |
| | | } else { |
| | | // ignore other msgs for now. |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | switch (head.type()) { |
| | | CASE_ON_MSG_TYPE(ProcInit); |
| | | CASE_ON_MSG_TYPE(Register); |
| | |
| | | default: return false; |
| | | } |
| | | }; |
| | | BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000); |
| | | BHCenter::Install("@center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000); |
| | | |
| | | auto OnBusIdle = [=](ShmSocket &socket) {}; |
| | | auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; }; |
| | |
| | | } |
| | | }; |
| | | |
| | | BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000); |
| | | BHCenter::Install("@center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000); |
| | | |
| | | return true; |
| | | } |
| | |
| | | BHCenter::BHCenter(Socket::Shm &shm) |
| | | { |
| | | auto nsec = NodeTimeoutSec(); |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. |
| | | AddCenter(center_ptr); |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. |
| | | io_service_.reset(new IoService); |
| | | tcp_proxy_.reset(new TcpProxy(io_service_->io())); |
| | | |
| | | AddCenter(center_ptr, shm, *tcp_proxy_); |
| | | |
| | | for (auto &kv : Centers()) { |
| | | auto &info = kv.second; |
| | |
| | | } |
| | | |
| | | topic_node_.reset(new CenterTopicNode(center_ptr, shm)); |
| | | tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr)); |
| | | } |
| | | |
| | | BHCenter::~BHCenter() { Stop(); } |
| | | |
| | | bool BHCenter::Start() |
| | |
| | | |
| | | bool BHCenter::Stop() |
| | | { |
| | | tcp_proxy_.reset(); |
| | | tcp_server_.reset(); |
| | | io_service_.reset(); |
| | | topic_node_->Stop(); |
| | | for (auto &kv : sockets_) { |
| | | kv.second->Stop(); |