From 2e99e5311d1b9a53cca17008452cbe49e2af7234 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 31 三月 2021 14:05:09 +0800
Subject: [PATCH] add bus socket for manager; refactor.

---
 src/socket.h    |   23 +-
 src/pubsub.h    |   17 -
 src/socket.cpp  |  104 ++++++++------
 src/pubsub.cpp  |  221 ++++++++++++-------------------
 utest/utest.cpp |    3 
 5 files changed, 162 insertions(+), 206 deletions(-)

diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index d5c7dd2..52285b1 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -18,7 +18,6 @@
 #include "pubsub.h"
 #include "bh_util.h"
 #include "defs.h"
-#include <chrono>
 
 namespace bhome_shm
 {
@@ -28,158 +27,110 @@
 using namespace bhome_msg;
 
 BusManager::BusManager(SharedMemory &shm) :
-    shm_(shm),
-    busq_(kBHBusQueueId, shm, 16),
-    run_(false)
-{
-}
-
-BusManager::~BusManager()
-{
-	Stop();
-}
+    shm_(shm), socket_(ShmSocket::eSockBus, shm) {}
+BusManager::BusManager() :
+    BusManager(BHomeShm()) {}
 
 bool BusManager::Start(const int nworker)
 {
-	std::lock_guard<std::mutex> guard(mutex_);
-	StopNoLock();
-	// start
-	auto Worker = [&]() {
-		while (this->run_) {
-			BusManager &self = *this;
-			MsgI msg;
-			const int timeout_ms = 100;
-			if (self.busq_.Recv(msg, timeout_ms)) {
-				self.OnMsg(msg);
-			}
-		}
-	};
-
-	run_.store(true);
-	const int n = std::min(nworker, kMaxWorker);
-	for (int i = 0; i < n; ++i) {
-		workers_.emplace_back(Worker);
-	}
-	return true;
-}
-
-bool BusManager::Stop()
-{
-	std::lock_guard<std::mutex> guard(mutex_);
-	return StopNoLock();
-}
-
-bool BusManager::StopNoLock()
-{
-	if (run_.exchange(false)) {
-		for (auto &w : workers_) {
-			if (w.joinable()) {
-				w.join();
-			}
-		}
-		return true;
-	}
-	return false;
-}
-
-void BusManager::OnMsg(MsgI &imsg)
-{
-	DEFER1(imsg.Release(shm_));
-
-	BHMsg msg;
-	if (!imsg.Unpack(msg)) {
-		return;
-	}
-
-	auto OnSubChange = [&](auto &&update) {
-		DataSub sub;
-		if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
-			assert(sizeof(MQId) == msg.route(0).mq_id().size());
-			MQId client;
-			memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
-
-			std::lock_guard<std::mutex> guard(mutex_);
-			auto &topics = sub.topics();
-			for (auto &topic : topics) {
-				try {
-					update(topic, client);
-				} catch (...) {
-					//TODO log error
-				}
-			}
-		}
-	};
-
-	auto Sub1 = [this](const std::string &topic, const MQId &id) {
-		records_[topic].insert(id);
-	};
-
-	auto Unsub1 = [this](const std::string &topic, const MQId &id) {
-		auto pos = records_.find(topic);
-		if (pos != records_.end()) {
-			if (pos->second.erase(id) && pos->second.empty()) {
-				records_.erase(pos);
-			}
-		}
-	};
-
-	auto OnPublish = [&]() {
-		DataPub pub;
-		if (!pub.ParseFromString(msg.body())) {
+	auto onRecv = [&](MsgI &imsg) {
+		BHMsg msg;
+		if (!imsg.Unpack(msg)) {
 			return;
 		}
-		auto FindClients = [&](const std::string &topic) {
-			Clients dests;
-			std::lock_guard<std::mutex> guard(mutex_);
-			auto Find1 = [&](const std::string &t) {
-				auto pos = records_.find(topic);
-				if (pos != records_.end() && !pos->second.empty()) {
-					auto &clients = pos->second;
-					for (auto &cli : clients) {
-						dests.insert(cli);
+
+		auto OnSubChange = [&](auto &&update) {
+			DataSub sub;
+			if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
+				assert(sizeof(MQId) == msg.route(0).mq_id().size());
+				MQId client;
+				memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
+
+				std::lock_guard<std::mutex> guard(mutex_);
+				auto &topics = sub.topics();
+				for (auto &topic : topics) {
+					try {
+						update(topic, client);
+					} catch (...) {
+						//TODO log error
 					}
 				}
-			};
-			Find1(topic);
+			}
+		};
 
-			//TODO check and adjust topic on client side sub/pub.
-			size_t pos = 0;
-			while (true) {
-				pos = topic.find(kTopicSep, pos);
-				if (pos == topic.npos || ++pos == topic.size()) {
-					// Find1(std::string()); // sub all.
-					break;
-				} else {
-					Find1(topic.substr(0, pos));
+		auto Sub1 = [this](const std::string &topic, const MQId &id) {
+			records_[topic].insert(id);
+		};
+
+		auto Unsub1 = [this](const std::string &topic, const MQId &id) {
+			auto pos = records_.find(topic);
+			if (pos != records_.end()) {
+				if (pos->second.erase(id) && pos->second.empty()) {
+					records_.erase(pos);
 				}
 			}
-			return dests;
 		};
 
-		auto Dispatch = [&](auto &&send1) {
-			const Clients &clients(FindClients(pub.topic()));
-			for (auto &cli : clients) {
-				send1(cli);
+		auto OnPublish = [&]() {
+			DataPub pub;
+			if (!pub.ParseFromString(msg.body())) {
+				return;
+			}
+			auto FindClients = [&](const std::string &topic) {
+				Clients dests;
+				std::lock_guard<std::mutex> guard(mutex_);
+				auto Find1 = [&](const std::string &t) {
+					auto pos = records_.find(topic);
+					if (pos != records_.end() && !pos->second.empty()) {
+						auto &clients = pos->second;
+						for (auto &cli : clients) {
+							dests.insert(cli);
+						}
+					}
+				};
+				Find1(topic);
+
+				//TODO check and adjust topic on client side sub/pub.
+				size_t pos = 0;
+				while (true) {
+					pos = topic.find(kTopicSep, pos);
+					if (pos == topic.npos || ++pos == topic.size()) {
+						// Find1(std::string()); // sub all.
+						break;
+					} else {
+						Find1(topic.substr(0, pos));
+					}
+				}
+				return dests;
+			};
+
+			auto Dispatch = [&](auto &&send1) {
+				const Clients &clients(FindClients(pub.topic()));
+				for (auto &cli : clients) {
+					send1(cli);
+				}
+			};
+
+			if (imsg.IsCounted()) {
+				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, imsg, 100); });
+			} else {
+				MsgI pubmsg;
+				if (!pubmsg.MakeRC(shm_, msg)) { return; }
+				DEFER1(pubmsg.Release(shm_));
+
+				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, pubmsg, 100); });
 			}
 		};
 
-		if (imsg.IsCounted()) {
-			Dispatch([&](const MQId &cli) { busq_.Send(cli, imsg, 100); });
-		} else {
-			MsgI pubmsg;
-			if (!pubmsg.MakeRC(shm_, msg)) { return; }
-			DEFER1(pubmsg.Release(shm_));
-
-			Dispatch([&](const MQId &cli) { busq_.Send(cli, pubmsg, 100); });
+		switch (msg.type()) {
+		case kMsgTypeSubscribe: OnSubChange(Sub1); break;
+		case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
+		case kMsgTypePublish: OnPublish(); break;
+		default: break;
 		}
 	};
 
-	switch (msg.type()) {
-	case kMsgTypeSubscribe: OnSubChange(Sub1); break;
-	case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
-	case kMsgTypePublish: OnPublish(); break;
-	default: break;
-	}
+	return socket_.StartRaw(onRecv, std::min(nworker, kMaxWorker));
 }
 
 } // namespace bhome_shm
diff --git a/src/pubsub.h b/src/pubsub.h
index dc3fced..be6521f 100644
--- a/src/pubsub.h
+++ b/src/pubsub.h
@@ -18,13 +18,10 @@
 #ifndef PUBSUB_4KGRA997
 #define PUBSUB_4KGRA997
 
-#include "shm_queue.h"
-#include <atomic>
+#include "socket.h"
 #include <mutex>
 #include <set>
-#include <thread>
 #include <unordered_map>
-#include <vector>
 
 namespace bhome_shm
 {
@@ -33,21 +30,17 @@
 class BusManager
 {
 	SharedMemory &shm_;
-	ShmMsgQueue busq_;
-	std::atomic<bool> run_;
-	std::vector<std::thread> workers_;
+	ShmSocket socket_;
 	std::mutex mutex_;
 	typedef std::set<MQId> Clients;
 	std::unordered_map<std::string, Clients> records_;
 
-	bool StopNoLock();
-	void OnMsg(MsgI &msg);
-
 public:
 	BusManager(SharedMemory &shm);
-	~BusManager();
+	BusManager();
+	~BusManager() { Stop(); }
 	bool Start(const int nworker = 2);
-	bool Stop();
+	bool Stop() { return socket_.Stop(); }
 };
 
 } // namespace bhome_shm
diff --git a/src/socket.cpp b/src/socket.cpp
index 21928b8..5eb6756 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -20,58 +20,30 @@
 #include "bh_util.h"
 #include "defs.h"
 #include "msg.h"
-#include <chrono>
 
 using namespace bhome_msg;
 using namespace bhome_shm;
-using namespace std::chrono_literals;
 
 namespace
 {
-
-int GetSocketDefaultLen(ShmSocket::Type type)
-{
-	switch (type) {
-	case ShmSocket::eSockRequest: return 12;
-	case ShmSocket::eSockReply: return 64;
-	case ShmSocket::eSockPublish: return 0;
-	case ShmSocket::eSockSubscribe: return 64;
-	default: return 0;
-	}
-}
 
 } // namespace
 
 ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm) :
     shm_(shm), type_(type), run_(false)
 {
-	int len = GetSocketDefaultLen(type);
-	if (len != 0) {
-		mq_.reset(new Queue(shm_, len));
-
-		auto RecvProc = [this]() {
-			while (run_) {
-				try {
-					std::unique_lock<std::mutex> lk(mutex_);
-					if (cv_recv_cb_.wait_for(lk, 100ms, [this]() { return HasRecvCB(); })) {
-						BHMsg msg;
-						if (mq_->Recv(msg, 100)) {
-							this->onRecv_(msg);
-						}
-					}
-				} catch (...) {
-				}
-			}
-		};
-		run_.store(true);
-		workers_.emplace_back(RecvProc);
+	switch (type) {
+	case eSockBus: mq_.reset(new Queue(kBHBusQueueId, shm_, 1000)); break;
+	case eSockRequest: mq_.reset(new Queue(shm_, 12)); break;
+	case eSockReply: mq_.reset(new Queue(shm_, 64)); break;
+	case eSockSubscribe: mq_.reset(new Queue(shm_, 64)); break;
+	case eSockPublish: break; // no recv mq needed
+	default: break;
 	}
 }
 
 ShmSocket::ShmSocket(Type type) :
-    ShmSocket(type, BHomeShm())
-{
-}
+    ShmSocket(type, BHomeShm()) {}
 
 ShmSocket::~ShmSocket()
 {
@@ -110,25 +82,63 @@
 	}
 }
 
-bool ShmSocket::SetRecvCallback(const RecvCB &onRecv)
+bool ShmSocket::StartRaw(const RecvRawCB &onData, int nworker)
 {
+	auto CanRecv = [this]() {
+		switch (type_) {
+		case eSockRequest:
+		case eSockReply:
+		case eSockBus:
+		case eSockSubscribe:
+			return true;
+		default:
+			return false;
+		}
+	};
+	if (!CanRecv()) {
+		return false;
+	}
 	std::lock_guard<std::mutex> lock(mutex_);
-	onRecv_ = onRecv;
-	cv_recv_cb_.notify_one();
+
+	StopNoLock();
+	auto RecvProc = [this, onData]() {
+		while (run_) {
+			try {
+				MsgI imsg;
+				DEFER1(imsg.Release(shm_));
+				if (mq_->Recv(imsg, 100)) { onData(imsg); }
+			} catch (...) {
+			}
+		}
+	};
+
+	run_.store(true);
+	for (int i = 0; i < nworker; ++i) {
+		workers_.emplace_back(RecvProc);
+	}
 	return true;
 }
 
-bool ShmSocket::HasRecvCB()
+bool ShmSocket::Start(const RecvCB &onData, int nworker)
 {
-	return static_cast<bool>(onRecv_);
+	return StartRaw([this, onData](MsgI &imsg) { BHMsg m; if (imsg.Unpack(m)) { onData(m); } }, nworker);
 }
 
-void ShmSocket::Stop()
+bool ShmSocket::Stop()
 {
-	run_ = false;
-	for (auto &t : workers_) {
-		if (t.joinable()) {
-			t.join();
+	std::lock_guard<std::mutex> lock(mutex_);
+	return StopNoLock();
+}
+
+bool ShmSocket::StopNoLock()
+{
+	if (run_.exchange(false)) {
+		for (auto &w : workers_) {
+			if (w.joinable()) {
+				w.join();
+			}
 		}
+		return true;
 	}
-}
\ No newline at end of file
+	return false;
+}
diff --git a/src/socket.h b/src/socket.h
index 92c1b73..b94eca2 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -21,14 +21,14 @@
 
 #include "shm_queue.h"
 #include <atomic>
-#include <condition_variable>
+#include <boost/noncopyable.hpp>
 #include <functional>
 #include <memory>
 #include <mutex>
 #include <thread>
 #include <vector>
 
-class ShmSocket
+class ShmSocket : private boost::noncopyable
 {
 	typedef bhome_shm::ShmMsgQueue Queue;
 
@@ -38,13 +38,14 @@
 		eSockReply,
 		eSockSubscribe,
 		eSockPublish,
+		eSockBus,
 	};
 	typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB;
+	typedef std::function<void(bhome_msg::MsgI &imsg)> RecvRawCB;
 
-	ShmSocket(Type type);
 	ShmSocket(Type type, bhome_shm::SharedMemory &shm);
+	ShmSocket(Type type);
 	~ShmSocket();
-
 	// bool Request(const std::string &topic, const void *data, const size_t size, onReply);
 	bool RequestAndWait() { return false; } // call Request, and wait onReply notify cv
 
@@ -55,19 +56,19 @@
 	bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms);
 	bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
 	bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
-	bool SetRecvCallback(const RecvCB &onRecv);
+
+	// start recv.
+	bool Start(const RecvCB &onData, int nworker = 1);
+	bool StartRaw(const RecvRawCB &onData, int nworker = 1);
+	bool Stop();
 
 private:
-	bool HasRecvCB();
-	void Stop();
-
+	bool StopNoLock();
 	bhome_shm::SharedMemory &shm_;
-	Type type_;
+	const Type type_;
 	std::vector<std::thread> workers_;
 	std::mutex mutex_;
-	std::condition_variable cv_recv_cb_;
 	std::atomic<bool> run_;
-	RecvCB onRecv_;
 
 	std::unique_ptr<Queue> mq_;
 };
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 473b04e..e24d34a 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -5,6 +5,7 @@
 #include <atomic>
 #include <boost/uuid/uuid_generators.hpp>
 #include <boost/uuid/uuid_io.hpp>
+#include <condition_variable>
 #include <stdio.h>
 #include <string>
 #include <thread>
@@ -108,7 +109,7 @@
 			}
 			// printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
 		};
-		client.SetRecvCallback(OnRecv);
+		client.Start(OnRecv);
 
 		std::unique_lock<std::mutex> lk(mutex);
 		cv.wait(lk);

--
Gitblit v1.8.0