From c1e39e20ca42b21eeac8b5068fa1f921bf9a070f Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 23 六月 2021 19:43:29 +0800
Subject: [PATCH] refactor, start tcp pub/sub.

---
 src/topic_node.cpp |   11 ++++++-----
 1 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index b21f7ef..6f98694 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -50,8 +50,8 @@
 
 } // namespace
 
-TopicNode::TopicNode(SharedMemory &shm) :
-    shm_(shm), state_(eStateUninited)
+TopicNode::TopicNode(SharedMemory &shm, MQId ssn_id) :
+    shm_(shm), state_(eStateUninited), ssn_id_(ssn_id)
 {
 }
 
@@ -569,7 +569,8 @@
 			reply_head.mutable_proc_id()->swap(out_proc_id);
 			return true;
 		}
-	} catch (...) {
+	} catch (std::exception &e) {
+		LOG_ERROR() << __func__ << " exception: " << e.what();
 		SetLastError(eError, __func__ + std::string(" internal errer."));
 	}
 	return false;
@@ -627,6 +628,7 @@
 		auto &sock = SockPub();
 		BHMsgHead head(InitMsgHead(GetType(pub), proc_id(), ssn()));
 		AddRoute(head, sock);
+		head.set_topic(pub.topic());
 
 		if (timeout_ms == 0) {
 			return sock.Send(BusAddr(), head, pub);
@@ -672,7 +674,6 @@
 			       reply.ParseBody(reply_body) &&
 			       IsSuccess(reply_body.errmsg().errcode());
 		}
-		// TODO wait for result?
 	} catch (...) {
 		return false;
 	}
@@ -718,12 +719,12 @@
 			return false;
 		}
 	}
-	//TODO error msg.
 	if (head.type() == kMsgTypePublish) {
 		if (pub.ParseFromString(body)) {
 			head.mutable_proc_id()->swap(proc_id);
 			return true;
 		}
 	}
+	SetLastError(eError, "invalid subcribe msg received.");
 	return false;
 }
\ No newline at end of file

--
Gitblit v1.8.0