From 83085f2ce99cca05d40a19482151873a55e6393a Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 02 四月 2021 19:32:21 +0800
Subject: [PATCH] refactor center; add async request no cb.

---
 src/socket.h          |   13 +-
 src/pubsub_center.h   |   15 +--
 src/reqrep_center.cpp |   54 +++++++---
 src/reqrep.h          |   11 +
 src/reqrep_center.h   |   14 --
 src/socket.cpp        |   14 +-
 src/center.h          |   16 +++
 src/pubsub_center.cpp |   37 +++---
 utest/utest.cpp       |   21 +++
 src/reqrep.cpp        |   34 +++++-
 src/center.cpp        |   13 ++
 11 files changed, 165 insertions(+), 77 deletions(-)

diff --git a/src/center.cpp b/src/center.cpp
index db000c4..d6570aa 100644
--- a/src/center.cpp
+++ b/src/center.cpp
@@ -17,6 +17,8 @@
  */
 #include "center.h"
 #include "defs.h"
+#include "pubsub_center.h"
+#include "reqrep_center.h"
 #include "shm.h"
 
 using namespace bhome_shm;
@@ -26,3 +28,14 @@
 	static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 64);
 	return shm;
 }
+
+BHCenter::BHCenter(Socket::Shm &shm) :
+    socket_(shm) {}
+
+BHCenter::BHCenter() :
+    BHCenter(BHomeShm()) {}
+
+bool BHCenter::Start()
+{
+	return false;
+}
\ No newline at end of file
diff --git a/src/center.h b/src/center.h
index 153cc3e..f0a177c 100644
--- a/src/center.h
+++ b/src/center.h
@@ -18,8 +18,24 @@
 #ifndef CENTER_TM9OUQTG
 #define CENTER_TM9OUQTG
 
+#include "socket.h"
+#include <functional>
+
 class BHCenter
 {
+	typedef ShmSocket Socket;
+
+public:
+	typedef std::function<bool(ShmSocket &socket, bhome_msg::MsgI &imsg, bhome::msg::BHMsg &msg)> MsgHandler;
+
+	BHCenter(Socket::Shm &shm);
+	BHCenter();
+	~BHCenter() { Stop(); }
+	bool Start();
+	bool Stop() { return socket_.Stop(); }
+
+private:
+	ShmSocket socket_;
 };
 
 #endif // end of include guard: CENTER_TM9OUQTG
diff --git a/src/pubsub_center.cpp b/src/pubsub_center.cpp
index 3ba5382..b3af47d 100644
--- a/src/pubsub_center.cpp
+++ b/src/pubsub_center.cpp
@@ -77,25 +77,21 @@
 
 } // namespace
 
-bool PubSubCenter::Start(const int nworker)
+BHCenter::MsgHandler MakeBusCenter()
 {
 	auto bus_ptr = std::make_shared<Synced<BusCenter>>();
 
-	auto onRecv = [bus_ptr, this](MsgI &imsg) {
+	return [bus_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) {
 #ifndef NDEBUG
 		static std::atomic<time_t> last(0);
 		time_t now = 0;
 		time(&now);
 		if (last.exchange(now) < now) {
-			printf("bus queue size: %ld\n", socket_.Pending());
+			printf("bus queue size: %ld\n", socket.Pending());
 		}
 #endif
 		auto &bus = *bus_ptr;
-
-		BHMsg msg;
-		if (!imsg.Unpack(msg)) {
-			return;
-		}
+		auto &shm = socket.shm();
 
 		auto OnSubChange = [&](auto &&update) {
 			DataSub sub;
@@ -106,7 +102,6 @@
 				update(client, sub.topics());
 			}
 		};
-
 		auto Sub = [&](const MQId &id, auto &topics) { bus->SubScribe(id, topics.begin(), topics.end()); };
 		auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); };
 
@@ -123,24 +118,30 @@
 			};
 
 			if (imsg.IsCounted()) {
-				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 10); });
+				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, imsg, 10); });
 			} else {
 				MsgI pubmsg;
-				if (!pubmsg.MakeRC(shm(), msg)) { return; }
-				DEFER1(pubmsg.Release(shm()));
+				if (!pubmsg.MakeRC(shm, msg)) { return; }
+				DEFER1(pubmsg.Release(shm));
 
-				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 10); });
+				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, pubmsg, 10); });
 			}
 		};
 
 		switch (msg.type()) {
-		case kMsgTypeSubscribe: OnSubChange(Sub); break;
-		case kMsgTypeUnsubscribe: OnSubChange(Unsub); break;
-		case kMsgTypePublish: OnPublish(); break;
-		default: break;
+		case kMsgTypeSubscribe: OnSubChange(Sub); return true;
+		case kMsgTypeUnsubscribe: OnSubChange(Unsub); return true;
+		case kMsgTypePublish: OnPublish(); return true;
+		default: return false;
 		}
 	};
