| | |
| | | } // namespace |
| | | |
| | | TopicNode::TopicNode(SharedMemory &shm) : |
| | | shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm) |
| | | shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm), registered_(false) |
| | | { |
| | | SockNode().Start(); |
| | | // recv msgs to avoid memory leak. |
| | | auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; |
| | | SockNode().Start(default_ignore_msg); |
| | | } |
| | | |
| | | TopicNode::~TopicNode() |
| | | { |
| | | Stop(); |
| | | SockNode().Stop(); |
| | | } |
| | | |
| | | void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb) |
| | | void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) |
| | | { |
| | | ServerStart(server_cb, 1); |
| | | SubscribeStartWorker(sub_cb, 1); |
| | | // SockClient().Start(); |
| | | if (nworker < 1) { |
| | | nworker = 1; |
| | | } else if (nworker > 16) { |
| | | nworker = 16; |
| | | } |
| | | |
| | | ServerStart(server_cb, nworker); |
| | | SubscribeStartWorker(sub_cb, nworker); |
| | | ClientStartWorker(client_cb, nworker); |
| | | } |
| | | void TopicNode::Stop() |
| | | { |
| | | SockSub().Stop(); |
| | | SockServer().Stop(); |
| | | SockClient().Stop(); |
| | | SockNode().Stop(); |
| | | } |
| | | |
| | | bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | info_ = proc; |
| | | |
| | | auto &sock = SockNode(); |
| | | MsgRegister body; |
| | | *body.mutable_proc() = proc; |
| | | body.mutable_proc()->Swap(&proc); |
| | | auto AddId = [&](const MQId &id) { body.add_addrs()->set_mq_id(&id, sizeof(id)); }; |
| | | AddId(SockNode().id()); |
| | | AddId(SockServer().id()); |
| | |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) { |
| | | bool ok = head.type() == kMsgTypeCommonReply && |
| | | msg.ParseBody(rbody) && |
| | | IsSuccess(rbody.errmsg().errcode()); |
| | | printf("async regisered %s\n", ok ? "ok" : "failed"); |
| | | registered_.store(ok); |
| | | }; |
| | | |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(&BHTopicCenterAddress(), head, body); |
| | | auto onResult = [this, CheckResult](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { |
| | | MsgCommonReply body; |
| | | CheckResult(imsg, head, body); |
| | | }; |
| | | return sock.Send(&BHTopicCenterAddress(), head, body, onResult); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm_);); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); |
| | | if (r && IsSuccess(reply_body.errmsg().errcode())) { |
| | | info_ = body; |
| | | return true; |
| | | if (r) { |
| | | CheckResult(reply, reply_head, reply_body); |
| | | } |
| | | return false; |
| | | return IsRegistered(); |
| | | } |
| | | } |
| | | |
| | | bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | |
| | | auto &sock = SockNode(); |
| | | MsgHeartbeat body; |
| | | *body.mutable_proc() = proc; |
| | | body.mutable_proc()->Swap(&proc); |
| | | |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id())); |
| | | AddRoute(head, sock.id()); |
| | |
| | | |
| | | bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | //TODO check registered |
| | | if (!IsRegistered()) { return false; } |
| | | |
| | | auto &sock = SockServer(); |
| | | MsgRegisterRPC body; |
| | | body.mutable_topics()->Swap(&topics); |
| | |
| | | for (int i = 0; i < head.route_size() - 1; ++i) { |
| | | reply_head.add_route()->Swap(head.mutable_route(i)); |
| | | } |
| | | MsgI msg; |
| | | if (msg.Make(sock.shm(), reply_head, reply_body)) { |
| | | auto &remote = head.route().rbegin()->mq_id(); |
| | | sock.Send(remote.data(), msg); |
| | | } |
| | | auto &remote = head.route().rbegin()->mq_id(); |
| | | sock.Send(remote.data(), reply_head, reply_body); |
| | | } |
| | | }; |
| | | |
| | |
| | | |
| | | bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | |
| | | auto &sock = SockServer(); |
| | | |
| | | MsgI imsg; |
| | |
| | | |
| | | bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | |
| | | auto &sock = SockServer(); |
| | | |
| | | SrcInfo *p = static_cast<SrcInfo *>(src_info); |
| | |
| | | if (head.type() == kMsgTypeRequestTopicReply) { |
| | | MsgRequestTopicReply reply; |
| | | if (imsg.ParseBody(reply)) { |
| | | cb(head.proc_id(), reply); |
| | | cb(head, reply); |
| | | } |
| | | } |
| | | }; |
| | |
| | | return SockRequest().Start(onData, nworker); |
| | | } |
| | | |
| | | bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const RequestResultCB &cb) |
| | | bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb) |
| | | { |
| | | auto Call = [&](const void *remote) { |
| | | auto &sock = SockRequest(); |
| | | if (!IsRegistered()) { return false; } |
| | | |
| | | BHMsgHead head(InitMsgHead(GetType(req), proc_id())); |
| | | const std::string &msg_id(NewMsgId()); |
| | | |
| | | out_msg_id = msg_id; |
| | | |
| | | auto SendTo = [this, msg_id](const BHAddress &addr, const MsgRequestTopic &req, const RequestResultCB &cb) { |
| | | auto &sock = SockClient(); |
| | | BHMsgHead head(InitMsgHead(GetType(req), proc_id(), msg_id)); |
| | | AddRoute(head, sock.id()); |
| | | head.set_topic(req.topic()); |
| | | |
| | | if (cb) { |
| | | auto onRecv = [cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | if (head.type() == kMsgTypeRequestTopicReply) { |
| | | MsgRequestTopicReply reply; |
| | | if (imsg.ParseBody(reply)) { |
| | | cb(head.proc_id(), reply); |
| | | cb(head, reply); |
| | | } |
| | | } |
| | | }; |
| | | return sock.Send(remote, head, req, onRecv); |
| | | return sock.Send(addr.mq_id().data(), head, req, onRecv); |
| | | } else { |
| | | return sock.Send(remote, head, req); |
| | | return sock.Send(addr.mq_id().data(), head, req); |
| | | } |
| | | }; |
| | | |
| | | try { |
| | | auto &sock = SockClient(); |
| | | BHAddress addr; |
| | | if (ClientQueryRPCTopic(req.topic(), addr, 1000)) { |
| | | return Call(addr.mq_id().data()); |
| | | } else { |
| | | SetLastError(eNotFound, "remote not found."); |
| | | return false; |
| | | |
| | | if (topic_query_cache_.Find(req.topic(), addr)) { |
| | | return SendTo(addr, req, cb); |
| | | } |
| | | |
| | | MsgQueryTopic query; |
| | | query.set_topic(req.topic()); |
| | | BHMsgHead head(InitMsgHead(GetType(query), proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | auto onQueryResult = [this, SendTo, req, cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | MsgQueryTopicReply rep; |
| | | if (head.type() == kMsgTypeQueryTopicReply && imsg.ParseBody(rep)) { |
| | | auto &addr = rep.address(); |
| | | if (!addr.mq_id().empty()) { |
| | | topic_query_cache_.Update(req.topic(), addr); |
| | | SendTo(addr, req, cb); |
| | | } |
| | | } |
| | | }; |
| | | return sock.Send(&BHTopicCenterAddress(), head, query, onQueryResult); |
| | | |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | |
| | | |
| | | bool TopicNode::ClientSyncRequest(const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | |
| | | try { |
| | | auto &sock = SockRequest(); |
| | | |
| | |
| | | if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { |
| | | BHMsgHead head(InitMsgHead(GetType(request), proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | head.set_topic(request.topic()); |
| | | |
| | | MsgI reply_msg; |
| | | DEFER1(reply_msg.Release(shm_);); |
| | |
| | | |
| | | bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | |
| | | auto &sock = SockRequest(); |
| | | |
| | | if (topic_query_cache_.Find(topic, addr)) { |
| | |
| | | |
| | | bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | |
| | | try { |
| | | auto &sock = SockPub(); |
| | | BHMsgHead head(InitMsgHead(GetType(pub), proc_id())); |
| | |
| | | |
| | | // subscribe |
| | | |
| | | bool TopicNode::Subscribe(MsgTopicList &topics, const int timeout_ms) |
| | | bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | |
| | | try { |
| | | auto &sock = SockSub(); |
| | | MsgSubscribe sub; |
| | |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm());); |
| | | BHMsgHead reply_head; |
| | | MsgCommonReply reply_body; |
| | | return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeCommonReply && |
| | | reply.ParseBody(reply_body) && |
| | |
| | | |
| | | bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | |
| | | auto &sock = SockSub(); |
| | | MsgI msg; |
| | | DEFER1(msg.Release(shm());); |