From aa1542b6d6a4680088ac715c4ce40f97ada554fb Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 14 四月 2021 17:52:31 +0800
Subject: [PATCH] add SendQ TrySend() TryRecv(); handle re-register.

---
 src/topic_node.cpp |  150 +++++++++++++++++++++++++++-----------------------
 1 files changed, 81 insertions(+), 69 deletions(-)

diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 8f039de..4ce2c97 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -17,7 +17,6 @@
  */
 #include "topic_node.h"
 #include "bh_util.h"
-#include "failed_msg.h"
 #include <chrono>
 #include <list>
 
@@ -33,9 +32,8 @@
 	std::string msg_id;
 };
 
-typedef FailedMsgQ ServerFailedQ;
-
 } // namespace
+
 TopicNode::TopicNode(SharedMemory &shm) :
     shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm)
 {
@@ -76,15 +74,20 @@
 	auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
 	AddRoute(head, sock.id());
 
-	MsgI reply;
-	DEFER1(reply.Release(shm_););
-	BHMsgHead reply_head;
-	bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
-	r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
-	if (r && IsSuccess(reply_body.errmsg().errcode())) {
-		info_ = body;
+	if (timeout_ms == 0) {
+		return sock.Send(&BHTopicCenterAddress(), head, body);
+	} else {
+		MsgI reply;
+		DEFER1(reply.Release(shm_););
+		BHMsgHead reply_head;
+		bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+		r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
+		if (r && IsSuccess(reply_body.errmsg().errcode())) {
+			info_ = body;
+			return true;
+		}
+		return false;
 	}
-	return r;
 }
 
 bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
@@ -96,22 +99,23 @@
 	auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
 	AddRoute(head, sock.id());
 
-	MsgI reply;
-	DEFER1(reply.Release(shm_););
-	BHMsgHead reply_head;
-	bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
-	r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
-	if (r && IsSuccess(reply_body.errmsg().errcode())) {
-		// TODO update proc info
+	if (timeout_ms == 0) {
+		return sock.Send(&BHTopicCenterAddress(), head, body);
+	} else {
+		MsgI reply;
+		DEFER1(reply.Release(shm_););
+		BHMsgHead reply_head;
+		bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+		r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
+		return (r && IsSuccess(reply_body.errmsg().errcode()));
 	}
-	return r;
 }
 bool TopicNode::Heartbeat(const int timeout_ms)
 {
 	ProcInfo proc;
 	proc.set_proc_id(proc_id());
 	MsgCommonReply reply_body;
-	return Heartbeat(proc, reply_body, timeout_ms) && IsSuccess(reply_body.errmsg().errcode());
+	return Heartbeat(proc, reply_body, timeout_ms);
 }
 
 bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
@@ -124,50 +128,43 @@
 	auto head(InitMsgHead(GetType(body), proc_id()));
 	AddRoute(head, sock.id());
 
-	MsgI reply;
-	DEFER1(reply.Release(shm_););
-	BHMsgHead reply_head;
-	bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
-	r = r && reply_head.type() == kMsgTypeCommonReply;
-	r = r && reply.ParseBody(reply_body);
-	return r;
+	if (timeout_ms == 0) {
+		return sock.Send(&BHTopicCenterAddress(), head, body);
+	} else {
+		MsgI reply;
+		DEFER1(reply.Release(shm_););
+		BHMsgHead reply_head;
+		bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+		r = r && reply_head.type() == kMsgTypeCommonReply;
+		r = r && reply.ParseBody(reply_body);
+		return r;
+	}
 }
 
 bool TopicNode::ServerStart(const ServerCB &rcb, int nworker)
 {
-	auto failed_q = std::make_shared<ServerFailedQ>();
+	auto onRecv = [this, rcb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
+		if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; }
+		MsgRequestTopic req;
+		if (!imsg.ParseBody(req)) { return; }
 
-	auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); };
+		MsgRequestTopicReply reply_body;
+		if (rcb(head.proc_id(), req, reply_body)) {
+			BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id()));
 
