From b55ffe89f4b237be5f79232cfddfe22bfdb87c64 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 01 四月 2021 13:23:48 +0800
Subject: [PATCH] make req/rep,sub/pub sockets sub class;

---
 src/socket.h          |   45 +-
 src/pubsub_center.h   |   53 +++
 src/reqrep.h          |   57 +++
 src/pubsub.h          |   50 +-
 src/socket.cpp        |  210 +-----------
 src/pubsub.cpp        |  153 ++------
 src/pubsub_center.cpp |  134 ++++++++
 utest/utest.cpp       |   36 -
 src/reqrep.cpp        |  164 ++++++++++
 9 files changed, 537 insertions(+), 365 deletions(-)

diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index eff54ab..cfc77ab 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -19,127 +19,58 @@
 #include "bh_util.h"
 #include "defs.h"
 
-namespace bhome_shm
-{
-
 using namespace std::chrono_literals;
-const int kMaxWorker = 16;
 using namespace bhome_msg;
 
-BusManager::BusManager(SharedMemory &shm) :
-    shm_(shm), socket_(ShmSocket::eSockBus, shm) {}
-BusManager::BusManager() :
-    BusManager(BHomeShm()) {}
-
-bool BusManager::Start(const int nworker)
+bool SocketPublish::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms)
 {
-	auto onRecv = [&](MsgI &imsg) {
-#ifndef NDEBUG
-		static std::atomic<time_t> last(0);
-		time_t now = 0;
-		time(&now);
-		if (last.exchange(now) < now) {
-			printf("bus queue size: %ld\n", socket_.Pending());
+	try {
+		MsgI imsg;
+		if (!imsg.MakeRC(shm(), MakePub(topic, data, size))) {
+			return false;
 		}
-#endif
+		DEFER1(imsg.Release(shm()));
+		return ShmMsgQueue::Send(shm(), kBHBusQueueId, imsg, timeout_ms);
+	} catch (...) {
+		return false;
+	}
+}
 
-		BHMsg msg;
-		if (!imsg.Unpack(msg)) {
-			return;
-		}
+bool SocketSubscribe::Subscribe(const std::vector<std::string> &topics, const int timeout_ms)
+{
+	try {
+		return mq().Send(kBHBusQueueId, MakeSub(mq().Id(), topics), timeout_ms);
+	} catch (...) {
+		return false;
+	}
+}
 
-		auto OnSubChange = [&](auto &&update) {
-			DataSub sub;
-			if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
-				assert(sizeof(MQId) == msg.route(0).mq_id().size());
-				MQId client;
-				memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
-
-				std::lock_guard<std::mutex> guard(mutex_);
-				auto &topics = sub.topics();
-				for (auto &topic : topics) {
-					try {
-						update(topic, client);
-					} catch (...) {
-						//TODO log error
-					}
-				}
+bool SocketSubscribe::StartRecv(const TopicDataCB &tdcb, int nworker)
+{
+	auto AsyncRecvProc = [this, tdcb](BHMsg &msg) {
+		if (msg.type() == kMsgTypePublish) {
+			DataPub d;
+			if (d.ParseFromString(msg.body())) {
+				tdcb(d.topic(), d.data());
 			}
-		};
-
-		auto Sub1 = [this](const std::string &topic, const MQId &id) {
-			records_[topic].insert(id);
-		};
-
-		auto Unsub1 = [this](const std::string &topic, const MQId &id) {
-			auto pos = records_.find(topic);
-			if (pos != records_.end()) {
-				if (pos->second.erase(id) && pos->second.empty()) {
-					records_.erase(pos);
-				}
-			}
-		};
-
-		auto OnPublish = [&]() {
-			DataPub pub;
-			if (!pub.ParseFromString(msg.body())) {
-				return;
-			}
-			auto FindClients = [&](const std::string &topic) {
-				Clients dests;
-				std::lock_guard<std::mutex> guard(mutex_);
-				auto Find1 = [&](const std::string &t) {
-					auto pos = records_.find(topic);
-					if (pos != records_.end() && !pos->second.empty()) {
-						auto &clients = pos->second;
-						for (auto &cli : clients) {
-							dests.insert(cli);
-						}
-					}
-				};
-				Find1(topic);
-
-				//TODO check and adjust topic on client side sub/pub.
-				size_t pos = 0;
-				while (true) {
-					pos = topic.find(kTopicSep, pos);
-					if (pos == topic.npos || ++pos == topic.size()) {
-						// Find1(std::string()); // sub all.
-						break;
-					} else {
-						Find1(topic.substr(0, pos));
-					}
-				}
-				return dests;
-			};
-
-			auto Dispatch = [&](auto &&send1) {
-				const Clients &clients(FindClients(pub.topic()));
-				for (auto &cli : clients) {
-					send1(cli);
-				}
-			};
-
-			if (imsg.IsCounted()) {
-				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, imsg, 100); });
-			} else {
-				MsgI pubmsg;
-				if (!pubmsg.MakeRC(shm_, msg)) { return; }
-				DEFER1(pubmsg.Release(shm_));
-
-				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, pubmsg, 100); });
-			}
-		};
-
-		switch (msg.type()) {
-		case kMsgTypeSubscribe: OnSubChange(Sub1); break;
-		case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
-		case kMsgTypePublish: OnPublish(); break;
-		default: break;
+		} else {
+			// ignored, or dropped
 		}
 	};
 
