| | |
| | | std::string msg_id; |
| | | }; |
| | | |
| | | const int kMqLen = 700; |
| | | |
| | | } // namespace |
| | | |
| | | TopicNode::TopicNode(SharedMemory &shm) : |
| | | shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm), registered_(false) |
| | | shm_(shm), sock_node_(shm), sock_client_(shm, kMqLen), sock_server_(shm, kMqLen), sock_sub_(shm, kMqLen), state_(eStateUnregistered) |
| | | { |
| | | // recv msgs to avoid memory leak. |
| | | auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; |
| | | SockNode().Start(default_ignore_msg); |
| | | SockClient().Start(default_ignore_msg); |
| | | SockServer().Start(default_ignore_msg); |
| | | SockSub().Start(default_ignore_msg); |
| | | } |
| | | |
| | | TopicNode::~TopicNode() |
| | | { |
| | | Stop(); |
| | | SockNode().Stop(); |
| | | if (state() == eStateUnregistered) { |
| | | SockNode().Remove(); |
| | | SockClient().Remove(); |
| | | SockServer().Remove(); |
| | | SockSub().Remove(); |
| | | } |
| | | } |
| | | |
| | | void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) |
| | | void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) |
| | | { |
| | | if (nworker < 1) { |
| | | nworker = 1; |
| | |
| | | bool ok = head.type() == kMsgTypeCommonReply && |
| | | msg.ParseBody(rbody) && |
| | | IsSuccess(rbody.errmsg().errcode()); |
| | | printf("async regisered %s\n", ok ? "ok" : "failed"); |
| | | registered_.store(ok); |
| | | if (ok) { |
| | | state(eStateOnline); |
| | | } else { |
| | | state_cas(eStateOnline, eStateOffline); |
| | | } |
| | | }; |
| | | |
| | | if (timeout_ms == 0) { |
| | |
| | | if (r) { |
| | | CheckResult(reply, reply_head, reply_body); |
| | | } |
| | | return IsRegistered(); |
| | | return IsOnline(); |
| | | } |
| | | } |
| | | |
| | | bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | auto &sock = SockNode(); |
| | | MsgHeartbeat body; |
| | |
| | | |
| | | bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | auto &sock = SockServer(); |
| | | MsgRegisterRPC body; |
| | |
| | | } |
| | | } |
| | | |
| | | bool TopicNode::ServerStart(const ServerCB &rcb, int nworker) |
| | | bool TopicNode::ServerStart(const ServerSyncCB &rcb, int nworker) |
| | | { |
| | | auto onRecv = [this, rcb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } |
| | |
| | | return rcb && sock.Start(onRecv, nworker); |
| | | } |
| | | |
| | | bool TopicNode::ServerStart(const ServerAsyncCB &acb, int nworker) |
| | | { |
| | | auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } |
| | | MsgRequestTopic req; |
| | | if (!imsg.ParseBody(req)) { return; } |
| | | |
| | | SrcInfo *p = new SrcInfo; |
| | | p->route.assign(head.route().begin(), head.route().end()); |
| | | p->msg_id = head.msg_id(); |
| | | acb(p, *head.mutable_proc_id(), req); |
| | | }; |
| | | |
| | | auto &sock = SockServer(); |
| | | return acb && sock.Start(onRecv, nworker); |
| | | } |
| | | |
| | | bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | auto &sock = SockServer(); |
| | | |
| | |
| | | |
| | | bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | auto &sock = SockServer(); |
| | | |
| | |
| | | } |
| | | }; |
| | | |
| | | return SockRequest().Start(onData, nworker); |
| | | return SockClient().Start(onData, nworker); |
| | | } |
| | | |
| | | bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | const std::string &msg_id(NewMsgId()); |
| | | |
| | |
| | | }; |
| | | |
| | | try { |
| | | auto &sock = SockClient(); |
| | | BHAddress addr; |
| | | |
| | | if (topic_query_cache_.Find(req.topic(), addr)) { |
| | | #if 1 |
| | | return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(addr, req, cb); |
| | | #else |
| | | if (topic_query_cache_.Pick(req.topic(), addr)) { |
| | | return SendTo(addr, req, cb); |
| | | } |
| | | |
| | | auto &sock = SockClient(); |
| | | MsgQueryTopic query; |
| | | query.set_topic(req.topic()); |
| | | BHMsgHead head(InitMsgHead(GetType(query), proc_id())); |
| | |
| | | if (head.type() == kMsgTypeQueryTopicReply && imsg.ParseBody(rep)) { |
| | | auto &addr = rep.address(); |
| | | if (!addr.mq_id().empty()) { |
| | | topic_query_cache_.Update(req.topic(), addr); |
| | | topic_query_cache_.Store(req.topic(), addr); |
| | | SendTo(addr, req, cb); |
| | | } |
| | | } |
| | | }; |
| | | return sock.Send(&BHTopicCenterAddress(), head, query, onQueryResult); |
| | | return sock.Send(&BHTopicCenterAddress(), head, query, std::move(onQueryResult)); |
| | | #endif |
| | | |
| | | } catch (...) { |
| | | SetLastError(eError, "internal error."); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | bool TopicNode::ClientSyncRequest(const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | try { |
| | | auto &sock = SockRequest(); |
| | | auto &sock = SockClient(); |
| | | |
| | | BHAddress addr; |
| | | if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { |
| | |
| | | DEFER1(reply_msg.Release(shm_);); |
| | | BHMsgHead reply_head; |
| | | |
| | | if (sock.SendAndRecv(addr.mq_id().data(), head, request, reply_msg, reply_head, timeout_ms) && reply_head.type() == kMsgTypeRequestTopicReply) { |
| | | if (reply_msg.ParseBody(out_reply)) { |
| | | reply_head.mutable_proc_id()->swap(out_proc_id); |
| | | return true; |
| | | } else { |
| | | printf("error parse reply.\n"); |
| | | } |
| | | if (sock.SendAndRecv(addr.mq_id().data(), head, request, reply_msg, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeRequestTopicReply && |
| | | reply_msg.ParseBody(out_reply)) { |
| | | reply_head.mutable_proc_id()->swap(out_proc_id); |
| | | return true; |
| | | } |
| | | } else { |
| | | SetLastError(eNotFound, "remote not found."); |
| | | } |
| | | } catch (...) { |
| | | printf("error recv data. line: %d\n", __LINE__); |
| | | SetLastError(eError, __func__ + std::string(" internal errer.")); |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms) |
| | | bool TopicNode::ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | auto &sock = SockRequest(); |
| | | auto &sock = SockClient(); |
| | | |
| | | if (topic_query_cache_.Find(topic, addr)) { |
| | | return true; |
| | |
| | | if (addr.mq_id().empty()) { |
| | | return false; |
| | | } else { |
| | | topic_query_cache_.Update(topic, addr); |
| | | topic_query_cache_.Store(topic, addr); |
| | | return true; |
| | | } |
| | | } |
| | |
| | | |
| | | bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | try { |
| | | auto &sock = SockPub(); |
| | |
| | | |
| | | bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | try { |
| | | auto &sock = SockSub(); |
| | |
| | | |
| | | bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | auto &sock = SockSub(); |
| | | MsgI msg; |