From 2197cf91e7a3bd5941327ba630a42946b88f069e Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 09 四月 2021 14:15:41 +0800
Subject: [PATCH] join pub/sub to node; refactor.

---
 src/center.cpp |   14 ++++++++------
 1 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/src/center.cpp b/src/center.cpp
index fe549b7..71c85c3 100644
--- a/src/center.cpp
+++ b/src/center.cpp
@@ -336,7 +336,7 @@
 	auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
 		return [&](auto &&rep_body) {
 			auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
-			bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 10);
+			bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 100);
 			if (!r) {
 				printf("send reply failed.\n");
 			}
@@ -364,18 +364,20 @@
 			MsgPublish pub;
 			NodeCenter::Clients clients;
 			MsgCommonReply reply;
-			MsgI pubmsg;
 			if (head.route_size() != 1 || !msg.ParseBody(pub)) {
 				return;
 			} else if (!center->FindClients(head, pub, clients, reply)) {
-				// send error reply.
 				MakeReplyer(socket, head, center->id())(reply);
-			} else if (pubmsg.MakeRC(socket.shm(), msg)) {
-				DEFER1(pubmsg.Release(socket.shm()));
+			} else {
+				MakeReplyer(socket, head, center->id())(MakeReply(eSuccess));
+				if (!msg.EnableRefCount(socket.shm())) { return; } // no memory?
+
 				for (auto &cli : clients) {
 					auto node = cli.weak_node_.lock();
 					if (node) {
-						socket.Send(cli.mq_.data(), pubmsg, 10);
+						if (!socket.Send(cli.mq_.data(), msg, 100)) {
+							printf("center route publish failed. need resend.\n");
+						}
 					}
 				}
 			}

--
Gitblit v1.8.0