| | |
| | | */ |
| | | #include "center.h" |
| | | #include "center_topic_node.h" |
| | | #include "io_service.h" |
| | | #include "node_center.h" |
| | | #include "tcp_proxy.h" |
| | | #include "tcp_server.h" |
| | | #include <chrono> |
| | | |
| | | using namespace std::chrono; |
| | |
| | | namespace |
| | | { |
| | | |
| | | //TODO check proc_id |
| | | |
| | | template <class Body, class OnMsg, class Replyer> |
| | | inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer) |
| | | { |
| | |
| | | if (msg.ParseBody(body)) { |
| | | replyer(onmsg(body)); |
| | | } |
| | | } |
| | | |
| | | Handler Combine(const Handler &h1, const Handler &h2) |
| | | { |
| | | return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head) { |
| | | return h1(socket, msg, head) || h2(socket, msg, head); |
| | | }; |
| | | } |
| | | template <class... H> |
| | | Handler Combine(const Handler &h0, const Handler &h1, const Handler &h2, const H &...rest) |
| | | { |
| | | return Combine(Combine(h0, h1), h2, rest...); |
| | | } |
| | | |
| | | #define CASE_ON_MSG_TYPE(MsgTag) \ |
| | |
| | | return [&](auto &&rep_body) { |
| | | auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id())); |
| | | MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()}; |
| | | MsgI msg; |
| | | MsgI msg(socket.shm()); |
| | | if (msg.Make(reply_head, rep_body)) { |
| | | DEFER1(msg.Release();); |
| | | center->SendAllocMsg(socket, remote, msg); |
| | |
| | | }; |
| | | } |
| | | |
| | | bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr) |
| | | bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy) |
| | | { |
| | | // command |
| | | auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool { |
| | |
| | | auto onInit = [&](const int64_t request) { |
| | | return center->OnNodeInit(socket, request); |
| | | }; |
| | | BHCenterHandleInit(onInit); |
| | | BHCenterHandleInit(socket.shm(), onInit); |
| | | center->OnTimer(); |
| | | }; |
| | | |
| | | auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto ¢er = *center_ptr; |
| | | auto replyer = MakeReplyer(socket, head, center); |
| | | |
| | | if (!head.dest().ip().empty()) { // other host, proxy |
| | | auto valid = [&]() { return head.route_size() == 1; }; |
| | | if (!valid()) { return false; } |
| | | |
| | | if (head.type() == kMsgTypeRequestTopic) { |
| | | typedef MsgRequestTopicReply Reply; |
| | | Reply reply; |
| | | if (!center->CheckMsg(head, reply)) { |
| | | replyer(reply); |
| | | } else { |
| | | auto onResult = [¢er](BHMsgHead &head, std::string body_content) { |
| | | if (head.route_size() > 0) { |
| | | auto &back = head.route(head.route_size() - 1); |
| | | MQInfo dest = {back.mq_id(), back.abs_addr()}; |
| | | head.mutable_route()->RemoveLast(); |
| | | center->PassRemoteReplyToLocal(dest, head, std::move(body_content)); |
| | | } |
| | | }; |
| | | uint16_t port = head.dest().port(); |
| | | if (port == 0) { |
| | | port = kBHCenterPort; |
| | | } |
| | | if (!tcp_proxy.Request(head.dest().ip(), port, msg.content(), onResult)) { |
| | | replyer(MakeReply<Reply>(eError, "send request failed.")); |
| | | } else { |
| | | // success |
| | | } |
| | | } |
| | | return true; |
| | | } else { |
| | | // ignore other msgs for now. |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | switch (head.type()) { |
| | | CASE_ON_MSG_TYPE(ProcInit); |
| | | CASE_ON_MSG_TYPE(Register); |
| | |
| | | default: return false; |
| | | } |
| | | }; |
| | | BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000); |
| | | BHCenter::Install("@center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000); |
| | | |
| | | auto OnBusIdle = [=](ShmSocket &socket) {}; |
| | | auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; }; |
| | | auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto OnPubSub = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto ¢er = *center_ptr; |
| | | auto replyer = MakeReplyer(socket, head, center); |
| | | auto OnPublish = [&]() { |
| | | MsgPublish pub; |
| | | NodeCenter::Clients clients; |
| | | MsgCommonReply reply; |
| | | if (head.route_size() != 1 || !msg.ParseBody(pub)) { |
| | | return; |
| | | } else if (!center->FindClients(head, pub, clients, reply)) { |
| | | if (head.route_size() == 1 && msg.ParseBody(pub)) { |
| | | // replyer(center->Publish(head, pub.topic(), msg)); // dead lock? |
| | | auto reply(center->Publish(head, pub.topic(), msg)); |
| | | replyer(reply); |
| | | } else { |
| | | replyer(MakeReply(eSuccess)); |
| | | if (clients.empty()) { return; } |
| | | for (auto &cli : clients) { |
| | | auto node = cli.weak_node_.lock(); |
| | | if (node) { |
| | | // should also make sure that mq is not killed before msg expires. |
| | | // it would be ok if (kill_time - offline_time) is longer than expire time. |
| | | socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); |
| | | } |
| | | auto hosts = center->FindRemoteSubClients(pub.topic()); |
| | | for (auto &host : hosts) { |
| | | tcp_proxy.Publish(host, kBHCenterPort, msg.content()); |
| | | } |
| | | } |
| | | }; |
| | |
| | | } |
| | | }; |
| | | |
| | | BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000); |
| | | BHCenter::Install("@center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000); |
| | | |
| | | return true; |
| | | } |
| | |
| | | |
| | | BHCenter::BHCenter(Socket::Shm &shm) |
| | | { |
| | | auto gc = [&](const MQId id) { |
| | | auto r = ShmSocket::Remove(shm, id); |
| | | if (r) { |
| | | LOG_DEBUG() << "remove mq " << id << " ok\n"; |
| | | } |
| | | }; |
| | | |
| | | auto nsec = NodeTimeoutSec(); |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. |
| | | AddCenter(center_ptr); |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. |
| | | io_service_.reset(new IoService); |
| | | tcp_proxy_.reset(new TcpProxy(io_service_->io())); |
| | | |
| | | AddCenter(center_ptr, shm, *tcp_proxy_); |
| | | |
| | | for (auto &kv : Centers()) { |
| | | auto &info = kv.second; |
| | |
| | | } |
| | | |
| | | topic_node_.reset(new CenterTopicNode(center_ptr, shm)); |
| | | tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr)); |
| | | } |
| | | |
| | | BHCenter::~BHCenter() { Stop(); } |
| | | |
| | | bool BHCenter::Start() |
| | |
| | | |
| | | bool BHCenter::Stop() |
| | | { |
| | | tcp_proxy_.reset(); |
| | | tcp_server_.reset(); |
| | | io_service_.reset(); |
| | | topic_node_->Stop(); |
| | | for (auto &kv : sockets_) { |
| | | kv.second->Stop(); |