From d26327b3cde043a9470dcd7fea8e704ea517fdae Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 01 四月 2021 19:26:57 +0800
Subject: [PATCH] add req/rep center;

---
 src/reqrep.cpp |  112 ++++++++++++++++++++++++++++++++++++++++++++++++++-----
 1 files changed, 101 insertions(+), 11 deletions(-)

diff --git a/src/reqrep.cpp b/src/reqrep.cpp
index e1636fd..bed6496 100644
--- a/src/reqrep.cpp
+++ b/src/reqrep.cpp
@@ -16,6 +16,7 @@
  * =====================================================================================
  */
 #include "reqrep.h"
+#include "bh_util.h"
 #include "msg.h"
 #include <chrono>
 #include <condition_variable>
@@ -73,30 +74,33 @@
 		BHAddress addr;
 		if (QueryRPCTopic(topic, addr, timeout_ms)) {
 			return Call(addr.mq_id().data());
+		} else {
+			return false;
 		}
 	} catch (...) {
 		return false;
 	}
 }
 
-bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out)
+bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms)
 {
 	try {
 		BHAddress addr;
 		if (QueryRPCTopic(topic, addr, timeout_ms)) {
-			const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
+			const BHMsg &req(MakeRequest(mq().Id(), topic, data, size));
 			BHMsg reply;
-			if (SyncSendAndRecv(addr.mq_id().data(), &msg, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
+			if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
 				DataReply dr;
-				if (dr.ParseFromString(msg.body())) {
+				if (dr.ParseFromString(reply.body())) {
 					dr.mutable_data()->swap(out);
 					return true;
 				}
 			}
+		} else {
 		}
 	} catch (...) {
-		return false;
 	}
+	return false;
 }
 
 bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb)
@@ -132,11 +136,13 @@
 			if (!st->canceled) {
 				static_cast<BHMsg *>(result)->Swap(&msg);
 				st->cv.notify_one();
-			} // else result is no longer valid.
+			} else {
+			}
 		};
 
 		std::unique_lock<std::mutex> lk(st->mutex);
-		if (AsyncSend(remote, msg, timeout_ms, OnRecv) && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
+		bool sendok = AsyncSend(remote, msg, timeout_ms, OnRecv);
+		if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
 			return true;
 		} else {
 			st->canceled = true;
@@ -149,16 +155,100 @@
 
 bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
 {
+	if (tmp_cache_.first == topic) {
+		addr = tmp_cache_.second;
+		return true;
+	}
+
 	BHMsg result;
-	const BHMsg &msg = MakeQueryTopic(topic);
-	if (SyncSendAndRecv(&kBHTopicRPCId, &msg, &result, timeout_ms)) {
-		if (result.type() == kMsgTypeQueryTopicReply) {
-			DataQueryTopicReply reply;
+	const BHMsg &msg = MakeQueryTopic(mq().Id(), topic);
+	if (SyncSendAndRecv(&kBHTopicReqRepCenter, &msg, &result, timeout_ms)) {
+		if (result.type() == kMsgTypeProcQueryTopicReply) {
+			DataProcQueryTopicReply reply;
 			if (reply.ParseFromString(result.body())) {
 				addr = reply.address();
+				tmp_cache_.first = topic;
+				tmp_cache_.second = addr;
 				return !addr.mq_id().empty();
 			}
+		}
+	} else {
+	}
+	return false;
+}
+
+// reply socket
+namespace
+{
+struct SrcInfo {
+	std::vector<BHAddress> route;
+	std::string msg_id;
+};
+
+} // namespace
+
+bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
+{
+	//TODO check reply?
+	return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
+}
+bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
+{
+	return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
+}
+bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
+{
+	auto onRecv = [this, rcb](BHMsg &msg) {
+		if (msg.type() == kMsgTypeRequest && msg.route_size() > 0) {
+			DataRequest req;
+			if (req.ParseFromString(msg.body())) {
+				std::string out;
+				if (rcb(req.topic(), req.data(), out)) {
+					BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size()));
+					for (int i = 0; i < msg.route_size() - 1; ++i) {
+						msg.add_route()->Swap(msg.mutable_route(i));
+					}
+					SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 100);
+				}
+			}
+		} else {
+			// ignored, or dropped
+		}
+	};
+
+	return rcb && Start(onRecv, nworker);
+}
+
+bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
+{
+	BHMsg msg;
+	if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequest) {
+		DataRequest request;
+		if (request.ParseFromString(msg.body())) {
+			request.mutable_topic()->swap(topic);
+			request.mutable_data()->swap(data);
+			SrcInfo *p = new SrcInfo;
+			p->route.assign(msg.route().begin(), msg.route().end());
+			p->msg_id = msg.msg_id();
+			src_info = p;
+			return true;
 		}
 	}
 	return false;
 }
+
+bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms)
+{
+	SrcInfo *p = static_cast<SrcInfo *>(src_info);
+	DEFER1(delete p);
+	if (!p || p->route.empty()) {
+		return false;
+	}
+
+	BHMsg msg(MakeReply(p->msg_id, data.data(), data.size()));
+	for (unsigned i = 0; i < p->route.size() - 1; ++i) {
+		msg.add_route()->Swap(&p->route[i]);
+	}
+
+	return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms);
+}
\ No newline at end of file

--
Gitblit v1.8.0