| | |
| | | } |
| | | } |
| | | |
| | | void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) |
| | | void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) |
| | | { |
| | | if (nworker < 1) { |
| | | nworker = 1; |
| | |
| | | } |
| | | } |
| | | |
| | | bool TopicNode::ServerStart(const ServerCB &rcb, int nworker) |
| | | bool TopicNode::ServerStart(const ServerSyncCB &rcb, int nworker) |
| | | { |
| | | auto onRecv = [this, rcb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } |
| | |
| | | |
| | | auto &sock = SockServer(); |
| | | return rcb && sock.Start(onRecv, nworker); |
| | | } |
| | | |
| | | // Asynchronous-callback overload of ServerStart. |
| | | // |
| | | // Builds a receive handler and passes it, together with nworker, to Start() |
| | | // on the socket returned by SockServer(). |
| | | // |
| | | // acb:     callback invoked for each accepted request with |
| | | //          (SrcInfo pointer, sender proc id, parsed MsgRequestTopic). |
| | | // nworker: forwarded unchanged to Start() (presumably the number of worker |
| | | //          threads -- confirm against ShmSocket::Start). |
| | | // |
| | | // Returns false when acb is empty (Start() is then not called); otherwise |
| | | // returns the result of Start(). |
| | | bool TopicNode::ServerStart(const ServerAsyncCB &acb, int nworker) |
| | | { |
| | |     auto handle_request = [this, acb](ShmSocket &, MsgI &msg, BHMsgHead &msg_head) { |
| | |         // Only topic requests that carry at least one route entry are accepted. |
| | |         const bool is_routed_request = |
| | |             (msg_head.type() == kMsgTypeRequestTopic) && (msg_head.route_size() != 0); |
| | |         if (!is_routed_request) { return; } |
| | | |
| | |         MsgRequestTopic request; |
| | |         if (!msg.ParseBody(request)) { return; } |
| | | |
| | |         // Snapshot of the route and message id from the head. The object is |
| | |         // heap-allocated and the raw pointer is handed to acb; it is not |
| | |         // freed in this function. |
| | |         SrcInfo *src = new SrcInfo; |
| | |         src->route.assign(msg_head.route().begin(), msg_head.route().end()); |
| | |         src->msg_id = msg_head.msg_id(); |
| | |         acb(src, *msg_head.mutable_proc_id(), request); |
| | |     }; |
| | | |
| | |     auto &server_sock = SockServer(); |
| | |     const bool has_callback = static_cast<bool>(acb); |
| | |     return has_callback && server_sock.Start(handle_request, nworker); |
| | | } |
| | | |
| | | bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) |
| | |
| | | }; |
| | | |
| | | try { |
| | | auto &sock = SockClient(); |
| | | BHAddress addr; |
| | | |
| | | if (topic_query_cache_.Find(req.topic(), addr)) { |
| | | #if 1 |
| | | return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(addr, req, cb); |
| | | #else |
| | | if (topic_query_cache_.Pick(req.topic(), addr)) { |
| | | return SendTo(addr, req, cb); |
| | | } |
| | | |
| | | auto &sock = SockClient(); |
| | | MsgQueryTopic query; |
| | | query.set_topic(req.topic()); |
| | | BHMsgHead head(InitMsgHead(GetType(query), proc_id())); |
| | |
| | | if (head.type() == kMsgTypeQueryTopicReply && imsg.ParseBody(rep)) { |
| | | auto &addr = rep.address(); |
| | | if (!addr.mq_id().empty()) { |
| | | topic_query_cache_.Update(req.topic(), addr); |
| | | topic_query_cache_.Store(req.topic(), addr); |
| | | SendTo(addr, req, cb); |
| | | } |
| | | } |
| | | }; |
| | | return sock.Send(&BHTopicCenterAddress(), head, query, onQueryResult); |
| | | return sock.Send(&BHTopicCenterAddress(), head, query, std::move(onQueryResult)); |
| | | #endif |
| | | |
| | | } catch (...) { |
| | | SetLastError(eError, "internal error."); |
| | |
| | | if (addr.mq_id().empty()) { |
| | | return false; |
| | | } else { |
| | | topic_query_cache_.Update(topic, addr); |
| | | topic_query_cache_.Store(topic, addr); |
| | | return true; |
| | | } |
| | | } |