-	return socket_.StartRaw(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
+	return tdcb && Start(AsyncRecvProc, nworker);
 }
 
-} // namespace bhome_shm
+bool SocketSubscribe::RecvSub(std::string &topic, std::string &data, const int timeout_ms)
+{
+	BHMsg msg;
+	if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) {
+		DataPub d;
+		if (d.ParseFromString(msg.body())) {
+			d.mutable_topic()->swap(topic);
+			d.mutable_data()->swap(data);
+			return true;
+		}
+	}
+	return false;
+}
\ No newline at end of file
diff --git a/src/pubsub.h b/src/pubsub.h
index be6521f..cad9f61 100644
--- a/src/pubsub.h
+++ b/src/pubsub.h
@@ -18,31 +18,43 @@
 #ifndef PUBSUB_4KGRA997
 #define PUBSUB_4KGRA997
 
+#include "defs.h"
 #include "socket.h"
-#include <mutex>
-#include <set>
-#include <unordered_map>
+#include <string>
 
-namespace bhome_shm
+class SocketPublish
 {
-
-// publish/subcribe manager.
-class BusManager
-{
-	SharedMemory &shm_;
-	ShmSocket socket_;
-	std::mutex mutex_;
-	typedef std::set<MQId> Clients;
-	std::unordered_map<std::string, Clients> records_;
+	typedef ShmSocket Socket;
+	Socket::Shm &shm_;
+	Socket::Shm &shm() { return shm_; }
 
 public:
-	BusManager(SharedMemory &shm);
-	BusManager();
-	~BusManager() { Stop(); }
-	bool Start(const int nworker = 2);
-	bool Stop() { return socket_.Stop(); }
+	SocketPublish(Socket::Shm &shm) :
+	    shm_(shm) {}
+	SocketPublish() :
+	    SocketPublish(BHomeShm()) {}
+	bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms);
+	bool Publish(const std::string &topic, const std::string &data, const int timeout_ms)
+	{
+		return Publish(topic, data.data(), data.size(), timeout_ms);
+	}
 };
 
-} // namespace bhome_shm
+// socket subscribe
+class SocketSubscribe : private ShmSocket
+{
+	typedef ShmSocket Socket;
+
+public:
+	SocketSubscribe(Socket::Shm &shm) :
+	    Socket(shm, 64) {}
+	SocketSubscribe() :
+	    SocketSubscribe(BHomeShm()) {}
+
+	typedef std::function<void(const std::string &topic, const std::string &data)> TopicDataCB;
+	bool StartRecv(const TopicDataCB &tdcb, int nworker = 2);
+	bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
+	bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
+};
 
 #endif // end of include guard: PUBSUB_4KGRA997
