From 2197cf91e7a3bd5941327ba630a42946b88f069e Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 09 四月 2021 14:15:41 +0800 Subject: [PATCH] join pub/sub to node; refactor. --- src/topic_node.h | 22 +++++++++++++++++----- 1 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/topic_node.h b/src/topic_node.h index 8852af1..34fe2ee 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -19,7 +19,6 @@ #define TOPIC_NODE_YVKWA6TF #include "msg.h" -#include "pubsub.h" #include "socket.h" #include <memory> @@ -32,23 +31,26 @@ SharedMemory &shm_; MsgRegister info_; + SharedMemory &shm() { return shm_; } + public: TopicNode(SharedMemory &shm); ~TopicNode(); + + void StopAll(); + // topic node bool Register(const MsgRegister &body, MsgCommonReply &reply, const int timeout_ms); bool RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply, const int timeout_ms); // topic rpc server typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest; bool ServerStart(OnRequest const &cb, const int nworker = 2); - bool ServerStop(); bool ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms); bool ServerSendReply(void *src_info, const std::string &data, const int timeout_ms); // topic client typedef std::function<void(const std::string &data)> RequestResultCB; bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2); - bool ClientStopWorker(); bool ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB()); bool ClientAsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB()) { @@ -60,7 +62,14 @@ return ClientSyncRequest(topic, data.data(), data.size(), out, timeout_ms); } - void StopAll(); + // publish + bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms); + + // subscribe + typedef std::function<void(const std::string &proc_id, const Topic &topic, const std::string &data)> TopicDataCB; + bool SubscribeStartWorker(const TopicDataCB &tdcb, int nworker = 2); + bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms); + bool RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms); private: bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms); @@ -106,14 +115,17 @@ // some sockets may be the same one, using functions make it easy to change. auto &SockNode() { return sock_node_; } + auto &SockPub() { return SockNode(); } auto &SockSub() { return sock_sub_; } auto &SockRequest() { return sock_request_; } + auto &SockClient() { return SockRequest(); } auto &SockReply() { return sock_reply_; } + auto &SockServer() { return SockReply(); } ShmSocket sock_node_; ShmSocket sock_request_; ShmSocket sock_reply_; - SocketSubscribe sock_sub_; + ShmSocket sock_sub_; TopicQueryCache topic_query_cache_; }; -- Gitblit v1.8.0