From 628c1c21ffb19d8c96ed9ce89531595f9870ab1a Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 23 四月 2021 18:41:02 +0800 Subject: [PATCH] add msg tag; recv all msgs before remove mq. --- src/topic_node.h | 42 ++++++++++++++++++++++++++++-------------- 1 files changed, 28 insertions(+), 14 deletions(-) diff --git a/src/topic_node.h b/src/topic_node.h index 35cdde5..3371c35 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -20,7 +20,9 @@ #include "msg.h" #include "socket.h" +#include <atomic> #include <memory> +#include <vector> using namespace bhome_shm; using namespace bhome_msg; @@ -54,8 +56,8 @@ // topic client typedef std::function<void(const BHMsgHead &head, const MsgRequestTopicReply &reply)> RequestResultCB; bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2); - bool ClientAsyncRequest(const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB()); - bool ClientSyncRequest(const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms); + bool ClientAsyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB()); + bool ClientSyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms); // publish bool Publish(const MsgPublish &pub, const int timeout_ms); @@ -106,20 +108,32 @@ }; // some sockets may be the same one, using functions make it easy to change. + enum { eSockStart, + eSockNode = eSockStart, + eSockPub = eSockNode, + eSockServer, + eSockClient, + eSockSub, + eSockEnd, + }; + std::vector<std::unique_ptr<ShmSocket>> sockets_; - ShmSocket &SockNode() { return sock_node_; } - ShmSocket &SockPub() { return SockNode(); } - ShmSocket &SockSub() { return sock_sub_; } - ShmSocket &SockClient() { return sock_client_; } - ShmSocket &SockServer() { return sock_server_; } - bool IsRegistered() const { return registered_.load(); } + ShmSocket &SockNode() { return *sockets_[eSockNode]; } + ShmSocket &SockPub() { return *sockets_[eSockPub]; } + ShmSocket &SockSub() { return *sockets_[eSockSub]; } + ShmSocket &SockClient() { return *sockets_[eSockClient]; } + ShmSocket &SockServer() { return *sockets_[eSockServer]; } - ShmSocket sock_node_; - ShmSocket sock_client_; - ShmSocket sock_server_; - ShmSocket sock_sub_; - std::atomic<bool> registered_; - std::atomic<bool> registered_ever_; + enum State { + eStateUnregistered, + eStateOnline, + eStateOffline // heartbeat fail. + }; + void state(const State st) { state_.store(st); } + void state_cas(State expected, const State val) { state_.compare_exchange_strong(expected, val); } + State state() const { return state_.load(); } + bool IsOnline() const { return state() == eStateOnline; } + std::atomic<State> state_; TopicQueryCache topic_query_cache_; }; -- Gitblit v1.8.0