diff --git a/src/pubsub_center.cpp b/src/pubsub_center.cpp
new file mode 100644
index 0000000..33c16be
--- /dev/null
+++ b/src/pubsub_center.cpp
@@ -0,0 +1,134 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  pubsub_center.cpp
+ *
+ *    Description:  pub/sub center/manager
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�04鏈�01鏃� 09鏃�29鍒�04绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), 
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#include "pubsub_center.h"
+#include "bh_util.h"
+
+PubSubCenter::PubSubCenter(SharedMemory &shm) :
+    socket_(shm) {}
+
+bool PubSubCenter::Start(const int nworker)
+{
+	auto onRecv = [&](MsgI &imsg) {
+#ifndef NDEBUG
+		static std::atomic<time_t> last(0);
+		time_t now = 0;
+		time(&now);
+		if (last.exchange(now) < now) {
+			printf("bus queue size: %ld\n", socket_.Pending());
+		}
+#endif
+
+		BHMsg msg;
+		if (!imsg.Unpack(msg)) {
+			return;
+		}
+
+		auto OnSubChange = [&](auto &&update) {
+			DataSub sub;
+			if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
+				assert(sizeof(MQId) == msg.route(0).mq_id().size());
+				MQId client;
+				memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
+
+				std::lock_guard<std::mutex> guard(mutex_);
+				auto &topics = sub.topics();
+				for (auto &topic : topics) {
+					try {
+						update(topic, client);
+					} catch (...) {
+						//TODO log error
+					}
+				}
+			}
+		};
+
+		auto Sub1 = [this](const std::string &topic, const MQId &id) {
+			records_[topic].insert(id);
+		};
+
+		auto Unsub1 = [this](const std::string &topic, const MQId &id) {
+			auto pos = records_.find(topic);
+			if (pos != records_.end()) {
+				if (pos->second.erase(id) && pos->second.empty()) {
+					records_.erase(pos);
+				}
+			}
+		};
+
+		auto OnPublish = [&]() {
+			DataPub pub;
+			if (!pub.ParseFromString(msg.body())) {
+				return;
+			}
+			auto FindClients = [&](const std::string &topic) {
+				Clients dests;
+				std::lock_guard<std::mutex> guard(mutex_);
+				auto Find1 = [&](const std::string &t) {
+					auto pos = records_.find(topic);
+					if (pos != records_.end() && !pos->second.empty()) {
+						auto &clients = pos->second;
+						for (auto &cli : clients) {
+							dests.insert(cli);
+						}
+					}
+				};
+				Find1(topic);
+
+				//TODO check and adjust topic on client side sub/pub.
+				size_t pos = 0;
+				while (true) {
+					pos = topic.find(kTopicSep, pos);
+					if (pos == topic.npos || ++pos == topic.size()) {
+						// Find1(std::string()); // sub all.
+						break;
+					} else {
+						Find1(topic.substr(0, pos));
+					}
+				}
+				return dests;
+			};
+
+			auto Dispatch = [&](auto &&send1) {
+				const Clients &clients(FindClients(pub.topic()));
+				for (auto &cli : clients) {
+					send1(cli);
+				}
+			};
+
+			if (imsg.IsCounted()) {
+				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 100); });
+			} else {
+				MsgI pubmsg;
+				if (!pubmsg.MakeRC(shm(), msg)) { return; }
+				DEFER1(pubmsg.Release(shm()));
+
+				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 100); });
+			}
+		};
+
+		switch (msg.type()) {
+		case kMsgTypeSubscribe: OnSubChange(Sub1); break;
+		case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
+		case kMsgTypePublish: OnPublish(); break;
+		default: break;
+		}
+	};
+
+	const int kMaxWorker = 16;
+	return socket_.StartRaw(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
+}
\ No newline at end of file
diff --git a/src/pubsub_center.h b/src/pubsub_center.h
new file mode 100644
index 0000000..866216e
--- /dev/null
+++ b/src/pubsub_center.h
@@ -0,0 +1,53 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  pubsub_center.h
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�04鏈�01鏃� 09鏃�29鍒�39绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), 
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#ifndef PUBSUB_CENTER_MFSUZJU7
+#define PUBSUB_CENTER_MFSUZJU7
+
+#include "defs.h"
+#include "socket.h"
+#include <mutex>
+#include <set>
+#include <unordered_map>
+using namespace bhome_shm;
+
+// publish/subcribe manager.
+class PubSubCenter
+{
+	class SocketBus : public ShmSocket
+	{
+	public:
+		SocketBus(SharedMemory &shm) :
+		    ShmSocket(shm, &kBHBusQueueId, 1000) {}
+		using ShmSocket::shm;
+	};
+	SocketBus socket_;
+	std::mutex mutex_;
+	typedef std::set<MQId> Clients;
+	std::unordered_map<std::string, Clients> records_;
+	ShmSocket::Shm &shm() { return socket_.shm(); }
+
+public:
+	PubSubCenter(SharedMemory &shm);
+	PubSubCenter() :
+	    PubSubCenter(BHomeShm()) {}
+	~PubSubCenter() { Stop(); }
+	bool Start(const int nworker = 2);
+	bool Stop() { return socket_.Stop(); }
+};
+
+#endif // end of include guard: PUBSUB_CENTER_MFSUZJU7
diff --git a/src/reqrep.cpp b/src/reqrep.cpp
new file mode 100644
index 0000000..e1636fd
--- /dev/null
+++ b/src/reqrep.cpp
@@ -0,0 +1,164 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  reqrep.cpp
+ *
+ *    Description:  topic request/reply sockets
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�04鏈�01鏃� 09鏃�35鍒�35绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), 
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#include "reqrep.h"
+#include "msg.h"
+#include <chrono>
+#include <condition_variable>
+
+using namespace bhome_msg;
+
+bool SocketRequest::StartWorker(const RequestResultCB &rrcb, int nworker)
+{
+	auto AsyncRecvProc = [this, rrcb](BHMsg &msg) {
+		auto Find = [&](RecvCB &cb) {
+			std::lock_guard<std::mutex> lock(mutex());
+			const std::string &msgid = msg.msg_id();
+			auto pos = async_cbs_.find(msgid);
+			if (pos != async_cbs_.end()) {
+				cb.swap(pos->second);
+				async_cbs_.erase(pos);
+				return true;
+			} else {
+				return false;
+			}
+		};
+
+		RecvCB cb;
+		if (Find(cb) && cb) {
+			cb(msg);
+		} else if (rrcb && msg.type() == kMsgTypeReply) {
+			DataReply reply;
+			if (reply.ParseFromString(msg.body())) {
+				rrcb(reply.data());
+			}
+		} else {
+			// ignored, or dropped
+		}
+	};
+
+	return Start(AsyncRecvProc, nworker);
+}
+
+bool SocketRequest::AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
+{
+	auto Call = [&](const void *remote) {
+		const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
+		auto onRecv = [cb](BHMsg &msg) {
+			if (msg.type() == kMsgTypeReply) {
+				DataReply reply;
+				if (reply.ParseFromString(msg.body())) {
+					cb(reply.data());
+				}
+			}
+		};
+		return AsyncSend(remote, &msg, timeout_ms, onRecv);
+	};
+
+	try {
+		BHAddress addr;
+		if (QueryRPCTopic(topic, addr, timeout_ms)) {
+			return Call(addr.mq_id().data());
+		}
+	} catch (...) {
+		return false;
+	}
+}
+
+bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out)
+{
+	try {
+		BHAddress addr;
+		if (QueryRPCTopic(topic, addr, timeout_ms)) {
+			const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
+			BHMsg reply;
+			if (SyncSendAndRecv(addr.mq_id().data(), &msg, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
+				DataReply dr;
+				if (dr.ParseFromString(msg.body())) {
+					dr.mutable_data()->swap(out);
+					return true;
+				}
+			}
+		}
+	} catch (...) {
+		return false;
+	}
+}
+
+bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb)
+{
+	assert(remote && pmsg);
+	try {
+		const BHMsg &msg = *static_cast<const BHMsg *>(pmsg);
+		auto RegisterCB = [&]() {
+			std::lock_guard<std::mutex> lock(mutex());
+			async_cbs_.emplace(msg.msg_id(), cb);
+		};
+
+		return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms, RegisterCB);
+	} catch (...) {
+		return false;
+	}
+}
+
+bool SocketRequest::SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms)
+{
+	struct State {
+		std::mutex mutex;
+		std::condition_variable cv;
+		bool canceled = false;
+	};
+
+	try {
+		std::shared_ptr<State> st(new State);
+		auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
+
+		auto OnRecv = [=](BHMsg &msg) {
+			std::unique_lock<std::mutex> lk(st->mutex);
+			if (!st->canceled) {
+				static_cast<BHMsg *>(result)->Swap(&msg);
+				st->cv.notify_one();
+			} // else result is no longer valid.
+		};
+
+		std::unique_lock<std::mutex> lk(st->mutex);
+		if (AsyncSend(remote, msg, timeout_ms, OnRecv) && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
+			return true;
+		} else {
+			st->canceled = true;
+			return false;
+		}
+	} catch (...) {
+		return false;
+	}
+}
+
+bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
+{
+	BHMsg result;
+	const BHMsg &msg = MakeQueryTopic(topic);
+	if (SyncSendAndRecv(&kBHTopicRPCId, &msg, &result, timeout_ms)) {
+		if (result.type() == kMsgTypeQueryTopicReply) {
+			DataQueryTopicReply reply;
+			if (reply.ParseFromString(result.body())) {
+				addr = reply.address();
+				return !addr.mq_id().empty();
+			}
+		}
+	}
+	return false;
+}
diff --git a/src/reqrep.h b/src/reqrep.h
new file mode 100644
index 0000000..02cc86f
--- /dev/null
+++ b/src/reqrep.h
@@ -0,0 +1,57 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  reqrep.h
+ *
+ *    Description:  topic request/reply sockets
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�04鏈�01鏃� 09鏃�36鍒�06绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), 
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#ifndef REQREP_ACEH09NK
+#define REQREP_ACEH09NK
+
+#include "defs.h"
+#include "socket.h"
+#include <functional>
+#include <unordered_map>
+
+class SocketRequest : private ShmSocket
+{
+	typedef ShmSocket Socket;
+
+public:
+	SocketRequest(Socket::Shm &shm) :
+	    Socket(shm, 64) { StartWorker(); }
+	SocketRequest() :
+	    SocketRequest(BHomeShm()) {}
+
+	typedef std::function<void(const std::string &data)> RequestResultCB;
+	bool StartWorker(const RequestResultCB &rrcb, int nworker = 2);
+	bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); }
+	bool AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
+	bool AsyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb)
+	{
+		return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
+	}
+	bool SyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out);
+	bool SyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, std::string &out)
+	{
+		return SyncRequest(topic, data.data(), data.size(), timeout_ms, out);
+	}
+
+private:
+	bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb);
+	bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms);
+	bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
+	std::unordered_map<std::string, RecvCB> async_cbs_;
+};
+
+#endif // end of include guard: REQREP_ACEH09NK
diff --git a/src/socket.cpp b/src/socket.cpp
index 13f1e38..4c2fc6b 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -20,8 +20,6 @@
 #include "bh_util.h"
 #include "defs.h"
 #include "msg.h"
