From c338820e4db43ad32c20ff429a038b06bcb980f8 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 08 四月 2021 18:13:25 +0800
Subject: [PATCH] BIG change, join center,bus; now msg is head+body.

---
 src/pubsub.cpp |  112 ++++++++++++++++++++++++++++++-------------------------
 1 files changed, 61 insertions(+), 51 deletions(-)

diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index e38c445..471c63c 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -16,67 +16,77 @@
  * =====================================================================================
  */
 #include "pubsub.h"
-#include <chrono>
-
-namespace bhome_shm {
+#include "bh_util.h"
+#include "defs.h"
 
 using namespace std::chrono_literals;
-const MQId kBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
-const int kMaxWorker = 16;
+using namespace bhome_msg;
 
-BusManager::BusManager(SharedMemory &shm):
-busq_(kBusQueueId, shm, 1000),
-run_(false)
+bool SocketPublish::Publish(const std::string &proc_id, const Topic &topic, const void *data, const size_t size, const int timeout_ms)
 {
+	try {
+		MsgPublish pub;
+		pub.set_topic(topic);
+		pub.set_data(data, size);
+		BHMsgHead head(InitMsgHead(GetType(pub), proc_id));
+		MsgI imsg;
+		if (imsg.MakeRC(shm(), head, pub)) {
+			DEFER1(imsg.Release(shm()));
+			return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms);
+		}
+	} catch (...) {
+	}
+	return false;
 }
-	
-BusManager::~BusManager()
+namespace
 {
-    Stop();
+inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
+
+} // namespace
+bool SocketSubscribe::Subscribe(const std::string &proc_id, const std::vector<Topic> &topics, const int timeout_ms)
+{
+	try {
+		MsgSubscribe sub;
+		for (auto &topic : topics) {
+			sub.add_topics(topic);
+		}
+		BHMsgHead head(InitMsgHead(GetType(sub), proc_id));
+		AddRoute(head, mq().Id());
+
+		return Send(&BHTopicBusAddress(), head, sub, timeout_ms);
+	} catch (...) {
+		return false;
+	}
 }
 
-bool BusManager::Start(const int nworker)
+bool SocketSubscribe::StartRecv(const TopicDataCB &tdcb, int nworker)
 {
-    std::lock_guard<std::mutex> guard(mutex_);
-    StopNoLock();
-    // start
-    auto Worker = [&](){
-        while (this->run_) {
-            std::this_thread::sleep_for(100ms);
-            BusManager &self = *this;
-            Msg msg;
-            const int timeout_ms = 100;
-            if (!self.busq_.Recv(msg, timeout_ms)) {
-                continue;
-            }
-            // handle msg;
-            // type: subscribe(topic), publish(topic, data)
-        }
-    };
+	auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
+		if (head.type() == kMsgTypePublish) {
+			MsgPublish pub;
+			if (imsg.ParseBody(pub)) {
+				tdcb(head.proc_id(), pub.topic(), pub.data());
+			}
+		} else {
+			// ignored, or dropped
+		}
+	};
 
-    run_.store(true);
-    const int n = std::min(nworker, kMaxWorker);
-    for (int i = 0; i < n; ++i) {
-        workers_.emplace_back(Worker);
-    }
+	return tdcb && Start(AsyncRecvProc, nworker);
 }
 
-bool BusManager::Stop()
+bool SocketSubscribe::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms)
 {
-    std::lock_guard<std::mutex> guard(mutex_);
-    StopNoLock();
-}
-
-bool BusManager::StopNoLock()
-{
-    if (run_.exchange(false)) {
-        for (auto &w: workers_) {
-            if (w.joinable()) {
-                w.join();
-            }
-        }
-    }    
-}
-
-} // namespace bhome_shm
-
+	MsgI msg;
+	BHMsgHead head;
+	if (SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) {
+		MsgPublish pub;
+		if (msg.ParseBody(pub)) {
+			head.mutable_proc_id()->swap(proc_id);
+			pub.mutable_topic()->swap(topic);
+			pub.mutable_data()->swap(data);
+			return true;
+		}
+	}
+	return false;
+}
\ No newline at end of file

--
Gitblit v1.8.0