From 628c1c21ffb19d8c96ed9ce89531595f9870ab1a Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 23 四月 2021 18:41:02 +0800
Subject: [PATCH] add msg tag; recv all msgs before remove mq.

---
 src/topic_node.h |   28 +++++++++++++++++-----------
 1 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/src/topic_node.h b/src/topic_node.h
index 8287b4a..3371c35 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -22,6 +22,7 @@
 #include "socket.h"
 #include <atomic>
 #include <memory>
+#include <vector>
 
 using namespace bhome_shm;
 using namespace bhome_msg;
@@ -55,8 +56,8 @@
 	// topic client
 	typedef std::function<void(const BHMsgHead &head, const MsgRequestTopicReply &reply)> RequestResultCB;
 	bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2);
-	bool ClientAsyncRequest(const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB());
-	bool ClientSyncRequest(const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms);
+	bool ClientAsyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB());
+	bool ClientSyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms);
 
 	// publish
 	bool Publish(const MsgPublish &pub, const int timeout_ms);
@@ -107,17 +108,22 @@
 	};
 
 	// some sockets may be the same one, using functions make it easy to change.
+	enum { eSockStart,
+		   eSockNode = eSockStart,
+		   eSockPub = eSockNode,
+		   eSockServer,
+		   eSockClient,
+		   eSockSub,
+		   eSockEnd,
+	};
+	std::vector<std::unique_ptr<ShmSocket>> sockets_;
 
-	ShmSocket &SockNode() { return sock_node_; }
-	ShmSocket &SockPub() { return SockNode(); }
-	ShmSocket &SockSub() { return sock_sub_; }
-	ShmSocket &SockClient() { return sock_client_; }
-	ShmSocket &SockServer() { return sock_server_; }
+	ShmSocket &SockNode() { return *sockets_[eSockNode]; }
+	ShmSocket &SockPub() { return *sockets_[eSockPub]; }
+	ShmSocket &SockSub() { return *sockets_[eSockSub]; }
+	ShmSocket &SockClient() { return *sockets_[eSockClient]; }
+	ShmSocket &SockServer() { return *sockets_[eSockServer]; }
 
-	ShmSocket sock_node_;
-	ShmSocket sock_client_;
-	ShmSocket sock_server_;
-	ShmSocket sock_sub_;
 	enum State {
 		eStateUnregistered,
 		eStateOnline,

--
Gitblit v1.8.0