From d26327b3cde043a9470dcd7fea8e704ea517fdae Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 01 四月 2021 19:26:57 +0800
Subject: [PATCH] add req/rep center;

---
 src/reqrep.h |   35 ++++++++++++++++++++++++++++++++---
 1 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/src/reqrep.h b/src/reqrep.h
index 02cc86f..2971403 100644
--- a/src/reqrep.h
+++ b/src/reqrep.h
@@ -19,9 +19,12 @@
 #define REQREP_ACEH09NK
 
 #include "defs.h"
+#include "msg.h"
 #include "socket.h"
 #include <functional>
 #include <unordered_map>
+
+using bhome::msg::ProcInfo;
 
 class SocketRequest : private ShmSocket
 {
@@ -32,19 +35,21 @@
 	    Socket(shm, 64) { StartWorker(); }
 	SocketRequest() :
 	    SocketRequest(BHomeShm()) {}
+	~SocketRequest() { Stop(); }
 
 	typedef std::function<void(const std::string &data)> RequestResultCB;
 	bool StartWorker(const RequestResultCB &rrcb, int nworker = 2);
 	bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); }
+	bool Stop() { return Socket::Stop(); }
 	bool AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
 	bool AsyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb)
 	{
 		return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
 	}
-	bool SyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out);
-	bool SyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, std::string &out)
+	bool SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
+	bool SyncRequest(const std::string &topic, const std::string &data, std::string &out, const int timeout_ms)
 	{
-		return SyncRequest(topic, data.data(), data.size(), timeout_ms, out);
+		return SyncRequest(topic, data.data(), data.size(), out, timeout_ms);
 	}
 
 private:
@@ -52,6 +57,30 @@
 	bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms);
 	bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
 	std::unordered_map<std::string, RecvCB> async_cbs_;
+
+	std::pair<std::string, bhome::msg::BHAddress> tmp_cache_;
+};
+
+class SocketReply : private ShmSocket
+{
+	typedef ShmSocket Socket;
+
+public:
+	SocketReply(Socket::Shm &shm) :
+	    Socket(shm, 64) {}
+	SocketReply() :
+	    SocketReply(BHomeShm()) {}
+	~SocketReply() { Stop(); }
+
+	typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
+	bool StartWorker(const OnRequest &rcb, int nworker = 2);
+	bool Stop() { return Socket::Stop(); }
+	bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
+	bool SendReply(void *src_info, const std::string &data, const int timeout_ms);
+	bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms);
+	bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms);
+
+private:
 };
 
 #endif // end of include guard: REQREP_ACEH09NK

--
Gitblit v1.8.0