-	auto onRecv = [this, rcb, failed_q, onIdle](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
-		if (head.type() == kMsgTypeRequestTopic && head.route_size() > 0) {
-			MsgRequestTopic req;
-			if (imsg.ParseBody(req)) {
-				MsgRequestTopicReply reply_body;
-				if (rcb(head.proc_id(), req, reply_body)) {
-					BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id()));
-
-					for (int i = 0; i < head.route_size() - 1; ++i) {
-						reply_head.add_route()->Swap(head.mutable_route(i));
-					}
-					MsgI msg;
-					if (msg.Make(sock.shm(), reply_head, reply_body)) {
-						auto &remote = head.route().rbegin()->mq_id();
-						if (!sock.Send(remote.data(), msg, 10)) {
-							failed_q->Push(remote, msg, 10s);
-						}
-					}
-				}
+			for (int i = 0; i < head.route_size() - 1; ++i) {
+				reply_head.add_route()->Swap(head.mutable_route(i));
 			}
-		} else {
-			// ignored, or dropped
+			MsgI msg;
+			if (msg.Make(sock.shm(), reply_head, reply_body)) {
+				auto &remote = head.route().rbegin()->mq_id();
+				sock.Send(remote.data(), msg);
+			}
 		}
-
-		onIdle(sock);
 	};
 
 	auto &sock = SockServer();
-	return rcb && sock.Start(onRecv, onIdle, nworker);
+	return rcb && sock.Start(onRecv, nworker);
 }
 
 bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
@@ -189,7 +186,7 @@
 	return false;
 }
 
-bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body, const int timeout_ms)
+bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body)
 {
 	auto &sock = SockServer();
 
@@ -202,7 +199,7 @@
 	for (unsigned i = 0; i < p->route.size() - 1; ++i) {
 		head.add_route()->Swap(&p->route[i]);
 	}
-	return sock.Send(p->route.back().mq_id().data(), head, body, timeout_ms);
+	return sock.Send(p->route.back().mq_id().data(), head, body);
 }
 
 bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker)
@@ -222,7 +219,7 @@
 	return SockRequest().Start(onData, nworker);
 }
 
-bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const int timeout_ms, const RequestResultCB &cb)
+bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const RequestResultCB &cb)
 {
 	auto Call = [&](const void *remote) {
 		auto &sock = SockRequest();
@@ -239,15 +236,15 @@
 					}
 				}
 			};
-			return sock.Send(remote, head, req, timeout_ms, onRecv);
+			return sock.Send(remote, head, req, onRecv);
 		} else {
-			return sock.Send(remote, head, req, timeout_ms);
+			return sock.Send(remote, head, req);
 		}
 	};
 
 	try {
 		BHAddress addr;
-		if (ClientQueryRPCTopic(req.topic(), addr, timeout_ms)) {
+		if (ClientQueryRPCTopic(req.topic(), addr, 1000)) {
 			return Call(addr.mq_id().data());
 		} else {
 			SetLastError(eNotFound, "remote not found.");
@@ -333,14 +330,18 @@
 		BHMsgHead head(InitMsgHead(GetType(pub), proc_id()));
 		AddRoute(head, sock.id());
 
-		MsgI reply;
-		DEFER1(reply.Release(shm()););
-		BHMsgHead reply_head;
-		MsgCommonReply reply_body;
-		return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) &&
-		       reply_head.type() == kMsgTypeCommonReply &&
-		       reply.ParseBody(reply_body) &&
-		       IsSuccess(reply_body.errmsg().errcode());
+		if (timeout_ms == 0) {
+			return sock.Send(&BHTopicBusAddress(), head, pub);
+		} else {
+			MsgI reply;
+			DEFER1(reply.Release(shm()););
+			BHMsgHead reply_head;
+			MsgCommonReply reply_body;
+			return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) &&
+			       reply_head.type() == kMsgTypeCommonReply &&
+			       reply.ParseBody(reply_body) &&
+			       IsSuccess(reply_body.errmsg().errcode());
+		}
 	} catch (...) {
 	}
 	return false;
@@ -357,8 +358,19 @@
 
 		BHMsgHead head(InitMsgHead(GetType(sub), proc_id()));
 		AddRoute(head, sock.id());
-
-		return sock.Send(&BHTopicBusAddress(), head, sub, timeout_ms);
+		if (timeout_ms == 0) {
+			return sock.Send(&BHTopicBusAddress(), head, sub);
+		} else {
+			MsgI reply;
+			DEFER1(reply.Release(shm()););
+			BHMsgHead reply_head;
+			MsgCommonReply reply_body;
+			return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) &&
+			       reply_head.type() == kMsgTypeCommonReply &&
+			       reply.ParseBody(reply_body) &&
+			       IsSuccess(reply_body.errmsg().errcode());
+		}
+		// TODO wait for result?
 	} catch (...) {
 		return false;
 	}

--
Gitblit v1.8.0