From 1b52f1cb8c47dd2c0195d2fd65d7b6a4c2f10704 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期一, 12 四月 2021 18:29:41 +0800 Subject: [PATCH] add fail-resend support. --- src/topic_node.cpp | 188 ++++++++++++++++++++++++++++++++-------------- 1 files changed, 129 insertions(+), 59 deletions(-) diff --git a/src/topic_node.cpp b/src/topic_node.cpp index c6c9771..8cd5cc4 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -17,6 +17,7 @@ */ #include "topic_node.h" #include "bh_util.h" +#include "failed_msg.h" #include <chrono> #include <list> @@ -32,90 +33,69 @@ std::string msg_id; }; -class ServerFailedQ -{ - struct FailedMsg { - steady_clock::time_point xpr; - std::string remote_; - BHMsgHead head_; - MsgRequestTopicReply body_; - FailedMsg(const std::string &addr, BHMsgHead &&head, MsgRequestTopicReply &&body) : - xpr(steady_clock::now() + 10s), remote_(addr), head_(std::move(head)), body_(std::move(body)) {} - bool Expired() { return steady_clock::now() > xpr; } - }; - typedef std::list<FailedMsg> Queue; - Synced<Queue> queue_; - -public: - void Push(const std::string &remote, BHMsgHead &&head, MsgRequestTopicReply &&body) - { - queue_->emplace_back(remote, std::move(head), std::move(body)); - } - void TrySend(ShmSocket &socket, const int timeout_ms = 0) - { - queue_.Apply([&](Queue &q) { - if (!q.empty()) { - auto it = q.begin(); - do { - if (it->Expired()) { - // it->msg_.Release(socket.shm()); - it = q.erase(it); - } else if (socket.Send(it->remote_.data(), it->head_, it->body_, timeout_ms)) { - it = q.erase(it); - } else { - ++it; - } - } while (it != q.end()); - } - }); - } -}; +typedef FailedMsgQ ServerFailedQ; } // namespace TopicNode::TopicNode(SharedMemory &shm) : shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm) { SockNode().Start(); + SockClient().Start(); + SockServer().Start(); } + TopicNode::~TopicNode() { StopAll(); - SockNode().Stop(); -} -void TopicNode::StopAll() -{ - ServerStop(); - ClientStopWorker(); } -bool TopicNode::Register(const MsgRegister &body, MsgCommonReply &reply_body, const int timeout_ms) +void TopicNode::StopAll() { + SockServer().Stop(); + SockClient().Stop(); + SockNode().Stop(); +} + +bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) +{ + auto &sock = SockNode(); + MsgRegister body; + *body.mutable_proc() = proc; + auto AddId = [&](const MQId &id) { body.add_addrs()->set_mq_id(&id, sizeof(id)); }; + AddId(SockNode().id()); + AddId(SockServer().id()); + AddId(SockClient().id()); + AddId(SockSub().id()); + AddId(SockPub().id()); + auto head(InitMsgHead(GetType(body), body.proc().proc_id())); - AddRoute(head, SockNode().id()); + AddRoute(head, sock.id()); MsgI reply; DEFER1(reply.Release(shm_);); BHMsgHead reply_head; - bool r = SockNode().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); - r = r && reply_head.type() == kMsgTypeCommonReply; - r = r && reply.ParseBody(reply_body); + bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); if (r) { info_ = body; } return r; } -bool TopicNode::RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply_body, const int timeout_ms) +bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) { //TODO check registered + auto &sock = SockServer(); + MsgRegisterRPC body; + body.mutable_topics()->Swap(&topics); auto head(InitMsgHead(GetType(body), proc_id())); - AddRoute(head, SockReply().id()); + AddRoute(head, sock.id()); MsgI reply; DEFER1(reply.Release(shm_);); BHMsgHead reply_head; - bool r = SockReply().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); r = r && reply_head.type() == kMsgTypeCommonReply; r = r && reply.ParseBody(reply_body); return r; @@ -142,8 +122,12 @@ for (int i = 0; i < head.route_size() - 1; ++i) { reply_head.add_route()->Swap(head.mutable_route(i)); } - if (!sock.Send(head.route().rbegin()->mq_id().data(), reply_head, reply_body, 10)) { - failed_q->Push(head.route().rbegin()->mq_id(), std::move(reply_head), std::move(reply_body)); + MsgI msg; + if (msg.Make(sock.shm(), reply_head, reply_body)) { + auto &remote = head.route().rbegin()->mq_id(); + if (!sock.Send(remote.data(), msg, 10)) { + failed_q->Push(remote, msg, 10s); + } } } } @@ -154,15 +138,17 @@ onIdle(sock); }; - return rcb && SockReply().Start(onRecv, onIdle, nworker); + auto &sock = SockServer(); + return rcb && sock.Start(onRecv, onIdle, nworker); } -bool TopicNode::ServerStop() { return SockReply().Stop(); } bool TopicNode::ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms) { + auto &sock = SockServer(); + MsgI imsg; BHMsgHead head; - if (SockReply().SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) { + if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) { MsgRequestTopic request; if (imsg.ParseBody(request)) { request.mutable_topic()->swap(topic); @@ -179,6 +165,8 @@ bool TopicNode::ServerSendReply(void *src_info, const std::string &data, const int timeout_ms) { + auto &sock = SockServer(); + SrcInfo *p = static_cast<SrcInfo *>(src_info); DEFER1(delete p); if (!p || p->route.empty()) { @@ -192,7 +180,7 @@ head.add_route()->Swap(&p->route[i]); } - return SockReply().Send(p->route.back().mq_id().data(), head, body, timeout_ms); + return sock.Send(p->route.back().mq_id().data(), head, body, timeout_ms); } bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker) @@ -211,12 +199,12 @@ return SockRequest().Start(onData, nworker); } -bool TopicNode::ClientStopWorker() { return SockRequest().Stop(); } bool TopicNode::ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) { auto Call = [&](const void *remote) { auto &sock = SockRequest(); + MsgRequestTopic req; req.set_topic(topic); req.set_data(data, size); @@ -254,6 +242,7 @@ { try { auto &sock = SockRequest(); + BHAddress addr; if (ClientQueryRPCTopic(topic, addr, timeout_ms)) { @@ -290,6 +279,7 @@ bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms) { auto &sock = SockRequest(); + if (topic_query_cache_.Find(topic, addr)) { return true; } @@ -319,4 +309,84 @@ } else { } return false; +} + +// publish + +bool TopicNode::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms) +{ + try { + auto &sock = SockPub(); + + MsgPublish pub; + pub.set_topic(topic); + pub.set_data(data, size); + BHMsgHead head(InitMsgHead(GetType(pub), proc_id())); + AddRoute(head, sock.id()); + + MsgI reply; + DEFER1(reply.Release(shm());); + BHMsgHead reply_head; + MsgCommonReply reply_body; + return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) && + reply_head.type() == kMsgTypeCommonReply && + reply.ParseBody(reply_body) && + IsSuccess(reply_body.errmsg().errcode()); + } catch (...) { + } + return false; +} + +// subscribe + +bool TopicNode::Subscribe(MsgTopicList &topics, const int timeout_ms) +{ + try { + auto &sock = SockSub(); + MsgSubscribe sub; + sub.mutable_topics()->Swap(&topics); + + BHMsgHead head(InitMsgHead(GetType(sub), proc_id())); + AddRoute(head, sock.id()); + + return sock.Send(&BHTopicBusAddress(), head, sub, timeout_ms); + } catch (...) { + return false; + } +} + +bool TopicNode::SubscribeStartWorker(const TopicDataCB &tdcb, int nworker) +{ + auto &sock = SockSub(); + + auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) { + if (head.type() == kMsgTypePublish) { + MsgPublish pub; + if (imsg.ParseBody(pub)) { + tdcb(head.proc_id(), pub.topic(), pub.data()); + } + } else { + // ignored, or dropped + } + }; + + return tdcb && sock.Start(AsyncRecvProc, nworker); +} + +bool TopicNode::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms) +{ + auto &sock = SockSub(); + MsgI msg; + DEFER1(msg.Release(shm());); + BHMsgHead head; + if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) { + MsgPublish pub; + if (msg.ParseBody(pub)) { + head.mutable_proc_id()->swap(proc_id); + pub.mutable_topic()->swap(topic); + pub.mutable_data()->swap(data); + return true; + } + } + return false; } \ No newline at end of file -- Gitblit v1.8.0