From 2197cf91e7a3bd5941327ba630a42946b88f069e Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 09 四月 2021 14:15:41 +0800 Subject: [PATCH] join pub/sub to node; refactor. --- src/center.cpp | 14 ++++++++------ 1 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/center.cpp b/src/center.cpp index fe549b7..71c85c3 100644 --- a/src/center.cpp +++ b/src/center.cpp @@ -336,7 +336,7 @@ auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { return [&](auto &&rep_body) { auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); - bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 10); + bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 100); if (!r) { printf("send reply failed.\n"); } @@ -364,18 +364,20 @@ MsgPublish pub; NodeCenter::Clients clients; MsgCommonReply reply; - MsgI pubmsg; if (head.route_size() != 1 || !msg.ParseBody(pub)) { return; } else if (!center->FindClients(head, pub, clients, reply)) { - // send error reply. MakeReplyer(socket, head, center->id())(reply); - } else if (pubmsg.MakeRC(socket.shm(), msg)) { - DEFER1(pubmsg.Release(socket.shm())); + } else { + MakeReplyer(socket, head, center->id())(MakeReply(eSuccess)); + if (!msg.EnableRefCount(socket.shm())) { return; } // no memory? + for (auto &cli : clients) { auto node = cli.weak_node_.lock(); if (node) { - socket.Send(cli.mq_.data(), pubmsg, 10); + if (!socket.Send(cli.mq_.data(), msg, 100)) { + printf("center route publish failed. need resend.\n"); + } } } } -- Gitblit v1.8.0