| | |
| | | */ |
| | | #include "center.h" |
| | | #include "center_topic_node.h" |
| | | #include "io_service.h" |
| | | #include "node_center.h" |
| | | #include "tcp_proxy.h" |
| | | #include "tcp_server.h" |
| | | #include <chrono> |
| | | |
| | |
| | | }; |
| | | } |
| | | |
| | | bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm) |
| | | bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy) |
| | | { |
| | | // command |
| | | auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool { |
| | |
| | | center->OnTimer(); |
| | | }; |
| | | |
| | | auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | // Bind the shared Synced<NodeCenter> once; the rest of this handler reaches it through `center`. |
| | | auto &center = *center_ptr; |
| | | auto replyer = MakeReplyer(socket, head, center); |
| | | |
| | | if (!head.dest().ip().empty()) { // other host, proxy |
| | | auto valid = [&]() { return head.route_size() == 1; }; |
| | | if (!valid()) { return false; } |
| | | |
| | | if (head.type() == kMsgTypeRequestTopic) { |
| | | typedef MsgRequestTopicReply Reply; |
| | | Reply reply; |
| | | if (!center->CheckMsg(head, reply)) { |
| | | replyer(reply); |
| | | } else { |
| | | // Reply callback for the proxied request: takes the last route hop off `head` and |
| | | // passes the remote reply to the local queue recorded in that hop. |
| | | // center_ptr is captured by value (rather than the handler-local `center` reference) |
| | | // because the callback is handed to tcp_proxy.Request() and is presumably invoked |
| | | // after this handler has returned; the copy keeps the center alive until then. |
| | | auto onResult = [center_ptr](BHMsgHead &head, std::string body_content) { |
| | | if (head.route_size() <= 0) { return; } // no hop recorded, nowhere to deliver the reply |
| | | auto &center = *center_ptr; |
| | | auto &back = head.route(head.route_size() - 1); |
| | | // `back` refers into head's route list, so copy the destination before removing the hop. |
| | | MQInfo dest = {back.mq_id(), back.abs_addr()}; |
| | | head.mutable_route()->RemoveLast(); |
| | | center->PassRemoteReplyToLocal(dest, head, std::move(body_content)); |
| | | }; |
| | | if (!tcp_proxy.Request(head.dest().ip(), head.dest().port(), msg.content(), onResult)) { |
| | | replyer(MakeReply<Reply>(eError, "send request failed.")); |
| | | } else { |
| | | // success |
| | | } |
| | | } |
| | | return true; |
| | | } else { |
| | | // ignore other msgs for now. |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | switch (head.type()) { |
| | | CASE_ON_MSG_TYPE(ProcInit); |
| | | CASE_ON_MSG_TYPE(Register); |
| | |
| | | { |
| | | auto nsec = NodeTimeoutSec(); |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. |
| | | AddCenter(center_ptr, shm); |
| | | io_service_.reset(new IoService); |
| | | tcp_proxy_.reset(new TcpProxy(io_service_->io())); |
| | | |
| | | AddCenter(center_ptr, shm, *tcp_proxy_); |
| | | |
| | | for (auto &kv : Centers()) { |
| | | auto &info = kv.second; |
| | |
| | | } |
| | | |
| | | topic_node_.reset(new CenterTopicNode(center_ptr, shm)); |
| | | tcp_server_.reset(new TcpServer(kBHCenterPort, center_ptr)); |
| | | tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr)); |
| | | } |
| | | |
// Destructor: shuts the center down by delegating to Stop().
// NOTE(review): Stop() calls tcp_server_->Stop() and then resets tcp_server_, so an
// explicit Stop() followed by destruction would dereference the reset pointer here;
// confirm Stop() is (or is made) safe to call twice.
| | | BHCenter::~BHCenter() { Stop(); } |
| | |
| | | sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_); |
| | | } |
| | | topic_node_->Start(); |
| | | tcp_server_->Start(); |
| | | return true; |
| | | } |
| | | |
| | | bool BHCenter::Stop() |
| | | { |
| | | tcp_server_->Stop(); |
| | | tcp_proxy_.reset(); |
| | | tcp_server_.reset(); |
| | | io_service_.reset(); |
| | | topic_node_->Stop(); |
| | | for (auto &kv : sockets_) { |
| | | kv.second->Stop(); |