From 65ef4d68321e56906920be75831b5e968f7abd7b Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 13 四月 2021 09:34:05 +0800
Subject: [PATCH] add heartbeat; refactor.

---
 src/topic_node.cpp |   43 +++++++++++++++++++++++++++++++++++++------
 1 files changed, 37 insertions(+), 6 deletions(-)

diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 8cd5cc4..788c536 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -39,17 +39,21 @@
 TopicNode::TopicNode(SharedMemory &shm) :
     shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm)
 {
-	SockNode().Start();
-	SockClient().Start();
-	SockServer().Start();
+	Start();
 }
 
 TopicNode::~TopicNode()
 {
-	StopAll();
+	Stop();
 }
 
-void TopicNode::StopAll()
+void TopicNode::Start()
+{
+	SockNode().Start();
+	SockClient().Start();
+	SockServer().Start();
+}
+void TopicNode::Stop()
 {
 	SockServer().Stop();
 	SockClient().Stop();
@@ -76,12 +80,39 @@
 	BHMsgHead reply_head;
 	bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
 	r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
-	if (r) {
+	if (r && IsSuccess(reply_body.errmsg().errcode())) {
 		info_ = body;
 	}
 	return r;
 }
 
+bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
+{
+	auto &sock = SockNode();
+	MsgHeartbeat body;
+	*body.mutable_proc() = proc;
+
+	auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
+	AddRoute(head, sock.id());
+
+	MsgI reply;
+	DEFER1(reply.Release(shm_););
+	BHMsgHead reply_head;
+	bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+	r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
+	if (r && IsSuccess(reply_body.errmsg().errcode())) {
+		// TODO update proc info
+	}
+	return r;
+}
+bool TopicNode::Heartbeat(const int timeout_ms)
+{
+	ProcInfo proc;
+	proc.set_proc_id(proc_id());
+	MsgCommonReply reply_body;
+	return Heartbeat(proc, reply_body, timeout_ms) && IsSuccess(reply_body.errmsg().errcode());
+}
+
 bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
 {
 	//TODO check registered

--
Gitblit v1.8.0