-#include <chrono>
-#include <condition_variable>
 
 using namespace bhome_msg;
 using namespace bhome_shm;
@@ -31,78 +29,33 @@
 
 } // namespace
 
-//TODO maybe change to base class, each type is a sub class.
-
-ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm) :
-    shm_(shm), type_(type), run_(false)
+ShmSocket::ShmSocket(Shm &shm, const void *id, const int len) :
+    shm_(shm), run_(false)
 {
-	switch (type) {
-	case eSockBus: mq_.reset(new Queue(kBHBusQueueId, shm_, 1000)); break;
-	case eSockRequest: mq_.reset(new Queue(shm_, 12)); break;
-	case eSockReply: mq_.reset(new Queue(shm_, 64)); break;
-	case eSockSubscribe: mq_.reset(new Queue(shm_, 64)); break;
-	case eSockPublish: break; // no recv mq needed
-	default: break;
+	if (id && len > 0) {
+		mq_.reset(new Queue(*static_cast<const MQId *>(id), shm, len));
 	}
 }
-
-ShmSocket::ShmSocket(Type type) :
-    ShmSocket(type, BHomeShm()) {}
+ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
+    shm_(shm), run_(false)
+{
+	if (len > 0) {
+		mq_.reset(new Queue(shm_, len));
+	}
+}
 
 ShmSocket::~ShmSocket()
 {
 	Stop();
 }
 
-bool ShmSocket::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms)
-{
-	if (type_ != eSockPublish) {
-		return false;
-	}
-	assert(!mq_);
-	try {
-		MsgI imsg;
-		if (!imsg.MakeRC(shm_, MakePub(topic, data, size))) {
-			return false;
-		}
-		DEFER1(imsg.Release(shm_));
-		return Queue::Send(shm_, kBHBusQueueId, imsg, timeout_ms);
-	} catch (...) {
-		return false;
-	}
-}
-
-bool ShmSocket::Subscribe(const std::vector<std::string> &topics, const int timeout_ms)
-{
-	if (type_ != eSockSubscribe) {
-		return false;
-	}
-	assert(mq_);
-	try {
-		return mq_->Send(kBHBusQueueId, MakeSub(mq_->Id(), topics), timeout_ms);
-	} catch (...) {
-		return false;
-	}
-}
-
 bool ShmSocket::StartRaw(const RecvRawCB &onData, int nworker)
 {
-	auto CanRecv = [this]() {
-		switch (type_) {
-		case eSockRequest:
-		case eSockReply:
-		case eSockBus:
-		case eSockSubscribe:
-			return true;
-		default:
-			return false;
-		}
-	};
-	if (!CanRecv()) {
+	if (!mq_) {
 		return false;
 	}
-	std::lock_guard<std::mutex> lock(mutex_);
 
+	std::lock_guard<std::mutex> lock(mutex_);
 	StopNoLock();
 	auto RecvProc = [this, onData]() {
 		while (run_) {
@@ -127,31 +80,6 @@
 	return StartRaw([this, onData](MsgI &imsg) { BHMsg m; if (imsg.Unpack(m)) { onData(m); } }, nworker);
 }
 
-bool ShmSocket::StartAsync(int nworker)
-{
-	auto AsyncRecvProc = [this](BHMsg &msg) {
-		auto Find = [&](RecvCB &cb) {
-			std::lock_guard<std::mutex> lock(mutex_);
-			const std::string &msgid = msg.msg_id();
-			auto pos = async_cbs_.find(msgid);
-			if (pos != async_cbs_.end()) {
-				cb.swap(pos->second);
-				async_cbs_.erase(pos);
-				return true;
-			} else {
-				return false;
-			}
-		};
-
-		RecvCB cb;
-		if (Find(cb) && cb) {
-			cb(msg);
-		}
-	};
-
-	return Start(AsyncRecvProc, nworker);
-}
-
 bool ShmSocket::Stop()
 {
 	std::lock_guard<std::mutex> lock(mutex_);
@@ -166,118 +94,28 @@
 				w.join();
 			}
 		}
+		workers_.clear();
 		return true;
 	}
 	return false;
 }
 
