From bb9a7e348892eb5c4fccb063380aa6fcd9612b71 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 06 四月 2021 17:32:35 +0800
Subject: [PATCH] server resend failed; rename msgs; refactor.

---
 src/topic_request.cpp |  106 +++++++++--------------------------------------------
 1 files changed, 18 insertions(+), 88 deletions(-)

diff --git a/src/reqrep.cpp b/src/topic_request.cpp
similarity index 65%
rename from src/reqrep.cpp
rename to src/topic_request.cpp
index 25c0826..ce7c1a8 100644
--- a/src/reqrep.cpp
+++ b/src/topic_request.cpp
@@ -1,9 +1,9 @@
 /*
  * =====================================================================================
  *
- *       Filename:  reqrep.cpp
+ *       Filename:  topic_request.cpp
  *
- *    Description:  topic request/reply sockets
+ *    Description:  topic request sockets
  *
  *        Version:  1.0
  *        Created:  2021骞�04鏈�01鏃� 09鏃�35鍒�35绉�
@@ -15,7 +15,7 @@
  *
  * =====================================================================================
  */
-#include "reqrep.h"
+#include "topic_request.h"
 #include "bh_util.h"
 #include "msg.h"
 #include <chrono>
@@ -40,10 +40,10 @@
 		};
 
 		RecvBHMsgCB cb;
-		if (Find(cb)) {
+		if (Find(cb) && cb) {
 			cb(msg);
-		} else if (msg.type() == kMsgTypeReply) {
-			DataReply reply;
+		} else if (msg.type() == kMsgTypeRequestTopicReply && rrcb) {
+			MsgRequestTopicReply reply;
 			if (reply.ParseFromString(msg.body())) {
 				rrcb(reply.data());
 			}
@@ -74,8 +74,8 @@
 	auto Call = [&](const void *remote) {
 		const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
 		auto onRecv = [cb](BHMsg &msg) {
-			if (msg.type() == kMsgTypeReply) {
-				DataReply reply;
+			if (msg.type() == kMsgTypeRequestTopicReply) {
+				MsgRequestTopicReply reply;
 				if (reply.ParseFromString(msg.body())) {
 					cb(reply.data());
 				}
@@ -103,16 +103,22 @@
 		if (QueryRPCTopic(topic, addr, timeout_ms)) {
 			const BHMsg &req(MakeRequest(mq().Id(), topic, data, size));
 			BHMsg reply;
-			if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
-				DataReply dr;
+			if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeRequestTopicReply) {
+				MsgRequestTopicReply dr;
 				if (dr.ParseFromString(reply.body())) {
 					dr.mutable_data()->swap(out);
 					return true;
+				} else {
+					printf("error parse reply.\n");
 				}
+			} else {
+				printf("error recv data. line: %d\n", __LINE__);
 			}
 		} else {
+			printf("error recv data. line: %d\n", __LINE__);
 		}
 	} catch (...) {
+		printf("error recv data. line: %d\n", __LINE__);
 	}
 	return false;
 }
@@ -186,8 +192,8 @@
 	BHMsg result;
 	const BHMsg &msg = MakeQueryTopic(mq().Id(), topic);
 	if (SyncSendAndRecv(&kBHTopicReqRepCenter, &msg, &result, timeout_ms)) {
-		if (result.type() == kMsgTypeProcQueryTopicReply) {
-			DataProcQueryTopicReply reply;
+		if (result.type() == kMsgTypeQueryTopicReply) {
+			MsgQueryTopicReply reply;
 			if (reply.ParseFromString(result.body())) {
 				addr = reply.address();
 				if (addr.mq_id().empty()) {
@@ -202,79 +208,3 @@
 	}
 	return false;
 }
-
-// reply socket
-namespace
-{
-struct SrcInfo {
-	std::vector<BHAddress> route;
-	std::string msg_id;
-};
-
-} // namespace
-
-bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
-{
-	//TODO check reply?
-	return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
-}
-bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
-{
-	return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
-}
-bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
-{
-	auto onRecv = [this, rcb](BHMsg &msg) {
-		if (msg.type() == kMsgTypeRequest && msg.route_size() > 0) {
-			DataRequest req;
-			if (req.ParseFromString(msg.body())) {
-				std::string out;
-				if (rcb(req.topic(), req.data(), out)) {
-					BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size()));
-					for (int i = 0; i < msg.route_size() - 1; ++i) {
-						msg.add_route()->Swap(msg.mutable_route(i));
-					}
-					SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 100);
-				}
-			}
-		} else {
-			// ignored, or dropped
-		}
-	};
-
-	return rcb && Start(onRecv, nworker);
-}
-
-bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
-{
-	BHMsg msg;
-	if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequest) {
-		DataRequest request;
-		if (request.ParseFromString(msg.body())) {
-			request.mutable_topic()->swap(topic);
-			request.mutable_data()->swap(data);
-			SrcInfo *p = new SrcInfo;
-			p->route.assign(msg.route().begin(), msg.route().end());
-			p->msg_id = msg.msg_id();
-			src_info = p;
-			return true;
-		}
-	}
-	return false;
-}
-
-bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms)
-{
-	SrcInfo *p = static_cast<SrcInfo *>(src_info);
-	DEFER1(delete p);
-	if (!p || p->route.empty()) {
-		return false;
-	}
-
-	BHMsg msg(MakeReply(p->msg_id, data.data(), data.size()));
-	for (unsigned i = 0; i < p->route.size() - 1; ++i) {
-		msg.add_route()->Swap(&p->route[i]);
-	}
-
-	return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms);
-}
\ No newline at end of file

--
Gitblit v1.8.0