From 4deeafbd502dc3c57dab8ad6ca601a38a9e7f074 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 06 四月 2021 19:10:49 +0800
Subject: [PATCH] add uni center.

---
 src/pubsub.cpp |  155 ++++++++++++++-------------------------------------
 1 files changed, 42 insertions(+), 113 deletions(-)

diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index add968c..90688ec 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -16,132 +16,61 @@
  * =====================================================================================
  */
 #include "pubsub.h"
-#include <chrono>
 #include "bh_util.h"
 #include "defs.h"
 
-namespace bhome_shm {
-
 using namespace std::chrono_literals;
-const int kMaxWorker = 16;
 using namespace bhome_msg;
 
-BusManager::BusManager(SharedMemory &shm):
-shm_(shm),
-busq_(kBHBusQueueId, shm, 1000),
-run_(false)
+bool SocketPublish::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms)
 {
-}
-	
-BusManager::~BusManager()
-{
-    Stop();
+	try {
+		MsgI imsg;
+		if (!imsg.MakeRC(shm(), MakePub(topic, data, size))) {
+			return false;
+		}
+		DEFER1(imsg.Release(shm()));
+		return ShmMsgQueue::Send(shm(), kBHTopicBus, imsg, timeout_ms);
+	} catch (...) {
+		return false;
+	}
 }
 
-bool BusManager::Start(const int nworker)
+bool SocketSubscribe::Subscribe(const std::vector<Topic> &topics, const int timeout_ms)
 {
-    std::lock_guard<std::mutex> guard(mutex_);
-    StopNoLock();
-    // start
-    auto Worker = [&](){
-        while (this->run_) {
-            BusManager &self = *this;
-            BHMsg msg;
-            const int timeout_ms = 100;
-            if (self.busq_.Recv(msg, timeout_ms)) {
-                self.OnMsg(msg);
-            }
-        }
-    };
-
-    run_.store(true);
-    const int n = std::min(nworker, kMaxWorker);
-    for (int i = 0; i < n; ++i) {
-        workers_.emplace_back(Worker);
-    }
-    return true;
+	try {
+		return mq().Send(kBHTopicBus, MakeSub(mq().Id(), topics), timeout_ms);
+	} catch (...) {
+		return false;
+	}
 }
 
-bool BusManager::Stop()
+bool SocketSubscribe::StartRecv(const TopicDataCB &tdcb, int nworker)
 {
-    std::lock_guard<std::mutex> guard(mutex_);
-    return StopNoLock();
+	auto AsyncRecvProc = [this, tdcb](BHMsg &msg) {
+		if (msg.type() == kMsgTypePublish) {
+			MsgPub d;
+			if (d.ParseFromString(msg.body())) {
+				tdcb(d.topic(), d.data());
+			}
+		} else {
+			// ignored, or dropped
+		}
+	};
+
+	return tdcb && Start(AsyncRecvProc, nworker);
 }
 
-bool BusManager::StopNoLock()
+bool SocketSubscribe::RecvSub(Topic &topic, std::string &data, const int timeout_ms)
 {
-    if (run_.exchange(false)) {
-        for (auto &w: workers_) {
-            if (w.joinable()) {
-                w.join();
-            }
-        }
-        return true;
-    }    
-    return false;
-}
-
-void BusManager::OnMsg(const BHMsg &msg)
-{
-    auto OnSubChange = [&](auto &&update) {
-        DataSub sub;
-        if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
-            assert(sizeof(MQId) == msg.route(0).mq_id().size());
-            MQId client;
-            memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
-
-            std::lock_guard<std::mutex> guard(mutex_);
-            auto &topics = sub.topics();
-            for (auto &topic : topics) {
-                try {
-                    update(topic, client);
-                } catch(...) {
-                    //TODO log error
-                }
-            }
-        }
-    };
-
-    auto Sub1 = [this](const std::string &topic, const MQId &id) {
-        records_[topic].insert(id);
-    };
-
-    auto Unsub1 = [this](const std::string &topic, const MQId &id) {
-        auto pos = records_.find(topic);
-        if (pos != records_.end()) {
-            if (pos->second.erase(id) && pos->second.empty()) {
-                records_.erase(pos);
-            }
-        }
-    };
-
-    auto OnPublish = [&]() {
-        DataPub pub;
-        MsgI pubmsg;
-        if (!pub.ParseFromString(msg.body()) || !pubmsg.MakeRC(shm_, msg)) {
-            return;
-        }
-        DEFER1(pubmsg.Release(shm_));
-
-        std::lock_guard<std::mutex> guard(mutex_);
-        auto pos = records_.find(pub.topic());
-        if (pos != records_.end() && !pos->second.empty()) {
-            auto &clients = pos->second;
-            for (auto &cli : clients) {
-                busq_.Send(cli, pubmsg, 100);
-            }
-        } else {
-            // printf("invalid topic: %s\n", pub.topic().c_str());
-        }
-    };
-
-    switch (msg.type()) {
-        case kMsgTypeSubscribe: OnSubChange(Sub1); break;
-        case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
-        case kMsgTypePublish : OnPublish(); break;
-        default: break;
-    }
-}
-
-} // namespace bhome_shm
-
+	BHMsg msg;
+	if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) {
+		MsgPub d;
+		if (d.ParseFromString(msg.body())) {
+			d.mutable_topic()->swap(topic);
+			d.mutable_data()->swap(data);
+			return true;
+		}
+	}
+	return false;
+}
\ No newline at end of file

--
Gitblit v1.8.0