From 7ecd6323ffedbfef92c87c02b2a8680dd53b772c Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期四, 06 五月 2021 19:37:50 +0800 Subject: [PATCH] rename atomic queue io function. --- src/topic_node.h | 10 ++++++++-- 1 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/topic_node.h b/src/topic_node.h index 20b27d2..afce4fc 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -22,6 +22,7 @@ #include "socket.h" #include <atomic> #include <memory> +#include <mutex> #include <vector> using namespace bhome_shm; @@ -44,7 +45,7 @@ bool Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); bool Heartbeat(const int timeout_ms); - bool QueryTopicAddress(MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms); + bool QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms); // topic rpc server typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB; @@ -74,6 +75,7 @@ void Stop(); private: + MQId ssn() { return SockNode().id(); } bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms); typedef MsgQueryTopicReply::BHNodeAddress NodeAddress; int QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms); @@ -136,7 +138,11 @@ void state(const State st) { state_.store(st); } void state_cas(State expected, const State val) { state_.compare_exchange_strong(expected, val); } State state() const { return state_.load(); } - bool IsOnline() const { return state() == eStateOnline; } + bool IsOnline() { return Init() && state() == eStateOnline; } + bool Init(); + bool Valid() const { return !sockets_.empty(); } + std::mutex mutex_; + MQId ssn_id_ = 0; std::atomic<State> state_; TopicQueryCache topic_query_cache_; -- Gitblit v1.8.0