From c64c54d8e75b9354dc49a7b6b2d326e7dd59eb37 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期四, 15 四月 2021 19:32:16 +0800 Subject: [PATCH] add api; fix send, socknode mem leak. --- src/topic_node.cpp | 129 +++++++++++++++++++++++++++++++------------ 1 files changed, 93 insertions(+), 36 deletions(-) diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 4ce2c97..e9e627f 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -35,35 +35,45 @@ } // namespace TopicNode::TopicNode(SharedMemory &shm) : - shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm) + shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm), registered_(false) { - SockNode().Start(); + // recv msgs to avoid memory leak. + auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; + SockNode().Start(default_ignore_msg); } TopicNode::~TopicNode() { Stop(); + SockNode().Stop(); } -void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb) +void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) { - ServerStart(server_cb, 1); - SubscribeStartWorker(sub_cb, 1); - // SockClient().Start(); + if (nworker < 1) { + nworker = 1; + } else if (nworker > 16) { + nworker = 16; + } + + ServerStart(server_cb, nworker); + SubscribeStartWorker(sub_cb, nworker); + ClientStartWorker(client_cb, nworker); } void TopicNode::Stop() { SockSub().Stop(); SockServer().Stop(); SockClient().Stop(); - SockNode().Stop(); } bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { + info_ = proc; + auto &sock = SockNode(); MsgRegister body; - *body.mutable_proc() = proc; + body.mutable_proc()->Swap(&proc); auto AddId = [&](const MQId &id) { body.add_addrs()->set_mq_id(&id, sizeof(id)); }; AddId(SockNode().id()); AddId(SockServer().id()); @@ -74,27 +84,39 @@ auto head(InitMsgHead(GetType(body), body.proc().proc_id())); AddRoute(head, sock.id()); + auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) { + bool ok = head.type() == kMsgTypeCommonReply && + msg.ParseBody(rbody) && + IsSuccess(rbody.errmsg().errcode()); + printf("async regisered %s\n", ok ? "ok" : "failed"); + registered_.store(ok); + }; + if (timeout_ms == 0) { - return sock.Send(&BHTopicCenterAddress(), head, body); + auto onResult = [this, CheckResult](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { + MsgCommonReply body; + CheckResult(imsg, head, body); + }; + return sock.Send(&BHTopicCenterAddress(), head, body, onResult); } else { MsgI reply; DEFER1(reply.Release(shm_);); BHMsgHead reply_head; bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); - r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); - if (r && IsSuccess(reply_body.errmsg().errcode())) { - info_ = body; - return true; + if (r) { + CheckResult(reply, reply_head, reply_body); } - return false; + return IsRegistered(); } } bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { + if (!IsRegistered()) { return false; } + auto &sock = SockNode(); MsgHeartbeat body; - *body.mutable_proc() = proc; + body.mutable_proc()->Swap(&proc); auto head(InitMsgHead(GetType(body), body.proc().proc_id())); AddRoute(head, sock.id()); @@ -120,7 +142,8 @@ bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) { - //TODO check registered + if (!IsRegistered()) { return false; } + auto &sock = SockServer(); MsgRegisterRPC body; body.mutable_topics()->Swap(&topics); @@ -155,11 +178,8 @@ for (int i = 0; i < head.route_size() - 1; ++i) { reply_head.add_route()->Swap(head.mutable_route(i)); } - MsgI msg; - if (msg.Make(sock.shm(), reply_head, reply_body)) { - auto &remote = head.route().rbegin()->mq_id(); - sock.Send(remote.data(), msg); - } + auto &remote = head.route().rbegin()->mq_id(); + sock.Send(remote.data(), reply_head, reply_body); } }; @@ -169,6 +189,8 @@ bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) { + if (!IsRegistered()) { return false; } + auto &sock = SockServer(); MsgI imsg; @@ -188,6 +210,8 @@ bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body) { + if (!IsRegistered()) { return false; } + auto &sock = SockServer(); SrcInfo *p = static_cast<SrcInfo *>(src_info); @@ -211,7 +235,7 @@ if (head.type() == kMsgTypeRequestTopicReply) { MsgRequestTopicReply reply; if (imsg.ParseBody(reply)) { - cb(head.proc_id(), reply); + cb(head, reply); } } }; @@ -219,37 +243,60 @@ return SockRequest().Start(onData, nworker); } -bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const RequestResultCB &cb) +bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb) { - auto Call = [&](const void *remote) { - auto &sock = SockRequest(); + if (!IsRegistered()) { return false; } - BHMsgHead head(InitMsgHead(GetType(req), proc_id())); + const std::string &msg_id(NewMsgId()); + + out_msg_id = msg_id; + + auto SendTo = [this, msg_id](const BHAddress &addr, const MsgRequestTopic &req, const RequestResultCB &cb) { + auto &sock = SockClient(); + BHMsgHead head(InitMsgHead(GetType(req), proc_id(), msg_id)); AddRoute(head, sock.id()); + head.set_topic(req.topic()); if (cb) { auto onRecv = [cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { if (head.type() == kMsgTypeRequestTopicReply) { MsgRequestTopicReply reply; if (imsg.ParseBody(reply)) { - cb(head.proc_id(), reply); + cb(head, reply); } } }; - return sock.Send(remote, head, req, onRecv); + return sock.Send(addr.mq_id().data(), head, req, onRecv); } else { - return sock.Send(remote, head, req); + return sock.Send(addr.mq_id().data(), head, req); } }; try { + auto &sock = SockClient(); BHAddress addr; - if (ClientQueryRPCTopic(req.topic(), addr, 1000)) { - return Call(addr.mq_id().data()); - } else { - SetLastError(eNotFound, "remote not found."); - return false; + + if (topic_query_cache_.Find(req.topic(), addr)) { + return SendTo(addr, req, cb); } + + MsgQueryTopic query; + query.set_topic(req.topic()); + BHMsgHead head(InitMsgHead(GetType(query), proc_id())); + AddRoute(head, sock.id()); + + auto onQueryResult = [this, SendTo, req, cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { + MsgQueryTopicReply rep; + if (head.type() == kMsgTypeQueryTopicReply && imsg.ParseBody(rep)) { + auto &addr = rep.address(); + if (!addr.mq_id().empty()) { + topic_query_cache_.Update(req.topic(), addr); + SendTo(addr, req, cb); + } + } + }; + return sock.Send(&BHTopicCenterAddress(), head, query, onQueryResult); + } catch (...) { return false; } @@ -257,6 +304,8 @@ bool TopicNode::ClientSyncRequest(const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms) { + if (!IsRegistered()) { return false; } + try { auto &sock = SockRequest(); @@ -264,6 +313,7 @@ if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { BHMsgHead head(InitMsgHead(GetType(request), proc_id())); AddRoute(head, sock.id()); + head.set_topic(request.topic()); MsgI reply_msg; DEFER1(reply_msg.Release(shm_);); @@ -288,6 +338,8 @@ bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms) { + if (!IsRegistered()) { return false; } + auto &sock = SockRequest(); if (topic_query_cache_.Find(topic, addr)) { @@ -325,6 +377,8 @@ bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms) { + if (!IsRegistered()) { return false; } + try { auto &sock = SockPub(); BHMsgHead head(InitMsgHead(GetType(pub), proc_id())); @@ -349,8 +403,10 @@ // subscribe -bool TopicNode::Subscribe(MsgTopicList &topics, const int timeout_ms) +bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) { + if (!IsRegistered()) { return false; } + try { auto &sock = SockSub(); MsgSubscribe sub; @@ -364,7 +420,6 @@ MsgI reply; DEFER1(reply.Release(shm());); BHMsgHead reply_head; - MsgCommonReply reply_body; return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body) && @@ -396,6 +451,8 @@ bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms) { + if (!IsRegistered()) { return false; } + auto &sock = SockSub(); MsgI msg; DEFER1(msg.Release(shm());); -- Gitblit v1.8.0