From 3c2b6739208d961cf8b86460d7f05516d044960c Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 31 三月 2021 19:13:42 +0800
Subject: [PATCH] add async recv suport; sync by waiting for async.

---
 src/topic_rpc.cpp            |   21 ++++
 src/socket.h                 |   11 +
 src/msg.h                    |    2 
 src/shm_queue.h              |   17 +++
 src/socket.cpp               |  140 ++++++++++++++++++++++++++++
 src/topic_rpc.h              |   31 ++++++
 proto/source/bhome_msg.proto |   19 +++
 src/defs.h                   |    1 
 src/shm_queue.cpp            |   36 +-----
 src/msg.cpp                  |   21 ++++
 10 files changed, 266 insertions(+), 33 deletions(-)

diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
index 8365d2f..a88780b 100644
--- a/proto/source/bhome_msg.proto
+++ b/proto/source/bhome_msg.proto
@@ -25,6 +25,8 @@
 	kMsgTypePublish = 3;
 	kMsgTypeSubscribe = 4;
 	kMsgTypeUnsubscribe = 5;
+	kMsgTypeQueryTopic = 6;
+	kMsgTypeQueryTopicReply = 7;
 }
 
 message DataPub {
@@ -35,3 +37,20 @@
 message DataSub {
 	repeated bytes topics = 1;
 }
+
+message DataRequest {
+	bytes topic = 1;
+	bytes data = 2; 
+}
+
+message DataReply {
+	bytes data = 1; 
+}
+
+message DataQueryTopic {
+	bytes topic = 1;
+}
+
+message DataQueryTopicReply {
+	BHAddress address = 1;
+}
diff --git a/src/defs.h b/src/defs.h
index 10ac73c..fcdcc70 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -25,6 +25,7 @@
 typedef boost::uuids::uuid MQId;
 
 const MQId kBHBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
+const MQId kBHTopicRPCId = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff");
 const int kBHCenterPort = 24287;
 const char kTopicSep = '.';
 namespace bhome_shm
diff --git a/src/msg.cpp b/src/msg.cpp
index 3a01240..41dd459 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -41,12 +41,22 @@
 	msg.add_route()->set_mq_id(&src_id, sizeof(src_id));
 	return msg;
 }
+BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size)
+{
+	DataRequest req;
+	req.set_topic(topic);
+	req.set_data(data, size);
+	const std::string &body(req.SerializeAsString());
+	return MakeRequest(src_id, body.data(), body.size());
+}
 
 BHMsg MakeReply(const void *data, const size_t size)
 {
 	assert(data && size);
 	BHMsg msg(InitMsg(kMsgTypeReply));
-	msg.set_body(data, size);
+	DataReply reply;
+	reply.set_data(data, size);
+	msg.set_body(reply.SerializeAsString());
 	return msg;
 }
 
@@ -77,6 +87,15 @@
 	return msg;
 }
 
+BHMsg MakeQueryTopic(const std::string &topic)
+{
+	BHMsg msg(InitMsg(kMsgTypeQueryTopic));
+	DataQueryTopic query;
+	query.set_topic(topic);
+	msg.set_body(query.SerializeAsString());
+	return msg;
+}
+
 void *Pack(SharedMemory &shm, const BHMsg &msg)
 {
 	uint32_t msg_size = msg.ByteSizeLong();
diff --git a/src/msg.h b/src/msg.h
index 2154eba..ea1e636 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -59,7 +59,9 @@
 	int num_ = 1;
 };
 
+BHMsg MakeQueryTopic(const std::string &topic);
 BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size);
+BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size);
 BHMsg MakeReply(const void *data, const size_t size);
 BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics);
 BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics);
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index cf4c8b4..f2fb45b 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -64,23 +64,22 @@
 	Remove();
 }
 
-bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms)
+bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend)
 {
 	Queue *remote = Find(shm, MsgQIdToName(remote_id));
-	return remote && remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); });
+	return remote && remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
 }
 
-// bool ShmMsgQueue::Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms)
-// {
-//     Queue *remote = Find(MsgQIdToName(remote_id));
-//     return remote && remote->Write(msg, timeout_ms, [](const MsgI&msg){msg.AddRef();});
-// }
+// Test shows that in the 2 cases:
+// 1) build msg first, then find remote queue;
+// 2) find remote queue first, then build msg;
+// 1 is about 50% faster than 2, maybe cache related.
 
