From aa1542b6d6a4680088ac715c4ce40f97ada554fb Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 14 四月 2021 17:52:31 +0800 Subject: [PATCH] add SendQ TrySend() TryRecv(); handle re-register. --- src/topic_node.cpp | 150 +++++++++++++++++++++++++++----------------------- 1 files changed, 81 insertions(+), 69 deletions(-) diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 8f039de..4ce2c97 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -17,7 +17,6 @@ */ #include "topic_node.h" #include "bh_util.h" -#include "failed_msg.h" #include <chrono> #include <list> @@ -33,9 +32,8 @@ std::string msg_id; }; -typedef FailedMsgQ ServerFailedQ; - } // namespace + TopicNode::TopicNode(SharedMemory &shm) : shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm) { @@ -76,15 +74,20 @@ auto head(InitMsgHead(GetType(body), body.proc().proc_id())); AddRoute(head, sock.id()); - MsgI reply; - DEFER1(reply.Release(shm_);); - BHMsgHead reply_head; - bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); - r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); - if (r && IsSuccess(reply_body.errmsg().errcode())) { - info_ = body; + if (timeout_ms == 0) { + return sock.Send(&BHTopicCenterAddress(), head, body); + } else { + MsgI reply; + DEFER1(reply.Release(shm_);); + BHMsgHead reply_head; + bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); + if (r && IsSuccess(reply_body.errmsg().errcode())) { + info_ = body; + return true; + } + return false; } - return r; } bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) @@ -96,22 +99,23 @@ auto head(InitMsgHead(GetType(body), body.proc().proc_id())); AddRoute(head, sock.id()); - MsgI reply; - DEFER1(reply.Release(shm_);); - BHMsgHead reply_head; - bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); - r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); - if (r && IsSuccess(reply_body.errmsg().errcode())) { - // TODO update proc info + if (timeout_ms == 0) { + return sock.Send(&BHTopicCenterAddress(), head, body); + } else { + MsgI reply; + DEFER1(reply.Release(shm_);); + BHMsgHead reply_head; + bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); + return (r && IsSuccess(reply_body.errmsg().errcode())); } - return r; } bool TopicNode::Heartbeat(const int timeout_ms) { ProcInfo proc; proc.set_proc_id(proc_id()); MsgCommonReply reply_body; - return Heartbeat(proc, reply_body, timeout_ms) && IsSuccess(reply_body.errmsg().errcode()); + return Heartbeat(proc, reply_body, timeout_ms); } bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) @@ -124,50 +128,43 @@ auto head(InitMsgHead(GetType(body), proc_id())); AddRoute(head, sock.id()); - MsgI reply; - DEFER1(reply.Release(shm_);); - BHMsgHead reply_head; - bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); - r = r && reply_head.type() == kMsgTypeCommonReply; - r = r && reply.ParseBody(reply_body); - return r; + if (timeout_ms == 0) { + return sock.Send(&BHTopicCenterAddress(), head, body); + } else { + MsgI reply; + DEFER1(reply.Release(shm_);); + BHMsgHead reply_head; + bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + r = r && reply_head.type() == kMsgTypeCommonReply; + r = r && reply.ParseBody(reply_body); + return r; + } } bool TopicNode::ServerStart(const ServerCB &rcb, int nworker) { - auto failed_q = std::make_shared<ServerFailedQ>(); + auto onRecv = [this, rcb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { + if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } + MsgRequestTopic req; + if (!imsg.ParseBody(req)) { return; } - auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); }; + MsgRequestTopicReply reply_body; + if (rcb(head.proc_id(), req, reply_body)) { + BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id())); - auto onRecv = [this, rcb, failed_q, onIdle](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { - if (head.type() == kMsgTypeRequestTopic && head.route_size() > 0) { - MsgRequestTopic req; - if (imsg.ParseBody(req)) { - MsgRequestTopicReply reply_body; - if (rcb(head.proc_id(), req, reply_body)) { - BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id())); - - for (int i = 0; i < head.route_size() - 1; ++i) { - reply_head.add_route()->Swap(head.mutable_route(i)); - } - MsgI msg; - if (msg.Make(sock.shm(), reply_head, reply_body)) { - auto &remote = head.route().rbegin()->mq_id(); - if (!sock.Send(remote.data(), msg, 10)) { - failed_q->Push(remote, msg, 10s); - } - } - } + for (int i = 0; i < head.route_size() - 1; ++i) { + reply_head.add_route()->Swap(head.mutable_route(i)); } - } else { - // ignored, or dropped + MsgI msg; + if (msg.Make(sock.shm(), reply_head, reply_body)) { + auto &remote = head.route().rbegin()->mq_id(); + sock.Send(remote.data(), msg); + } } - - onIdle(sock); }; auto &sock = SockServer(); - return rcb && sock.Start(onRecv, onIdle, nworker); + return rcb && sock.Start(onRecv, nworker); } bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) @@ -189,7 +186,7 @@ return false; } -bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body, const int timeout_ms) +bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body) { auto &sock = SockServer(); @@ -202,7 +199,7 @@ for (unsigned i = 0; i < p->route.size() - 1; ++i) { head.add_route()->Swap(&p->route[i]); } - return sock.Send(p->route.back().mq_id().data(), head, body, timeout_ms); + return sock.Send(p->route.back().mq_id().data(), head, body); } bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker) @@ -222,7 +219,7 @@ return SockRequest().Start(onData, nworker); } -bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const int timeout_ms, const RequestResultCB &cb) +bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const RequestResultCB &cb) { auto Call = [&](const void *remote) { auto &sock = SockRequest(); @@ -239,15 +236,15 @@ } } }; - return sock.Send(remote, head, req, timeout_ms, onRecv); + return sock.Send(remote, head, req, onRecv); } else { - return sock.Send(remote, head, req, timeout_ms); + return sock.Send(remote, head, req); } }; try { BHAddress addr; - if (ClientQueryRPCTopic(req.topic(), addr, timeout_ms)) { + if (ClientQueryRPCTopic(req.topic(), addr, 1000)) { return Call(addr.mq_id().data()); } else { SetLastError(eNotFound, "remote not found."); @@ -333,14 +330,18 @@ BHMsgHead head(InitMsgHead(GetType(pub), proc_id())); AddRoute(head, sock.id()); - MsgI reply; - DEFER1(reply.Release(shm());); - BHMsgHead reply_head; - MsgCommonReply reply_body; - return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) && - reply_head.type() == kMsgTypeCommonReply && - reply.ParseBody(reply_body) && - IsSuccess(reply_body.errmsg().errcode()); + if (timeout_ms == 0) { + return sock.Send(&BHTopicBusAddress(), head, pub); + } else { + MsgI reply; + DEFER1(reply.Release(shm());); + BHMsgHead reply_head; + MsgCommonReply reply_body; + return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) && + reply_head.type() == kMsgTypeCommonReply && + reply.ParseBody(reply_body) && + IsSuccess(reply_body.errmsg().errcode()); + } } catch (...) { } return false; @@ -357,8 +358,19 @@ BHMsgHead head(InitMsgHead(GetType(sub), proc_id())); AddRoute(head, sock.id()); - - return sock.Send(&BHTopicBusAddress(), head, sub, timeout_ms); + if (timeout_ms == 0) { + return sock.Send(&BHTopicBusAddress(), head, sub); + } else { + MsgI reply; + DEFER1(reply.Release(shm());); + BHMsgHead reply_head; + MsgCommonReply reply_body; + return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) && + reply_head.type() == kMsgTypeCommonReply && + reply.ParseBody(reply_body) && + IsSuccess(reply_body.errmsg().errcode()); + } + // TODO wait for result? } catch (...) { return false; } -- Gitblit v1.8.0