From bcd780993c176b93f7393607f8003adf66e6676a Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期一, 26 四月 2021 10:27:14 +0800 Subject: [PATCH] fix node default ignore msg. --- src/topic_node.h | 33 ++++++++++++++++++++++----------- 1 files changed, 22 insertions(+), 11 deletions(-) diff --git a/src/topic_node.h b/src/topic_node.h index 8287b4a..3c90e5b 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -22,6 +22,7 @@ #include "socket.h" #include <atomic> #include <memory> +#include <vector> using namespace bhome_shm; using namespace bhome_msg; @@ -40,8 +41,10 @@ // topic node bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); + bool Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); bool Heartbeat(const int timeout_ms); + bool QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms); // topic rpc server typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB; @@ -55,8 +58,8 @@ // topic client typedef std::function<void(const BHMsgHead &head, const MsgRequestTopicReply &reply)> RequestResultCB; bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2); - bool ClientAsyncRequest(const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB()); - bool ClientSyncRequest(const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms); + bool ClientAsyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB()); + bool ClientSyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms); // publish bool Publish(const MsgPublish &pub, const int timeout_ms); @@ -71,7 +74,10 @@ void Stop(); private: + MQId ssn() { return SockNode().id(); } bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms); + typedef MsgQueryTopicReply::BHNodeAddress NodeAddress; + int QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms); const std::string &proc_id() { return info_.proc_id(); } typedef BHAddress Address; @@ -107,17 +113,22 @@ }; // some sockets may be the same one, using functions make it easy to change. + enum { eSockStart, + eSockNode = eSockStart, + eSockPub = eSockNode, + eSockServer, + eSockClient, + eSockSub, + eSockEnd, + }; + std::vector<std::unique_ptr<ShmSocket>> sockets_; - ShmSocket &SockNode() { return sock_node_; } - ShmSocket &SockPub() { return SockNode(); } - ShmSocket &SockSub() { return sock_sub_; } - ShmSocket &SockClient() { return sock_client_; } - ShmSocket &SockServer() { return sock_server_; } + ShmSocket &SockNode() { return *sockets_[eSockNode]; } + ShmSocket &SockPub() { return *sockets_[eSockPub]; } + ShmSocket &SockSub() { return *sockets_[eSockSub]; } + ShmSocket &SockClient() { return *sockets_[eSockClient]; } + ShmSocket &SockServer() { return *sockets_[eSockServer]; } - ShmSocket sock_node_; - ShmSocket sock_client_; - ShmSocket sock_server_; - ShmSocket sock_sub_; enum State { eStateUnregistered, eStateOnline, -- Gitblit v1.8.0