From 83085f2ce99cca05d40a19482151873a55e6393a Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 02 四月 2021 19:32:21 +0800
Subject: [PATCH] refactor center; add async request no cb.

---
 src/reqrep.cpp |   34 +++++++++++++++++++++++++++++-----
 1 files changed, 29 insertions(+), 5 deletions(-)

diff --git a/src/reqrep.cpp b/src/reqrep.cpp
index b8e423b..25c0826 100644
--- a/src/reqrep.cpp
+++ b/src/reqrep.cpp
@@ -26,7 +26,7 @@
 bool SocketRequest::StartWorker(const RequestResultCB &rrcb, int nworker)
 {
 	auto AsyncRecvProc = [this, rrcb](BHMsg &msg) {
-		auto Find = [&](RecvCB &cb) {
+		auto Find = [&](RecvBHMsgCB &cb) {
 			std::lock_guard<std::mutex> lock(mutex());
 			const std::string &msgid = msg.msg_id();
 			auto pos = async_cbs_.find(msgid);
@@ -39,10 +39,10 @@
 			}
 		};
 
-		RecvCB cb;
-		if (Find(cb) && cb) {
+		RecvBHMsgCB cb;
+		if (Find(cb)) {
 			cb(msg);
-		} else if (rrcb && msg.type() == kMsgTypeReply) {
+		} else if (msg.type() == kMsgTypeReply) {
 			DataReply reply;
 			if (reply.ParseFromString(msg.body())) {
 				rrcb(reply.data());
@@ -55,6 +55,20 @@
 	return Start(AsyncRecvProc, nworker);
 }
 
+bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms)
+{
+	try {
+		BHAddress addr;
+		if (QueryRPCTopic(topic, addr, timeout_ms)) {
+			const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
+			return AsyncSend(addr.mq_id().data(), &msg, timeout_ms);
+		} else {
+			return false;
+		}
+	} catch (...) {
+		return false;
+	}
+}
 bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
 {
 	auto Call = [&](const void *remote) {
@@ -103,7 +117,17 @@
 	return false;
 }
 
-bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb)
+bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms)
+{
+	assert(remote && pmsg);
+	try {
+		const BHMsg &msg = *static_cast<const BHMsg *>(pmsg);
+		return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms);
+	} catch (...) {
+		return false;
+	}
+}
+bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvBHMsgCB &cb)
 {
 	assert(remote && pmsg);
 	try {

--
Gitblit v1.8.0