From 83085f2ce99cca05d40a19482151873a55e6393a Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 02 四月 2021 19:32:21 +0800
Subject: [PATCH] refactor center; add async request no cb.

---
 src/reqrep.cpp |   52 +++++++++++++++++++++++++++++++++++++++-------------
 1 files changed, 39 insertions(+), 13 deletions(-)

diff --git a/src/reqrep.cpp b/src/reqrep.cpp
index bed6496..25c0826 100644
--- a/src/reqrep.cpp
+++ b/src/reqrep.cpp
@@ -26,7 +26,7 @@
 bool SocketRequest::StartWorker(const RequestResultCB &rrcb, int nworker)
 {
 	auto AsyncRecvProc = [this, rrcb](BHMsg &msg) {
-		auto Find = [&](RecvCB &cb) {
+		auto Find = [&](RecvBHMsgCB &cb) {
 			std::lock_guard<std::mutex> lock(mutex());
 			const std::string &msgid = msg.msg_id();
 			auto pos = async_cbs_.find(msgid);
@@ -39,10 +39,10 @@
 			}
 		};
 
-		RecvCB cb;
-		if (Find(cb) && cb) {
+		RecvBHMsgCB cb;
+		if (Find(cb)) {
 			cb(msg);
-		} else if (rrcb && msg.type() == kMsgTypeReply) {
+		} else if (msg.type() == kMsgTypeReply) {
 			DataReply reply;
 			if (reply.ParseFromString(msg.body())) {
 				rrcb(reply.data());
@@ -55,7 +55,21 @@
 	return Start(AsyncRecvProc, nworker);
 }
 
-bool SocketRequest::AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
+bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms)
+{
+	try {
+		BHAddress addr;
+		if (QueryRPCTopic(topic, addr, timeout_ms)) {
+			const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
+			return AsyncSend(addr.mq_id().data(), &msg, timeout_ms);
+		} else {
+			return false;
+		}
+	} catch (...) {
+		return false;
+	}
+}
+bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
 {
 	auto Call = [&](const void *remote) {
 		const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
@@ -82,7 +96,7 @@
 	}
 }
 
-bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms)
+bool SocketRequest::SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms)
 {
 	try {
 		BHAddress addr;
@@ -103,7 +117,17 @@
 	return false;
 }
 
-bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb)
+bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms)
+{
+	assert(remote && pmsg);
+	try {
+		const BHMsg &msg = *static_cast<const BHMsg *>(pmsg);
+		return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms);
+	} catch (...) {
+		return false;
+	}
+}
+bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvBHMsgCB &cb)
 {
 	assert(remote && pmsg);
 	try {
@@ -153,10 +177,9 @@
 	}
 }
 
-bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
+bool SocketRequest::QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
 {
-	if (tmp_cache_.first == topic) {
-		addr = tmp_cache_.second;
+	if (topic_cache_.Find(topic, addr)) {
 		return true;
 	}
 
@@ -167,9 +190,12 @@
 			DataProcQueryTopicReply reply;
 			if (reply.ParseFromString(result.body())) {
 				addr = reply.address();
-				tmp_cache_.first = topic;
-				tmp_cache_.second = addr;
-				return !addr.mq_id().empty();
+				if (addr.mq_id().empty()) {
+					return false;
+				} else {
+					topic_cache_.Update(topic, addr);
+					return true;
+				}
 			}
 		}
 	} else {

--
Gitblit v1.8.0