| | |
| | | } // namespace |
| | | |
| | | TopicNode::TopicNode(SharedMemory &shm) : |
| | | shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm), registered_(false) |
| | | shm_(shm), sock_node_(shm), sock_client_(shm), sock_server_(shm), sock_sub_(shm), registered_(false), registered_ever_(false) |
| | | { |
| | | // Receive messages to avoid a memory leak. |
| | | auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; |
| | |
| | | { |
| | | Stop(); |
| | | SockNode().Stop(); |
| | | if (!registered_ever_) { |
| | | SockNode().Remove(); |
| | | SockClient().Remove(); |
| | | SockServer().Remove(); |
| | | SockSub().Remove(); |
| | | } |
| | | } |
| | | |
| | | void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) |
| | |
| | | bool ok = head.type() == kMsgTypeCommonReply && |
| | | msg.ParseBody(rbody) && |
| | | IsSuccess(rbody.errmsg().errcode()); |
| | | printf("async regisered %s\n", ok ? "ok" : "failed"); |
| | | if (ok) { |
| | | registered_ever_.store(true); |
| | | } |
| | | registered_.store(ok); |
| | | }; |
| | | |
| | |
| | | |
| | | bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | if (!IsRegistered()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | auto &sock = SockNode(); |
| | | MsgHeartbeat body; |
| | |
| | | |
| | | bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | if (!IsRegistered()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | auto &sock = SockServer(); |
| | | MsgRegisterRPC body; |
| | |
| | | |
| | | bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | if (!IsRegistered()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | auto &sock = SockServer(); |
| | | |
| | |
| | | |
| | | bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | if (!IsRegistered()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | auto &sock = SockServer(); |
| | | |
| | |
| | | } |
| | | }; |
| | | |
| | | return SockRequest().Start(onData, nworker); |
| | | return SockClient().Start(onData, nworker); |
| | | } |
| | | |
| | | bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb) |
| | |
| | | |
| | | bool TopicNode::ClientSyncRequest(const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | if (!IsRegistered()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | try { |
| | | auto &sock = SockRequest(); |
| | | auto &sock = SockClient(); |
| | | |
| | | BHAddress addr; |
| | | if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { |
| | |
| | | DEFER1(reply_msg.Release(shm_);); |
| | | BHMsgHead reply_head; |
| | | |
| | | if (sock.SendAndRecv(addr.mq_id().data(), head, request, reply_msg, reply_head, timeout_ms) && reply_head.type() == kMsgTypeRequestTopicReply) { |
| | | if (reply_msg.ParseBody(out_reply)) { |
| | | reply_head.mutable_proc_id()->swap(out_proc_id); |
| | | return true; |
| | | } else { |
| | | printf("error parse reply.\n"); |
| | | } |
| | | if (sock.SendAndRecv(addr.mq_id().data(), head, request, reply_msg, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeRequestTopicReply && |
| | | reply_msg.ParseBody(out_reply)) { |
| | | reply_head.mutable_proc_id()->swap(out_proc_id); |
| | | return true; |
| | | } |
| | | } else { |
| | | SetLastError(eNotFound, "remote not found."); |
| | | } |
| | | } catch (...) { |
| | | printf("error recv data. line: %d\n", __LINE__); |
| | | SetLastError(eError, __func__ + std::string(" internal errer.")); |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | if (!IsRegistered()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | auto &sock = SockRequest(); |
| | | auto &sock = SockClient(); |
| | | |
| | | if (topic_query_cache_.Find(topic, addr)) { |
| | | return true; |
| | |
| | | |
| | | bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | if (!IsRegistered()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | try { |
| | | auto &sock = SockPub(); |
| | |
| | | |
| | | bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | if (!IsRegistered()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | try { |
| | | auto &sock = SockSub(); |
| | |
| | | |
| | | bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | if (!IsRegistered()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | auto &sock = SockSub(); |
| | | MsgI msg; |