-bool ShmSocket::AsyncRequest(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb)
+bool ShmSocket::SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms)
 {
-	if (type_ != eSockRequest) {
+	std::lock_guard<std::mutex> lock(mutex_);
+	if (!mq_ || RunningNoLock()) {
 		return false;
-	}
-	assert(remote && pmsg && !mq_);
-	try {
-		const BHMsg &msg = *static_cast<const BHMsg *>(pmsg);
-		auto RegisterCB = [&]() {
-			std::lock_guard<std::mutex> lock(mutex_);
-			async_cbs_.emplace(msg.msg_id(), cb);
-		};
-
-		return mq_->Send(*static_cast<const MQId *>(remote), msg, timeout_ms, RegisterCB);
-	} catch (...) {
-		return false;
+	} else {
+		return mq_->Send(*static_cast<const MQId *>(id), msg, timeout_ms);
 	}
 }
 
-bool ShmSocket::SyncRequest(const void *remote, const void *msg, void *result, const int timeout_ms)
+bool ShmSocket::SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms)
 {
-	struct State {
-		std::mutex mutex;
-		std::condition_variable cv;
-		bool canceled = false;
-	};
-
-	try {
-		std::shared_ptr<State> st(new State);
-		auto OnRecv = [=](BHMsg &msg) {
-			std::unique_lock<std::mutex> lk(st->mutex);
-			if (!st->canceled) {
-				static_cast<BHMsg *>(result)->Swap(&msg);
-				st->cv.notify_one();
-			}
-		};
-
-		std::unique_lock<std::mutex> lk(st->mutex);
-		auto end = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
-		if (AsyncRequest(remote, msg, timeout_ms, OnRecv) && st->cv.wait_until(lk, end) == std::cv_status::no_timeout) {
-			return true;
-		} else {
-			st->canceled = true;
-			return false;
-		}
-	} catch (...) {
+	std::lock_guard<std::mutex> lock(mutex_);
+	if (!mq_ || RunningNoLock()) {
 		return false;
-	}
-}
-
-bool ShmSocket::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
-{
-	BHMsg result;
-	const BHMsg &msg = MakeQueryTopic(topic);
-	if (SyncRequest(&kBHTopicRPCId, &msg, &result, timeout_ms)) {
-		if (result.type() == kMsgTypeQueryTopicReply) {
-			DataQueryTopicReply reply;
-			if (reply.ParseFromString(result.body())) {
-				addr = reply.address();
-				return !addr.mq_id().empty();
-			}
-		}
-	}
-	return false;
-}
-
-bool ShmSocket::RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
-{
-	auto Call = [&](const void *remote) {
-		const BHMsg &msg(MakeRequest(mq_->Id(), topic, data, size));
-		auto onRecv = [cb](BHMsg &msg) {
-			if (msg.type() == kMsgTypeReply) {
-				DataReply reply;
-				if (reply.ParseFromString(msg.body())) {
-					cb(reply.data().data(), reply.data().size());
-				}
-			}
-		};
-		return AsyncRequest(remote, &msg, timeout_ms, onRecv);
-	};
-
-	try {
-		BHAddress addr;
-		if (QueryRPCTopic(topic, addr, timeout_ms)) {
-			return Call(addr.mq_id().data());
-		}
-	} catch (...) {
-		return false;
-	}
-}
-
-bool ShmSocket::RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out)
-{
-	try {
-		BHAddress addr;
-		if (QueryRPCTopic(topic, addr, timeout_ms)) {
-			const BHMsg &msg(MakeRequest(mq_->Id(), topic, data, size));
-			BHMsg reply;
-			if (SyncRequest(addr.mq_id().data(), &msg, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
-				DataReply dr;
-				if (dr.ParseFromString(msg.body())) {
-					dr.mutable_data()->swap(out);
-					return true;
-				}
-			}
-		}
-	} catch (...) {
-		return false;
+	} else {
+		return mq_->Recv(msg, timeout_ms);
 	}
 }
diff --git a/src/socket.h b/src/socket.h
index eee5b5b..20da7c0 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -26,59 +26,48 @@
 #include <memory>
 #include <mutex>
 #include <thread>
-#include <unordered_map>
 #include <vector>
 
 class ShmSocket : private boost::noncopyable
 {
+protected:
 	typedef bhome_shm::ShmMsgQueue Queue;
 
 public:
-	enum Type {
-		eSockRequest,
-		eSockReply,
-		eSockSubscribe,
-		eSockPublish,
-		eSockBus,
-	};
+	typedef bhome_shm::SharedMemory Shm;
 	typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB;
 	typedef std::function<void(bhome_msg::MsgI &imsg)> RecvRawCB;
-	typedef std::function<void(const void *data, const size_t size)> RequestResultCB;
 
-	ShmSocket(Type type, bhome_shm::SharedMemory &shm);
-	ShmSocket(Type type);
+	ShmSocket(Shm &shm, const int len = 12);
 	~ShmSocket();
-	bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
-	bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out);
-
-	// bool HandleRequest(onData);
-	bool ReadRequest(); // exclude with HandleRequest
-	bool SendReply();   // exclude with HandleRequest
-
-	bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms);
-	bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
-	bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
 
 	// start recv.
 	bool Start(const RecvCB &onData, int nworker = 1);
 	bool StartRaw(const RecvRawCB &onData, int nworker = 1);
