From 7ecd6323ffedbfef92c87c02b2a8680dd53b772c Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 06 五月 2021 19:37:50 +0800
Subject: [PATCH] rename atomic queue io function.

---
 src/topic_node.h |   10 ++++++++--
 1 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/src/topic_node.h b/src/topic_node.h
index 20b27d2..afce4fc 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -22,6 +22,7 @@
 #include "socket.h"
 #include <atomic>
 #include <memory>
+#include <mutex>
 #include <vector>
 
 using namespace bhome_shm;
@@ -44,7 +45,7 @@
 	bool Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
 	bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
 	bool Heartbeat(const int timeout_ms);
-	bool QueryTopicAddress(MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms);
+	bool QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms);
 
 	// topic rpc server
 	typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB;
@@ -74,6 +75,7 @@
 	void Stop();
 
 private:
+	MQId ssn() { return SockNode().id(); }
 	bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms);
 	typedef MsgQueryTopicReply::BHNodeAddress NodeAddress;
 	int QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms);
@@ -136,7 +138,11 @@
 	void state(const State st) { state_.store(st); }
 	void state_cas(State expected, const State val) { state_.compare_exchange_strong(expected, val); }
 	State state() const { return state_.load(); }
-	bool IsOnline() const { return state() == eStateOnline; }
+	bool IsOnline() { return Init() && state() == eStateOnline; }
+	bool Init();
+	bool Valid() const { return !sockets_.empty(); }
+	std::mutex mutex_;
+	MQId ssn_id_ = 0;
 	std::atomic<State> state_;
 
 	TopicQueryCache topic_query_cache_;

--
Gitblit v1.8.0