| | |
| | | namespace |
| | | { |
| | | |
| | | //TODO check proc_id |
| | | |
| | | template <class Body, class OnMsg, class Replyer> |
| | | inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer) |
| | | { |
| | |
| | | if (msg.ParseBody(body)) { |
| | | replyer(onmsg(body)); |
| | | } |
| | | } |
| | | |
| | | Handler Combine(const Handler &h1, const Handler &h2) |
| | | { |
| | | return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head) { |
| | | return h1(socket, msg, head) || h2(socket, msg, head); |
| | | }; |
| | | } |
| | | template <class... H> |
| | | Handler Combine(const Handler &h0, const Handler &h1, const Handler &h2, const H &...rest) |
| | | { |
| | | return Combine(Combine(h0, h1), h2, rest...); |
| | | } |
| | | |
| | | #define CASE_ON_MSG_TYPE(MsgTag) \ |
| | |
| | | center->PassRemoteReplyToLocal(dest, head, std::move(body_content)); |
| | | } |
| | | }; |
| | | if (!tcp_proxy.Request(head.dest().ip(), head.dest().port(), msg.content(), onResult)) { |
| | | uint16_t port = head.dest().port(); |
| | | if (port == 0) { |
| | | port = kBHCenterPort; |
| | | } |
| | | if (!tcp_proxy.Request(head.dest().ip(), port, msg.content(), onResult)) { |
| | | replyer(MakeReply<Reply>(eError, "send request failed.")); |
| | | } else { |
| | | // success |
| | |
| | | |
| | | auto OnBusIdle = [=](ShmSocket &socket) {}; |
| | | auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; }; |
| | | auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto OnPubSub = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto &center = *center_ptr; |
| | | auto replyer = MakeReplyer(socket, head, center); |
| | | auto OnPublish = [&]() { |
| | | MsgPublish pub; |
| | | NodeCenter::Clients clients; |
| | | MsgCommonReply reply; |
| | | if (head.route_size() != 1 || !msg.ParseBody(pub)) { |
| | | return; |
| | | } else if (!center->FindClients(head, pub, clients, reply)) { |
| | | if (head.route_size() == 1 && msg.ParseBody(pub)) { |
| | | // replyer(center->Publish(head, pub.topic(), msg)); // dead lock? |
| | | auto reply(center->Publish(head, pub.topic(), msg)); |
| | | replyer(reply); |
| | | } else { |
| | | replyer(MakeReply(eSuccess)); |
| | | if (clients.empty()) { return; } |
| | | for (auto &cli : clients) { |
| | | auto node = cli.weak_node_.lock(); |
| | | if (node) { |
| | | // should also make sure that mq is not killed before msg expires. |
| | | // it would be ok if (kill_time - offline_time) is longer than expire time. |
| | | socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); |
| | | } |
| | | auto hosts = center->FindRemoteSubClients(pub.topic()); |
| | | for (auto &host : hosts) { |
| | | tcp_proxy.Publish(host, kBHCenterPort, msg.content()); |
| | | } |
| | | } |
| | | }; |