-bool ShmMsgQueue::Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms)
+bool ShmMsgQueue::Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms, const std::function<void()> &onsend)
 {
 	MsgI msg;
 	if (msg.Make(shm(), data)) {
-		if (Send(remote_id, msg, timeout_ms)) {
+		if (Send(remote_id, msg, timeout_ms, onsend)) {
 			return true;
 		} else {
 			msg.Release(shm());
@@ -89,25 +88,6 @@
 	return false;
 }
 
-/*
-bool ShmMsgQueue::Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms)
-{
-    // Test shows that in the 2 cases:
-    // 1) build msg first, then find remote queue;
-    // 2) find remote queue first, then build msg;
-    // 1 is about 50% faster than 2, maybe cache related.
-
-    MsgI msg;
-    if(msg.BuildRequest(shm(), Id(), data, size)) {
-        if(Send(remote_id, msg, timeout_ms)) {
-            return true;
-        } else {
-            msg.Release(shm());
-        }
-    }
-    return false;
-}
-//*/
 bool ShmMsgQueue::Recv(BHMsg &msg, const int timeout_ms)
 {
 	MsgI imsg;
diff --git a/src/shm_queue.h b/src/shm_queue.h
index f1e67f3..e9b3a1a 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -118,6 +118,7 @@
 {
 	typedef ShmObject<SharedQueue<MsgI>> Super;
 	typedef Super::Data Queue;
+	typedef std::function<void()> OnSend;
 	bool Write(const MsgI &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); }
 	bool Read(MsgI &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); }
 	MQId id_;
@@ -132,8 +133,20 @@
 
 	bool Recv(BHMsg &msg, const int timeout_ms);
 	bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
-	bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms);
-	static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms);
+	static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend);
+	static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms)
+	{
+		return Send(shm, remote_id, msg, timeout_ms, []() {});
+	}
+	bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms, OnSend const &onsend);
+	bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms)
+	{
+		return Send(remote_id, msg, timeout_ms, []() {});
+	}
+	bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend)
+	{
+		return Send(shm(), remote_id, msg, timeout_ms, onsend);
+	}
 	bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms)
 	{
 		return Send(shm(), remote_id, msg, timeout_ms);
diff --git a/src/socket.cpp b/src/socket.cpp
index 4d9fcc9..13f1e38 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -20,6 +20,8 @@
 #include "bh_util.h"
 #include "defs.h"
 #include "msg.h"
+#include <chrono>
+#include <condition_variable>
 
 using namespace bhome_msg;
 using namespace bhome_shm;
@@ -28,6 +30,8 @@
 {
 
 } // namespace
+
+//TODO maybe change to base class, each type is a sub class.
 
 ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm) :
     shm_(shm), type_(type), run_(false)
@@ -123,6 +127,31 @@
 	return StartRaw([this, onData](MsgI &imsg) { BHMsg m; if (imsg.Unpack(m)) { onData(m); } }, nworker);
 }
 
+bool ShmSocket::StartAsync(int nworker)
+{
+	auto AsyncRecvProc = [this](BHMsg &msg) {
+		auto Find = [&](RecvCB &cb) {
+			std::lock_guard<std::mutex> lock(mutex_);
+			const std::string &msgid = msg.msg_id();
+			auto pos = async_cbs_.find(msgid);
+			if (pos != async_cbs_.end()) {
+				cb.swap(pos->second);
+				async_cbs_.erase(pos);
+				return true;
+			} else {
+				return false;
+			}
+		};
+
+		RecvCB cb;
+		if (Find(cb) && cb) {
+			cb(msg);
+		}
+	};
+
+	return Start(AsyncRecvProc, nworker);
+}
+
 bool ShmSocket::Stop()
 {
 	std::lock_guard<std::mutex> lock(mutex_);
@@ -141,3 +170,114 @@
 	}
 	return false;
 }