-	bool StartAsync(int nworker = 2);
 	bool Stop();
 	size_t Pending() const { return mq_ ? mq_->Pending() : 0; }
 
+protected:
+	ShmSocket(Shm &shm, const void *id, const int len);
+	Shm &shm() { return shm_; }
+	const Shm &shm() const { return shm_; }
+	Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid.
+	const Queue &mq() const { return *mq_; }
+	std::mutex &mutex() { return mutex_; }
+
+	bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms);
+	bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms);
+
 private:
-	bool AsyncRequest(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb);
-	bool SyncRequest(const void *remote, const void *msg, void *result, const int timeout_ms);
-	bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
 	bool StopNoLock();
-	bhome_shm::SharedMemory &shm_;
-	const Type type_;
+	bool RunningNoLock() { return !workers_.empty(); }
+
+	Shm &shm_;
 	std::vector<std::thread> workers_;
 	std::mutex mutex_;
 	std::atomic<bool> run_;
 
 	std::unique_ptr<Queue> mq_;
-	std::unordered_map<std::string, RecvCB> async_cbs_;
 };
 
 #endif // end of include guard: SOCKET_GWTJHBPO
diff --git a/utest/utest.cpp b/utest/utest.cpp
index fbe4d51..b95e646 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -1,5 +1,6 @@
 #include "defs.h"
 #include "pubsub.h"
