| | |
| | | return IsOnline(); |
| | | } |
| | | } |
| | | // Sends a MsgUnregister for `proc` to the address returned by BHTopicCenterAddress(). |
| | | // |
| | | // Before anything is sent, info_ is cleared and state_cas(eStateOnline, eStateOffline) |
| | | // is called. |
| | | // |
| | | // proc        swapped into the outgoing message; on return it holds whatever the |
| | | //             freshly constructed message's proc field held. |
| | | // reply_body  filled from the reply in the blocking case; not touched when timeout_ms == 0. |
| | | // timeout_ms  0 selects Send() with a callback; any other value selects SendAndRecv(). |
| | | // |
| | | // Returns Send()'s result when timeout_ms == 0. Otherwise returns true only if |
| | | // SendAndRecv() succeeded and the reply is a kMsgTypeCommonReply that parses and |
| | | // carries a success error code. |
| | | bool TopicNode::Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | |     info_.Clear(); |
| | |     state_cas(eStateOnline, eStateOffline); |
| | | 
| | |     auto &node_sock = SockNode(); |
| | |     MsgUnregister request; |
| | |     request.mutable_proc()->Swap(&proc); |
| | | 
| | |     auto request_head(InitMsgHead(GetType(request), request.proc().proc_id())); |
| | |     AddRoute(request_head, node_sock.id()); |
| | | 
| | |     // A reply is accepted only if it has the common-reply type, its body parses, |
| | |     // and the parsed error code passes IsSuccess(). |
| | |     auto IsGoodReply = [this](MsgI &msg, BHMsgHead &msg_head, MsgCommonReply &parsed) -> bool { |
| | |         if (msg_head.type() != kMsgTypeCommonReply) { |
| | |             return false; |
| | |         } |
| | |         if (!msg.ParseBody(parsed)) { |
| | |             return false; |
| | |         } |
| | |         return IsSuccess(parsed.errmsg().errcode()); |
| | |     }; |
| | | 
| | |     if (timeout_ms != 0) { |
| | |         // Blocking path: wait for the reply and validate it into reply_body. |
| | |         MsgI reply; |
| | |         DEFER1(reply.Release();); |
| | |         BHMsgHead reply_head; |
| | |         if (!node_sock.SendAndRecv(&BHTopicCenterAddress(), request_head, request, reply, reply_head, timeout_ms)) { |
| | |             return false; |
| | |         } |
| | |         return IsGoodReply(reply, reply_head, reply_body); |
| | |     } |
| | | 
| | |     // timeout_ms == 0: hand Send() a callback; the callback's check result is discarded. |
| | |     auto on_reply = [this, IsGoodReply](ShmSocket &socket, MsgI &imsg, BHMsgHead &reply_head) { |
| | |         MsgCommonReply discarded; |
| | |         IsGoodReply(imsg, reply_head, discarded); |
| | |     }; |
| | |     return node_sock.Send(&BHTopicCenterAddress(), request_head, request, on_reply); |
| | | } |
| | | |
| | | bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | // NOTE(review): lines appear to be missing here. As written, the code below sets proc's id from proc_id(), declares a local reply_body that redeclares the parameter of the same name, and returns a call to this same three-argument overload. Presumably the rest of this body and the header of a second Heartbeat overload were dropped; confirm against the original source.
| | | proc.set_proc_id(proc_id()); |
| | | MsgCommonReply reply_body; |
| | | return Heartbeat(proc, reply_body, timeout_ms); |
| | | } |
| | | |
| | | // Sends `query` to the address returned by BHTopicCenterAddress() and waits up to |
| | | // timeout_ms for the answer. |
| | | // |
| | | // When IsOnline() is false, records eNotRegistered through SetLastError() and |
| | | // returns false without sending anything. |
| | | // |
| | | // Returns true only if SendAndRecv() succeeded, the reply head has type |
| | | // kMsgTypeQueryTopicReply, and the reply body parsed into reply_body. |
| | | bool TopicNode::QueryTopicAddress(MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms) |
| | | { |
| | |     if (!IsOnline()) { |
| | |         SetLastError(eNotRegistered, "Not Registered."); |
| | |         return false; |
| | |     } |
| | | 
| | |     auto &node_sock = SockNode(); |
| | |     BHMsgHead request_head(InitMsgHead(GetType(query), proc_id())); |
| | |     AddRoute(request_head, node_sock.id()); |
| | | 
| | |     MsgI answer; |
| | |     DEFER1(answer.Release()); |
| | |     BHMsgHead answer_head; |
| | | 
| | |     // Same short-circuit order as one && chain: transport, then type, then parse. |
| | |     if (!node_sock.SendAndRecv(&BHTopicCenterAddress(), request_head, query, answer, answer_head, timeout_ms)) { |
| | |         return false; |
| | |     } |
| | |     if (answer_head.type() != kMsgTypeQueryTopicReply) { |
| | |         return false; |
| | |     } |
| | |     return answer.ParseBody(reply_body); |
| | | } |
| | | |
| | | bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | // NOTE(review): the opening brace and the start of this body appear to be missing. The code below uses req, cb and SendTo, none of which are declared in the visible lines, and never uses topics, reply_body or timeout_ms; presumably it belongs to a different function. Confirm against the original source.
| | | 
| | | try { |
| | | BHAddress addr; |
| | | #if 1 |
| | | return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(addr, req, cb); |
| | | #else |
| | | if (topic_query_cache_.Pick(req.topic(), addr)) { |
| | | return SendTo(addr, req, cb); |
| | | } |
| | | 
| | | auto &sock = SockClient(); |
| | | MsgQueryTopic query; |
| | | query.set_topic(req.topic()); |
| | | BHMsgHead head(InitMsgHead(GetType(query), proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | 
| | | auto onQueryResult = [this, SendTo, req, cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | MsgQueryTopicReply rep; |
| | | if (head.type() == kMsgTypeQueryTopicReply && imsg.ParseBody(rep)) { |
| | | auto &addr = rep.address(); |
| | | if (!addr.mq_id().empty()) { |
| | | topic_query_cache_.Store(req.topic(), addr); |
| | | SendTo(addr, req, cb); |
| | | } |
| | | } |
| | | }; |
| | | return sock.Send(&BHTopicCenterAddress(), head, query, std::move(onQueryResult)); |
| | | #endif |
| | | // With the condition written as "#if 1", only the single return statement before #else is compiled; the #else branch is excluded.
| | | } catch (...) { |
| | | SetLastError(eError, "internal error."); |
| | | return false; |
| | // NOTE(review): a line appears to have been dropped here, presumably the closing brace of the catch block; confirm against the original source.
| | | return false; |
| | | } |
| | | |
| | | // Looks up `topic` through QueryTopicAddress() and appends every node address from |
| | | // the reply to `addr`. Entries already in `addr` are kept. |
| | | // |
| | | // Returns the number of node addresses in the reply, or 0 when the query fails. |
| | | int TopicNode::QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms) |
| | | { |
| | |     MsgQueryTopic request; |
| | |     request.set_topic(topic); |
| | |     MsgQueryTopicReply response; |
| | | 
| | |     if (!QueryTopicAddress(request, response, timeout_ms)) { |
| | |         return 0; |
| | |     } |
| | | 
| | |     auto &nodes = response.node_address(); |
| | |     int found = nodes.size(); |
| | |     for (auto &node : nodes) { |
| | |         addr.push_back(node); |
| | |     } |
| | |     return found; |
| | | } |
| | | |
| | | bool TopicNode::ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | |
| | | return false; |
| | | } |
| | | |
| | | auto &sock = SockClient(); |
| | | |
| | | if (topic_query_cache_.Find(topic, addr)) { |
| | | return true; |
| | | } |
| | | |
| | | MsgQueryTopic query; |
| | | query.set_topic(topic); |
| | | BHMsgHead head(InitMsgHead(GetType(query), proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | MsgI reply; |
| | | DEFER1(reply.Release()); |
| | | BHMsgHead reply_head; |
| | | |
| | | if (sock.SendAndRecv(&BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms)) { |
| | | if (reply_head.type() == kMsgTypeQueryTopicReply) { |
| | | MsgQueryTopicReply rep; |
| | | if (reply.ParseBody(rep)) { |
| | | addr = rep.address(); |
| | | if (addr.mq_id().empty()) { |
| | | return false; |
| | | } else { |
| | | topic_query_cache_.Store(topic, addr); |
| | | return true; |
| | | } |
| | | } |
| | | std::vector<NodeAddress> lst; |
| | | if (QueryRPCTopics(topic, lst, timeout_ms)) { |
| | | addr = lst.front().addr(); |
| | | if (!addr.mq_id().empty()) { |
| | | topic_query_cache_.Store(topic, addr); |
| | | return true; |
| | | } |
| | | } else { |
| | | } |
| | | return false; |
| | | } |