From b55ffe89f4b237be5f79232cfddfe22bfdb87c64 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 01 四月 2021 13:23:48 +0800
Subject: [PATCH] make req/rep,sub/pub sockets sub class;

---
 src/socket.cpp |  210 ++++++----------------------------------------------
 1 files changed, 24 insertions(+), 186 deletions(-)

diff --git a/src/socket.cpp b/src/socket.cpp
index 13f1e38..4c2fc6b 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -20,8 +20,6 @@
 #include "bh_util.h"
 #include "defs.h"
 #include "msg.h"
-#include <chrono>
-#include <condition_variable>
 
 using namespace bhome_msg;
 using namespace bhome_shm;
@@ -31,78 +29,33 @@
 
 } // namespace
 
-//TODO maybe change to base class, each type is a sub class.
-
-ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm) :
-    shm_(shm), type_(type), run_(false)
+ShmSocket::ShmSocket(Shm &shm, const void *id, const int len) :
+    shm_(shm), run_(false)
 {
-	switch (type) {
-	case eSockBus: mq_.reset(new Queue(kBHBusQueueId, shm_, 1000)); break;
-	case eSockRequest: mq_.reset(new Queue(shm_, 12)); break;
-	case eSockReply: mq_.reset(new Queue(shm_, 64)); break;
-	case eSockSubscribe: mq_.reset(new Queue(shm_, 64)); break;
-	case eSockPublish: break; // no recv mq needed
-	default: break;
+	if (id && len > 0) {
+		mq_.reset(new Queue(*static_cast<const MQId *>(id), shm, len));
 	}
 }
-
-ShmSocket::ShmSocket(Type type) :
-    ShmSocket(type, BHomeShm()) {}
+ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
+    shm_(shm), run_(false)
+{
+	if (len > 0) {
+		mq_.reset(new Queue(shm_, len));
+	}
+}
 
 ShmSocket::~ShmSocket()
 {
 	Stop();
 }
 
-bool ShmSocket::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms)
-{
-	if (type_ != eSockPublish) {
-		return false;
-	}
-	assert(!mq_);
-	try {
-		MsgI imsg;
-		if (!imsg.MakeRC(shm_, MakePub(topic, data, size))) {
-			return false;
-		}
-		DEFER1(imsg.Release(shm_));
-		return Queue::Send(shm_, kBHBusQueueId, imsg, timeout_ms);
-	} catch (...) {
-		return false;
-	}
-}
-
-bool ShmSocket::Subscribe(const std::vector<std::string> &topics, const int timeout_ms)
-{
-	if (type_ != eSockSubscribe) {
-		return false;
-	}
-	assert(mq_);
-	try {
-		return mq_->Send(kBHBusQueueId, MakeSub(mq_->Id(), topics), timeout_ms);
-	} catch (...) {
-		return false;
-	}
-}
-
 bool ShmSocket::StartRaw(const RecvRawCB &onData, int nworker)
 {
-	auto CanRecv = [this]() {
-		switch (type_) {
-		case eSockRequest:
-		case eSockReply:
-		case eSockBus:
-		case eSockSubscribe:
-			return true;
-		default:
-			return false;
-		}
-	};
-	if (!CanRecv()) {
+	if (!mq_) {
 		return false;
 	}
-	std::lock_guard<std::mutex> lock(mutex_);
 
+	std::lock_guard<std::mutex> lock(mutex_);
 	StopNoLock();
 	auto RecvProc = [this, onData]() {
 		while (run_) {
@@ -127,31 +80,6 @@
 	return StartRaw([this, onData](MsgI &imsg) { BHMsg m; if (imsg.Unpack(m)) { onData(m); } }, nworker);
 }
 
-bool ShmSocket::StartAsync(int nworker)
-{
-	auto AsyncRecvProc = [this](BHMsg &msg) {
-		auto Find = [&](RecvCB &cb) {
-			std::lock_guard<std::mutex> lock(mutex_);
-			const std::string &msgid = msg.msg_id();
-			auto pos = async_cbs_.find(msgid);
-			if (pos != async_cbs_.end()) {
-				cb.swap(pos->second);
-				async_cbs_.erase(pos);
-				return true;
-			} else {
-				return false;
-			}
-		};
-
-		RecvCB cb;
-		if (Find(cb) && cb) {
-			cb(msg);
-		}
-	};
-
-	return Start(AsyncRecvProc, nworker);
-}
-
 bool ShmSocket::Stop()
 {
 	std::lock_guard<std::mutex> lock(mutex_);
@@ -166,118 +94,28 @@
 				w.join();
 			}
 		}
+		workers_.clear();
 		return true;
 	}
 	return false;
 }
 
