From 2197cf91e7a3bd5941327ba630a42946b88f069e Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 09 四月 2021 14:15:41 +0800 Subject: [PATCH] join pub/sub to node; refactor. --- src/topic_node.cpp | 121 +++++++++++++++++++++++++++++++++++---- 1 files changed, 107 insertions(+), 14 deletions(-) diff --git a/src/topic_node.cpp b/src/topic_node.cpp index c6c9771..d76c03a 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -76,29 +76,34 @@ shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm) { SockNode().Start(); + SockClient().Start(); + SockServer().Start(); } + TopicNode::~TopicNode() { StopAll(); - SockNode().Stop(); } + void TopicNode::StopAll() { - ServerStop(); - ClientStopWorker(); + SockServer().Stop(); + SockClient().Stop(); + SockNode().Stop(); } bool TopicNode::Register(const MsgRegister &body, MsgCommonReply &reply_body, const int timeout_ms) { + auto &sock = SockNode(); + auto head(InitMsgHead(GetType(body), body.proc().proc_id())); - AddRoute(head, SockNode().id()); + AddRoute(head, sock.id()); MsgI reply; DEFER1(reply.Release(shm_);); BHMsgHead reply_head; - bool r = SockNode().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); - r = r && reply_head.type() == kMsgTypeCommonReply; - r = r && reply.ParseBody(reply_body); + bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); if (r) { info_ = body; } @@ -108,14 +113,15 @@ bool TopicNode::RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply_body, const int timeout_ms) { //TODO check registered + auto &sock = SockServer(); auto head(InitMsgHead(GetType(body), proc_id())); - AddRoute(head, SockReply().id()); + AddRoute(head, sock.id()); MsgI reply; DEFER1(reply.Release(shm_);); BHMsgHead reply_head; - bool r = SockReply().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); r = r && reply_head.type() == kMsgTypeCommonReply; r = r && reply.ParseBody(reply_body); return r; @@ -154,15 +160,17 @@ onIdle(sock); }; - return rcb && SockReply().Start(onRecv, onIdle, nworker); + auto &sock = SockServer(); + return rcb && sock.Start(onRecv, onIdle, nworker); } -bool TopicNode::ServerStop() { return SockReply().Stop(); } bool TopicNode::ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms) { + auto &sock = SockServer(); + MsgI imsg; BHMsgHead head; - if (SockReply().SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) { + if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) { MsgRequestTopic request; if (imsg.ParseBody(request)) { request.mutable_topic()->swap(topic); @@ -179,6 +187,8 @@ bool TopicNode::ServerSendReply(void *src_info, const std::string &data, const int timeout_ms) { + auto &sock = SockServer(); + SrcInfo *p = static_cast<SrcInfo *>(src_info); DEFER1(delete p); if (!p || p->route.empty()) { @@ -192,7 +202,7 @@ head.add_route()->Swap(&p->route[i]); } - return SockReply().Send(p->route.back().mq_id().data(), head, body, timeout_ms); + return sock.Send(p->route.back().mq_id().data(), head, body, timeout_ms); } bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker) @@ -211,12 +221,12 @@ return SockRequest().Start(onData, nworker); } -bool TopicNode::ClientStopWorker() { return SockRequest().Stop(); } bool TopicNode::ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) { auto Call = [&](const void *remote) { auto &sock = SockRequest(); + MsgRequestTopic req; req.set_topic(topic); req.set_data(data, size); @@ -254,6 +264,7 @@ { try { auto &sock = SockRequest(); + BHAddress addr; if (ClientQueryRPCTopic(topic, addr, timeout_ms)) { @@ -290,6 +301,7 @@ bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms) { auto &sock = SockRequest(); + if (topic_query_cache_.Find(topic, addr)) { return true; } @@ -319,4 +331,85 @@ } else { } return false; +} + +// publish + +bool TopicNode::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms) +{ + try { + auto &sock = SockPub(); + + MsgPublish pub; + pub.set_topic(topic); + pub.set_data(data, size); + BHMsgHead head(InitMsgHead(GetType(pub), proc_id())); + AddRoute(head, sock.id()); + + MsgI reply; + DEFER1(reply.Release(shm());); + BHMsgHead reply_head; + MsgCommonReply reply_body; + return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) && + reply_head.type() == kMsgTypeCommonReply && + reply.ParseBody(reply_body) && + IsSuccess(reply_body.errmsg().errcode()); + } catch (...) { + } + return false; +} + +// subscribe + +bool TopicNode::Subscribe(const std::vector<Topic> &topics, const int timeout_ms) +{ + try { + auto &sock = SockSub(); + MsgSubscribe sub; + for (auto &topic : topics) { + sub.add_topics(topic); + } + BHMsgHead head(InitMsgHead(GetType(sub), proc_id())); + AddRoute(head, sock.id()); + + return sock.Send(&BHTopicBusAddress(), head, sub, timeout_ms); + } catch (...) { + return false; + } +} + +bool TopicNode::SubscribeStartWorker(const TopicDataCB &tdcb, int nworker) +{ + auto &sock = SockSub(); + + auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) { + if (head.type() == kMsgTypePublish) { + MsgPublish pub; + if (imsg.ParseBody(pub)) { + tdcb(head.proc_id(), pub.topic(), pub.data()); + } + } else { + // ignored, or dropped + } + }; + + return tdcb && sock.Start(AsyncRecvProc, nworker); +} + +bool TopicNode::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms) +{ + auto &sock = SockSub(); + MsgI msg; + DEFER1(msg.Release(shm());); + BHMsgHead head; + if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) { + MsgPublish pub; + if (msg.ParseBody(pub)) { + head.mutable_proc_id()->swap(proc_id); + pub.mutable_topic()->swap(topic); + pub.mutable_data()->swap(data); + return true; + } + } + return false; } \ No newline at end of file -- Gitblit v1.8.0