+}
+
+bool PubSubCenter::Start(const int nworker)
+{
+	auto handler = MakeBusCenter();
+	printf("sizeof(pub/sub handler) = %ld\n", sizeof(handler));
 
 	const int kMaxWorker = 16;
-	return socket_.StartRaw(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
+	return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
 }
\ No newline at end of file
diff --git a/src/pubsub_center.h b/src/pubsub_center.h
index af3a2f4..e79dd96 100644
--- a/src/pubsub_center.h
+++ b/src/pubsub_center.h
@@ -18,28 +18,23 @@
 #ifndef PUBSUB_CENTER_MFSUZJU7
 #define PUBSUB_CENTER_MFSUZJU7
 
+#include "center.h"
 #include "defs.h"
 #include "socket.h"
 #include <mutex>
 #include <set>
 #include <unordered_map>
 
+BHCenter::MsgHandler MakeBusCenter();
+
 // publish/subcribe manager.
 class PubSubCenter
 {
-	class SocketBus : public ShmSocket
-	{
-	public:
-		SocketBus(ShmSocket::Shm &shm) :
-		    ShmSocket(shm, &kBHTopicBus, 1000) {}
-		using ShmSocket::shm;
-	};
-	SocketBus socket_;
-	ShmSocket::Shm &shm() { return socket_.shm(); }
+	ShmSocket socket_;
 
 public:
 	PubSubCenter(ShmSocket::Shm &shm) :
-	    socket_(shm) {}
+	    socket_(shm, &kBHTopicBus, 1000) {}
 	PubSubCenter() :
 	    PubSubCenter(BHomeShm()) {}
 	~PubSubCenter() { Stop(); }
diff --git a/src/reqrep.cpp b/src/reqrep.cpp
index b8e423b..25c0826 100644
--- a/src/reqrep.cpp
+++ b/src/reqrep.cpp
@@ -26,7 +26,7 @@
 bool SocketRequest::StartWorker(const RequestResultCB &rrcb, int nworker)
 {
 	auto AsyncRecvProc = [this, rrcb](BHMsg &msg) {
-		auto Find = [&](RecvCB &cb) {
+		auto Find = [&](RecvBHMsgCB &cb) {
 			std::lock_guard<std::mutex> lock(mutex());
 			const std::string &msgid = msg.msg_id();
 			auto pos = async_cbs_.find(msgid);
@@ -39,10 +39,10 @@
 			}
 		};
 
-		RecvCB cb;
-		if (Find(cb) && cb) {
+		RecvBHMsgCB cb;
+		if (Find(cb)) {
 			cb(msg);
-		} else if (rrcb && msg.type() == kMsgTypeReply) {
+		} else if (msg.type() == kMsgTypeReply) {
 			DataReply reply;
 			if (reply.ParseFromString(msg.body())) {
 				rrcb(reply.data());
@@ -55,6 +55,20 @@
 	return Start(AsyncRecvProc, nworker);
 }
 
+bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms)
+{
+	try {
+		BHAddress addr;
+		if (QueryRPCTopic(topic, addr, timeout_ms)) {
+			const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
+			return AsyncSend(addr.mq_id().data(), &msg, timeout_ms);
+		} else {
+			return false;
+		}
+	} catch (...) {
+		return false;
+	}
+}
 bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
 {
 	auto Call = [&](const void *remote) {
@@ -103,7 +117,17 @@
 	return false;
 }
 
-bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb)
+bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms)
+{
+	assert(remote && pmsg);
+	try {
+		const BHMsg &msg = *static_cast<const BHMsg *>(pmsg);
+		return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms);
+	} catch (...) {
+		return false;
+	}
+}
+bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvBHMsgCB &cb)
 {
 	assert(remote && pmsg);
 	try {
diff --git a/src/reqrep.h b/src/reqrep.h
index 9e43c7b..8a4743c 100644
--- a/src/reqrep.h
+++ b/src/reqrep.h
@@ -43,9 +43,15 @@
 	bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); }
 	bool Stop() { return Socket::Stop(); }
 	bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
+	bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms);
+
 	bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb)
 	{
 		return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
+	}
+	bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms)
+	{
+		return AsyncRequest(topic, data.data(), data.size(), timeout_ms);
 	}
 	bool SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
 	bool SyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms)
@@ -54,10 +60,11 @@
 	}
 
 private:
-	bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb);
+	bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvBHMsgCB &cb);
+	bool AsyncSend(const void *remote, const void *msg, const int timeout_ms);
 	bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms);
 	bool QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