-bool ShmSocket::AsyncRequest(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb)
+bool ShmSocket::SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms)
 {
-	if (type_ != eSockRequest) {
+	std::lock_guard<std::mutex> lock(mutex_);
+	if (!mq_ || RunningNoLock()) {
 		return false;
-	}
-	assert(remote && pmsg && !mq_);
-	try {
-		const BHMsg &msg = *static_cast<const BHMsg *>(pmsg);
-		auto RegisterCB = [&]() {
-			std::lock_guard<std::mutex> lock(mutex_);
-			async_cbs_.emplace(msg.msg_id(), cb);
-		};
-
-		return mq_->Send(*static_cast<const MQId *>(remote), msg, timeout_ms, RegisterCB);
-	} catch (...) {
-		return false;
+	} else {
+		return mq_->Send(*static_cast<const MQId *>(id), msg, timeout_ms);
 	}
 }
 
-bool ShmSocket::SyncRequest(const void *remote, const void *msg, void *result, const int timeout_ms)
+bool ShmSocket::SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms)
 {
-	struct State {
-		std::mutex mutex;
-		std::condition_variable cv;
-		bool canceled = false;
-	};
-
-	try {
-		std::shared_ptr<State> st(new State);
-		auto OnRecv = [=](BHMsg &msg) {
-			std::unique_lock<std::mutex> lk(st->mutex);
-			if (!st->canceled) {
-				static_cast<BHMsg *>(result)->Swap(&msg);
-				st->cv.notify_one();
-			}
-		};
-
-		std::unique_lock<std::mutex> lk(st->mutex);
-		auto end = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
-		if (AsyncRequest(remote, msg, timeout_ms, OnRecv) && st->cv.wait_until(lk, end) == std::cv_status::no_timeout) {
-			return true;
-		} else {
-			st->canceled = true;
-			return false;
-		}
-	} catch (...) {
+	std::lock_guard<std::mutex> lock(mutex_);
+	if (!mq_ || RunningNoLock()) {
 		return false;
-	}
-}
-
-bool ShmSocket::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
-{
-	BHMsg result;
-	const BHMsg &msg = MakeQueryTopic(topic);
-	if (SyncRequest(&kBHTopicRPCId, &msg, &result, timeout_ms)) {
-		if (result.type() == kMsgTypeQueryTopicReply) {
-			DataQueryTopicReply reply;
-			if (reply.ParseFromString(result.body())) {
-				addr = reply.address();
-				return !addr.mq_id().empty();
-			}
-		}
-	}
-	return false;
-}
-
-bool ShmSocket::RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
-{
-	auto Call = [&](const void *remote) {
-		const BHMsg &msg(MakeRequest(mq_->Id(), topic, data, size));
-		auto onRecv = [cb](BHMsg &msg) {
-			if (msg.type() == kMsgTypeReply) {
-				DataReply reply;
-				if (reply.ParseFromString(msg.body())) {
-					cb(reply.data().data(), reply.data().size());
-				}
-			}
-		};
-		return AsyncRequest(remote, &msg, timeout_ms, onRecv);
-	};
-
-	try {
-		BHAddress addr;
-		if (QueryRPCTopic(topic, addr, timeout_ms)) {
-			return Call(addr.mq_id().data());
-		}
-	} catch (...) {
-		return false;
-	}
-}
-
-bool ShmSocket::RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out)
-{
-	try {
-		BHAddress addr;
-		if (QueryRPCTopic(topic, addr, timeout_ms)) {
-			const BHMsg &msg(MakeRequest(mq_->Id(), topic, data, size));
-			BHMsg reply;
-			if (SyncRequest(addr.mq_id().data(), &msg, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
-				DataReply dr;
-				if (dr.ParseFromString(msg.body())) {
-					dr.mutable_data()->swap(out);
-					return true;
-				}
-			}
-		}
-	} catch (...) {
-		return false;
+	} else {
+		return mq_->Recv(msg, timeout_ms);
 	}
 }

--
Gitblit v1.8.0