From c338820e4db43ad32c20ff429a038b06bcb980f8 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 08 四月 2021 18:13:25 +0800
Subject: [PATCH] BIG change, join center,bus; now msg is head+body.

---
 src/pubsub.cpp |   56 ++++++++++++++++++++++++++++++++++++--------------------
 1 files changed, 36 insertions(+), 20 deletions(-)

diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index 0266c86..471c63c 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -22,24 +22,38 @@
 using namespace std::chrono_literals;
 using namespace bhome_msg;
 
-bool SocketPublish::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms)
+bool SocketPublish::Publish(const std::string &proc_id, const Topic &topic, const void *data, const size_t size, const int timeout_ms)
 {
 	try {
+		MsgPublish pub;
+		pub.set_topic(topic);
+		pub.set_data(data, size);
+		BHMsgHead head(InitMsgHead(GetType(pub), proc_id));
 		MsgI imsg;
-		if (!imsg.MakeRC(shm(), MakePub(topic, data, size))) {
-			return false;
+		if (imsg.MakeRC(shm(), head, pub)) {
+			DEFER1(imsg.Release(shm()));
+			return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms);
 		}
-		DEFER1(imsg.Release(shm()));
-		return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms);
 	} catch (...) {
-		return false;
 	}
+	return false;
 }
+namespace
+{
+inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
 
-bool SocketSubscribe::Subscribe(const std::vector<Topic> &topics, const int timeout_ms)
+} // namespace
+bool SocketSubscribe::Subscribe(const std::string &proc_id, const std::vector<Topic> &topics, const int timeout_ms)
 {
 	try {
-		return mq().Send(BHTopicBusAddress(), MakeSub(mq().Id(), topics), timeout_ms);
+		MsgSubscribe sub;
+		for (auto &topic : topics) {
+			sub.add_topics(topic);
+		}
+		BHMsgHead head(InitMsgHead(GetType(sub), proc_id));
+		AddRoute(head, mq().Id());
+
+		return Send(&BHTopicBusAddress(), head, sub, timeout_ms);
 	} catch (...) {
 		return false;
 	}
@@ -47,11 +61,11 @@
 
 bool SocketSubscribe::StartRecv(const TopicDataCB &tdcb, int nworker)
 {
-	auto AsyncRecvProc = [this, tdcb](BHMsg &msg) {
-		if (msg.type() == kMsgTypePublish) {
-			MsgPub d;
-			if (d.ParseFromString(msg.body())) {
-				tdcb(d.topic(), d.data());
+	auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
+		if (head.type() == kMsgTypePublish) {
+			MsgPublish pub;
+			if (imsg.ParseBody(pub)) {
+				tdcb(head.proc_id(), pub.topic(), pub.data());
 			}
 		} else {
 			// ignored, or dropped
@@ -61,14 +75,16 @@
 	return tdcb && Start(AsyncRecvProc, nworker);
 }
 
-bool SocketSubscribe::RecvSub(Topic &topic, std::string &data, const int timeout_ms)
+bool SocketSubscribe::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms)
 {
-	BHMsg msg;
-	if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) {
-		MsgPub d;
-		if (d.ParseFromString(msg.body())) {
-			d.mutable_topic()->swap(topic);
-			d.mutable_data()->swap(data);
+	MsgI msg;
+	BHMsgHead head;
+	if (SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) {
+		MsgPublish pub;
+		if (msg.ParseBody(pub)) {
+			head.mutable_proc_id()->swap(proc_id);
+			pub.mutable_topic()->swap(topic);
+			pub.mutable_data()->swap(data);
 			return true;
 		}
 	}

--
Gitblit v1.8.0