-	std::unordered_map<std::string, RecvCB> async_cbs_;
+	std::unordered_map<std::string, RecvBHMsgCB> async_cbs_;
 
 	typedef bhome_msg::BHAddress Address;
 	class TopicCache
diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp
index 2356ebc..e52b0fd 100644
--- a/src/reqrep_center.cpp
+++ b/src/reqrep_center.cpp
@@ -99,63 +99,81 @@
 	std::unordered_map<Topic, WeakNode> topic_map_;
 	std::unordered_map<ProcId, Node> nodes_;
 };
+
+Synced<NodeCenter> &Center()
+{
+	static Synced<NodeCenter> s;
+	return s;
+}
+
 } // namespace
 
-bool ReqRepCenter::Start(const int nworker)
+BHCenter::MsgHandler MakeReqRepCenter()
 {
 	auto center_ptr = std::make_shared<Synced<NodeCenter>>();
-	auto onRecv = [center_ptr, this](BHMsg &msg) {
+	return [center_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) {
 		auto &center = *center_ptr;
+		auto &shm = socket.shm();
 
 #ifndef NDEBUG
 		static std::atomic<time_t> last(0);
 		time_t now = 0;
 		time(&now);
 		if (last.exchange(now) < now) {
-			printf("bus queue size: %ld\n", socket_.Pending());
+			printf("bus queue size: %ld\n", socket.Pending());
 		}
 #endif
-		if (msg.route_size() == 0) {
-			return;
-		}
-		auto &src_mq = msg.route(0).mq_id();
+		auto SrcMQ = [&]() { return msg.route(0).mq_id(); };
 
 		auto OnRegister = [&]() {
+			if (msg.route_size() != 1) { return; }
+
 			DataProcRegister reg;
 			if (reg.ParseFromString(msg.body()) && reg.has_proc()) {
-				center->Register(*reg.mutable_proc(), src_mq, reg.topics().begin(), reg.topics().end());
+				center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end());
 			}
 		};
 
 		auto OnHeartbeat = [&]() {
+			if (msg.route_size() != 1) { return; }
+			auto &src_mq = msg.route(0).mq_id();
+
 			DataProcHeartbeat hb;
 			if (hb.ParseFromString(msg.body()) && hb.has_proc()) {
-				center->Heartbeat(*hb.mutable_proc(), src_mq);
+				center->Heartbeat(*hb.mutable_proc(), SrcMQ());
 			}
 		};
 
 		auto OnQueryTopic = [&]() {
+			if (msg.route_size() != 1) { return; }
+
 			DataProcQueryTopic query;
 			NodeCenter::ProcAddr dest;
 			if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) {
 				MQId remote;
-				memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote));
+				memcpy(&remote, SrcMQ().data(), sizeof(MQId));
 				MsgI imsg;
-				if (!imsg.Make(shm(), MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
-				if (!ShmMsgQueue::Send(shm(), remote, imsg, 100)) {
-					imsg.Release(shm());
+				if (!imsg.Make(shm, MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
+				if (!ShmMsgQueue::Send(shm, remote, imsg, 100)) {
+					imsg.Release(shm);
 				}
 			}
 		};
 
 		switch (msg.type()) {
-		case kMsgTypeProcRegisterTopics: OnRegister(); break;
-		case kMsgTypeProcHeartbeat: OnHeartbeat(); break;
-		case kMsgTypeProcQueryTopic: OnQueryTopic(); break;
-		default: break;
+		case kMsgTypeProcRegisterTopics: OnRegister(); return true;
+		case kMsgTypeProcHeartbeat: OnHeartbeat(); return true;
+		case kMsgTypeProcQueryTopic: OnQueryTopic(); return true;
+		default: return false;
 		}
 	};
+}
+
+bool ReqRepCenter::Start(const int nworker)
+{
+	auto handler = MakeReqRepCenter();
+	printf("sizeof(rep/req handler) = %ld\n", sizeof(handler));
 
 	const int kMaxWorker = 16;
-	return socket_.Start(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
+	return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
 }
\ No newline at end of file
diff --git a/src/reqrep_center.h b/src/reqrep_center.h
index 6473841..326ac7a 100644
--- a/src/reqrep_center.h
+++ b/src/reqrep_center.h
@@ -18,24 +18,18 @@
 #ifndef REQREP_CENTER_US3RBM60
 #define REQREP_CENTER_US3RBM60
 
+#include "center.h"
 #include "defs.h"
 #include "socket.h"
 
+BHCenter::MsgHandler MakeReqRepCenter();
 class ReqRepCenter
 {
-	class Socket : public ShmSocket
-	{
-	public:
-		Socket(ShmSocket::Shm &shm) :
-		    ShmSocket(shm, &kBHTopicReqRepCenter, 1000) {}
-		using ShmSocket::shm;
-	};
-	Socket socket_;
-	ShmSocket::Shm &shm() { return socket_.shm(); }
+	ShmSocket socket_;
 
 public:
 	ReqRepCenter(ShmSocket::Shm &shm) :
-	    socket_(shm) {}
+	    socket_(shm, &kBHTopicReqRepCenter, 1000) {}
 	ReqRepCenter() :
 	    ReqRepCenter(BHomeShm()) {}
 	~ReqRepCenter() { Stop(); }
diff --git a/src/socket.cpp b/src/socket.cpp
index b9519be..73681f1 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -49,7 +49,7 @@
 	Stop(); //TODO should stop in sub class, incase thread access sub class data.
 }
 
-bool ShmSocket::StartRaw(const RecvRawCB &onData, int nworker)
+bool ShmSocket::Start(const RecvCB &onData, int nworker)
 {
 	if (!mq_) {
 		return false;
@@ -62,7 +62,12 @@
 			try {
 				MsgI imsg;
 				DEFER1(imsg.Release(shm_));
-				if (mq_->Recv(imsg, 100)) { onData(imsg); }
+				if (mq_->Recv(imsg, 100)) {
+					BHMsg msg;
+					if (imsg.Unpack(msg)) {
+						onData(*this, imsg, msg);
+					}
+				}
 			} catch (...) {
 			}
 		}
@@ -73,11 +78,6 @@
 		workers_.emplace_back(RecvProc);
 	}
 	return true;
-}
-
-bool ShmSocket::Start(const RecvCB &onData, int nworker)
-{
-	return StartRaw([this, onData](MsgI &imsg) { BHMsg m; if (imsg.Unpack(m)) { onData(m); } }, nworker);
 }
 
 bool ShmSocket::Stop()
diff --git a/src/socket.h b/src/socket.h
index 20da7c0..1a3d47b 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -35,21 +35,24 @@
 
 public:
 	typedef bhome_shm::SharedMemory Shm;
-	typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB;
-	typedef std::function<void(bhome_msg::MsgI &imsg)> RecvRawCB;
+	typedef std::function<void(ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg)> RecvCB;
+	typedef std::function<void(bhome_msg::BHMsg &msg)> RecvBHMsgCB;
 
+	ShmSocket(Shm &shm, const void *id, const int len);
 	ShmSocket(Shm &shm, const int len = 12);
 	~ShmSocket();
 
+	Shm &shm() { return shm_; }
 	// start recv.
 	bool Start(const RecvCB &onData, int nworker = 1);
-	bool StartRaw(const RecvRawCB &onData, int nworker = 1);
+	bool Start(const RecvBHMsgCB &onData, int nworker = 1)
+	{
+		return Start([onData](ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg) { onData(msg); }, nworker);
+	}
 	bool Stop();
 	size_t Pending() const { return mq_ ? mq_->Pending() : 0; }
 
 protected:
-	ShmSocket(Shm &shm, const void *id, const int len);
-	Shm &shm() { return shm_; }
 	const Shm &shm() const { return shm_; }
 	Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid.
 	const Queue &mq() const { return *mq_; }
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 637ae26..55a08a3 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -181,14 +181,31 @@
 
 	auto Client = [&](const std::string &topic, const int nreq) {
 		SocketRequest client(shm);
+		std::atomic<int> count(0);
 		std::string reply;
+		auto onRecv = [&](const std::string &rep) {
+			reply = rep;
+			if (++count >= nreq) {
+				printf("count: %d\n", count.load());
+			}
+		};
+		client.StartWorker(onRecv, 1);
 		boost::timer::auto_cpu_timer timer;
 		for (int i = 0; i < nreq; ++i) {
-			if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) {
+			if (!client.AsyncRequest(topic, "data " + std::to_string(i), 1000)) {
 				printf("client request failed\n");
 			}
+			// if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) {
+			// 	printf("client request failed\n");
+			// } else {
+			// 	++count;
+			// }
 		}
 		printf("request %s %d done ", topic.c_str(), nreq);
+		while (count.load() < nreq) {
+			std::this_thread::yield();
+		}
+		client.Stop();
 	};
 	auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
 		SocketReply server(shm);
@@ -212,7 +229,7 @@
 	servers.Launch(Server, "server", topics);
 	std::this_thread::sleep_for(100ms);
 	for (auto &t : topics) {
-		clients.Launch(Client, t, 1000 * 100);
+		clients.Launch(Client, t, 1000 * 1000);
 	}
 	clients.WaitAll();
 	run = false;

--
Gitblit v1.8.0