From 2197cf91e7a3bd5941327ba630a42946b88f069e Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 09 四月 2021 14:15:41 +0800
Subject: [PATCH] join pub/sub to node; refactor.

---
 src/topic_node.h |   22 +++++++++++++++++-----
 1 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/src/topic_node.h b/src/topic_node.h
index 8852af1..34fe2ee 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -19,7 +19,6 @@
 #define TOPIC_NODE_YVKWA6TF
 
 #include "msg.h"
-#include "pubsub.h"
 #include "socket.h"
 #include <memory>
 
@@ -32,23 +31,26 @@
 	SharedMemory &shm_;
 	MsgRegister info_;
 
+	SharedMemory &shm() { return shm_; }
+
 public:
 	TopicNode(SharedMemory &shm);
 	~TopicNode();
+
+	void StopAll();
+	// topic node
 	bool Register(const MsgRegister &body, MsgCommonReply &reply, const int timeout_ms);
 	bool RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply, const int timeout_ms);
 
 	// topic rpc server
 	typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
 	bool ServerStart(OnRequest const &cb, const int nworker = 2);
-	bool ServerStop();
 	bool ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
 	bool ServerSendReply(void *src_info, const std::string &data, const int timeout_ms);
 
 	// topic client
 	typedef std::function<void(const std::string &data)> RequestResultCB;
 	bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2);
-	bool ClientStopWorker();
 	bool ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB());
 	bool ClientAsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB())
 	{
@@ -60,7 +62,14 @@
 		return ClientSyncRequest(topic, data.data(), data.size(), out, timeout_ms);
 	}
 
-	void StopAll();
+	// publish
+	bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms);
+
+	// subscribe
+	typedef std::function<void(const std::string &proc_id, const Topic &topic, const std::string &data)> TopicDataCB;
+	bool SubscribeStartWorker(const TopicDataCB &tdcb, int nworker = 2);
+	bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms);
+	bool RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms);
 
 private:
 	bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
@@ -106,14 +115,17 @@
 	// some sockets may be the same one, using functions make it easy to change.
 
 	auto &SockNode() { return sock_node_; }
+	auto &SockPub() { return SockNode(); }
 	auto &SockSub() { return sock_sub_; }
 	auto &SockRequest() { return sock_request_; }
+	auto &SockClient() { return SockRequest(); }
 	auto &SockReply() { return sock_reply_; }
+	auto &SockServer() { return SockReply(); }
 
 	ShmSocket sock_node_;
 	ShmSocket sock_request_;
 	ShmSocket sock_reply_;
-	SocketSubscribe sock_sub_;
+	ShmSocket sock_sub_;
 
 	TopicQueryCache topic_query_cache_;
 };

--
Gitblit v1.8.0