+
+bool ShmSocket::AsyncRequest(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb)
+{
+	if (type_ != eSockRequest) {
+		return false;
+	}
+	assert(remote && pmsg && !mq_);
+	try {
+		const BHMsg &msg = *static_cast<const BHMsg *>(pmsg);
+		auto RegisterCB = [&]() {
+			std::lock_guard<std::mutex> lock(mutex_);
+			async_cbs_.emplace(msg.msg_id(), cb);
+		};
+
+		return mq_->Send(*static_cast<const MQId *>(remote), msg, timeout_ms, RegisterCB);
+	} catch (...) {
+		return false;
+	}
+}
+
+bool ShmSocket::SyncRequest(const void *remote, const void *msg, void *result, const int timeout_ms)
+{
+	struct State {
+		std::mutex mutex;
+		std::condition_variable cv;
+		bool canceled = false;
+	};
+
+	try {
+		std::shared_ptr<State> st(new State);
+		auto OnRecv = [=](BHMsg &msg) {
+			std::unique_lock<std::mutex> lk(st->mutex);
+			if (!st->canceled) {
+				static_cast<BHMsg *>(result)->Swap(&msg);
+				st->cv.notify_one();
+			}
+		};
+
+		std::unique_lock<std::mutex> lk(st->mutex);
+		auto end = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
+		if (AsyncRequest(remote, msg, timeout_ms, OnRecv) && st->cv.wait_until(lk, end) == std::cv_status::no_timeout) {
+			return true;
+		} else {
+			st->canceled = true;
+			return false;
+		}
+	} catch (...) {
+		return false;
+	}
+}
+
+bool ShmSocket::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
+{
+	BHMsg result;
+	const BHMsg &msg = MakeQueryTopic(topic);
+	if (SyncRequest(&kBHTopicRPCId, &msg, &result, timeout_ms)) {
+		if (result.type() == kMsgTypeQueryTopicReply) {
+			DataQueryTopicReply reply;
+			if (reply.ParseFromString(result.body())) {
+				addr = reply.address();
+				return !addr.mq_id().empty();
+			}
+		}
+	}
+	return false;
+}
+
+bool ShmSocket::RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
+{
+	auto Call = [&](const void *remote) {
+		const BHMsg &msg(MakeRequest(mq_->Id(), topic, data, size));
+		auto onRecv = [cb](BHMsg &msg) {
+			if (msg.type() == kMsgTypeReply) {
+				DataReply reply;
+				if (reply.ParseFromString(msg.body())) {
+					cb(reply.data().data(), reply.data().size());
+				}
+			}
+		};
+		return AsyncRequest(remote, &msg, timeout_ms, onRecv);
+	};
+
+	try {
+		BHAddress addr;
+		if (QueryRPCTopic(topic, addr, timeout_ms)) {
+			return Call(addr.mq_id().data());
+		}
+	} catch (...) {
+		return false;
+	}
+}
+
+bool ShmSocket::RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out)
+{
+	try {
+		BHAddress addr;
+		if (QueryRPCTopic(topic, addr, timeout_ms)) {
+			const BHMsg &msg(MakeRequest(mq_->Id(), topic, data, size));
+			BHMsg reply;
+			if (SyncRequest(addr.mq_id().data(), &msg, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
+				DataReply dr;
+				if (dr.ParseFromString(msg.body())) {
+					dr.mutable_data()->swap(out);
+					return true;
+				}
+			}
+		}
+	} catch (...) {
+		return false;
+	}
+}
diff --git a/src/socket.h b/src/socket.h
index c468dd3..eee5b5b 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -26,6 +26,7 @@
 #include <memory>
 #include <mutex>
 #include <thread>
+#include <unordered_map>
 #include <vector>
 
 class ShmSocket : private boost::noncopyable
@@ -42,12 +43,13 @@
 	};
 	typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB;
 	typedef std::function<void(bhome_msg::MsgI &imsg)> RecvRawCB;
+	typedef std::function<void(const void *data, const size_t size)> RequestResultCB;
 
 	ShmSocket(Type type, bhome_shm::SharedMemory &shm);
 	ShmSocket(Type type);
 	~ShmSocket();
-	// bool Request(const std::string &topic, const void *data, const size_t size, onReply);
-	bool RequestAndWait() { return false; } // call Request, and wait onReply notify cv
+	bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
+	bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out);
 
 	// bool HandleRequest(onData);
 	bool ReadRequest(); // exclude with HandleRequest
@@ -60,10 +62,14 @@
 	// start recv.
 	bool Start(const RecvCB &onData, int nworker = 1);
 	bool StartRaw(const RecvRawCB &onData, int nworker = 1);
+	bool StartAsync(int nworker = 2);
 	bool Stop();
 	size_t Pending() const { return mq_ ? mq_->Pending() : 0; }
 
 private:
+	bool AsyncRequest(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb);
+	bool SyncRequest(const void *remote, const void *msg, void *result, const int timeout_ms);
+	bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
 	bool StopNoLock();
 	bhome_shm::SharedMemory &shm_;
 	const Type type_;
@@ -72,6 +78,7 @@
 	std::atomic<bool> run_;
 
 	std::unique_ptr<Queue> mq_;
+	std::unordered_map<std::string, RecvCB> async_cbs_;
 };
 
 #endif // end of include guard: SOCKET_GWTJHBPO
diff --git a/src/topic_rpc.cpp b/src/topic_rpc.cpp
new file mode 100644
index 0000000..065a861
--- /dev/null
+++ b/src/topic_rpc.cpp
@@ -0,0 +1,21 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  topic_rpc.cpp
+ *
+ *    Description:  topic request/reply manager
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�03鏈�31鏃� 16鏃�29鍒�31绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  YOUR NAME (), 
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#include "topic_rpc.h"
+
+
+
diff --git a/src/topic_rpc.h b/src/topic_rpc.h
new file mode 100644
index 0000000..40ff985
--- /dev/null
+++ b/src/topic_rpc.h
@@ -0,0 +1,31 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  topic_rpc.h
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�03鏈�31鏃� 16鏃�30鍒�10绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  YOUR NAME (), 
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#ifndef TOPIC_RPC_JU1AYN5L
+#define TOPIC_RPC_JU1AYN5L
+
+#include "socket.h"
+
+// request/reply topic manager
+class RPCManager
+{
+	ShmSocket socket_;
+
+public:
+};
+
+#endif // end of include guard: TOPIC_RPC_JU1AYN5L

--
Gitblit v1.8.0