+#include "pubsub_center.h"
 #include "socket.h"
 #include "util.h"
 #include <atomic>
@@ -66,7 +67,7 @@
 BOOST_AUTO_TEST_CASE(PubSubTest)
 {
 	const std::string shm_name("ShmPubSub");
-	// ShmRemover auto_remove(shm_name); //remove twice? in case of killed?
+	ShmRemover auto_remove(shm_name); //remove twice? in case of killed?
 	SharedMemory shm(shm_name, 1024 * 1024 * 50);
 	DEFER1(shm.Remove());
 	auto Avail = [&]() { return shm.get_free_memory(); };
@@ -75,57 +76,50 @@
 	printf("flag = %d\n", *flag);
 	++*flag;
 
-	BusManager bus(shm);
+	PubSubCenter bus(shm);
 	bus.Start();
+
 	std::this_thread::sleep_for(100ms);
 
-	std::atomic<uint64_t> count(0);
+	std::atomic<uint64_t> total_count(0);
 	std::atomic<ptime> last_time(Now() - seconds(1));
 	std::atomic<uint64_t> last_count(0);
 
 	const uint64_t nmsg = 100 * 2;
 	const int timeout = 1000;
 	auto Sub = [&](int id, const std::vector<std::string> &topics) {
-		ShmSocket client(ShmSocket::eSockSubscribe, shm);
+		SocketSubscribe client(shm);
 		bool r = client.Subscribe(topics, timeout);
 		std::mutex mutex;
 		std::condition_variable cv;
 
-		uint64_t i = 0;
-		auto OnRecv = [&](BHMsg &msg) {
-			if (msg.type() != kMsgTypePublish) {
-				BOOST_CHECK(false);
-			}
-			DataPub pub;
-			if (!pub.ParseFromString(msg.body())) {
-				BOOST_CHECK(false);
-			}
-			++count;
+		std::atomic<uint64_t> n(0);
+		auto OnTopicData = [&](const std::string &topic, const std::string &data) {
+			++total_count;
 
 			auto cur = Now();
 			if (last_time.exchange(cur) < cur) {
 				std::cout << "time: " << cur;
 				printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
-				       count.load(), count - last_count.exchange(count), init_avail - Avail());
+				       total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail());
 			}
-			if (++i >= nmsg * topics.size()) {
+			if (++n >= nmsg * topics.size()) {
 				cv.notify_one();
 			}
 			// printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
 		};
-		client.Start(OnRecv);
+		client.StartRecv(OnTopicData, 1);
 
 		std::unique_lock<std::mutex> lk(mutex);
 		cv.wait(lk);
 	};
 
 	auto Pub = [&](const std::string &topic) {
-		ShmSocket provider(ShmSocket::eSockPublish, shm);
+		SocketPublish provider(shm);
 		for (unsigned i = 0; i < nmsg; ++i) {
 			std::string data = topic + std::to_string(i) + std::string(1000, '-');
 
-			bool r = provider.Publish(topic, data.data(), data.size(), timeout);
-			// bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout);
+			bool r = provider.Publish(topic, data, timeout);
 			if (!r) {
 				printf("pub ret: %s\n", r ? "ok" : "fail");
 			}
@@ -151,7 +145,7 @@
 	threads.WaitAll();
 	std::cout << "end : " << Now();
 	printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
-	       count.load(), count - last_count.exchange(count), init_avail - Avail());
+	       total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail());
 
 	bus.Stop();
 }

--
Gitblit v1.8.0