From faa795164368ec444410374699548bb27b0d95ce Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 21 四月 2021 13:26:58 +0800
Subject: [PATCH] add node state, test change node mq length.
---
src/topic_node.h | 14 +++++++++++---
src/topic_node.cpp | 33 ++++++++++++++++++---------------
2 files changed, 29 insertions(+), 18 deletions(-)
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index bbf88eb..70247dd 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -32,10 +32,12 @@
std::string msg_id;
};
+const int kMqLen = 700;
+
} // namespace
TopicNode::TopicNode(SharedMemory &shm) :
- shm_(shm), sock_node_(shm), sock_client_(shm), sock_server_(shm), sock_sub_(shm), registered_(false), registered_ever_(false)
+ shm_(shm), sock_node_(shm), sock_client_(shm, kMqLen), sock_server_(shm, kMqLen), sock_sub_(shm, kMqLen), state_(eStateUnregistered)
{
// recv msgs to avoid memory leak.
auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
@@ -49,7 +51,7 @@
{
Stop();
SockNode().Stop();
- if (!registered_ever_) {
+ if (state() == eStateUnregistered) {
SockNode().Remove();
SockClient().Remove();
SockServer().Remove();
@@ -98,9 +100,10 @@
msg.ParseBody(rbody) &&
IsSuccess(rbody.errmsg().errcode());
if (ok) {
- registered_ever_.store(true);
+ state(eStateOnline);
+ } else {
+ state_cas(eStateOnline, eStateOffline);
}
- registered_.store(ok);
};
if (timeout_ms == 0) {
@@ -117,13 +120,13 @@
if (r) {
CheckResult(reply, reply_head, reply_body);
}
- return IsRegistered();
+ return IsOnline();
}
}
bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
- if (!IsRegistered()) {
+ if (!IsOnline()) {
SetLastError(eNotRegistered, "Not Registered.");
return false;
}
@@ -156,7 +159,7 @@
bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
- if (!IsRegistered()) {
+ if (!IsOnline()) {
SetLastError(eNotRegistered, "Not Registered.");
return false;
}
@@ -223,7 +226,7 @@
bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
{
- if (!IsRegistered()) {
+ if (!IsOnline()) {
SetLastError(eNotRegistered, "Not Registered.");
return false;
}
@@ -247,7 +250,7 @@
bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body)
{
- if (!IsRegistered()) {
+ if (!IsOnline()) {
SetLastError(eNotRegistered, "Not Registered.");
return false;
}
@@ -285,7 +288,7 @@
bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb)
{
- if (!IsRegistered()) {
+ if (!IsOnline()) {
SetLastError(eNotRegistered, "Not Registered.");
return false;
}
@@ -351,7 +354,7 @@
bool TopicNode::ClientSyncRequest(const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms)
{
- if (!IsRegistered()) {
+ if (!IsOnline()) {
SetLastError(eNotRegistered, "Not Registered.");
return false;
}
@@ -386,7 +389,7 @@
bool TopicNode::ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms)
{
- if (!IsRegistered()) {
+ if (!IsOnline()) {
SetLastError(eNotRegistered, "Not Registered.");
return false;
}
@@ -428,7 +431,7 @@
bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms)
{
- if (!IsRegistered()) {
+ if (!IsOnline()) {
SetLastError(eNotRegistered, "Not Registered.");
return false;
}
@@ -459,7 +462,7 @@
bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
- if (!IsRegistered()) {
+ if (!IsOnline()) {
SetLastError(eNotRegistered, "Not Registered.");
return false;
}
@@ -508,7 +511,7 @@
bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms)
{
- if (!IsRegistered()) {
+ if (!IsOnline()) {
SetLastError(eNotRegistered, "Not Registered.");
return false;
}
diff --git a/src/topic_node.h b/src/topic_node.h
index 35cdde5..8287b4a 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -20,6 +20,7 @@
#include "msg.h"
#include "socket.h"
+#include <atomic>
#include <memory>
using namespace bhome_shm;
@@ -112,14 +113,21 @@
ShmSocket &SockSub() { return sock_sub_; }
ShmSocket &SockClient() { return sock_client_; }
ShmSocket &SockServer() { return sock_server_; }
- bool IsRegistered() const { return registered_.load(); }
ShmSocket sock_node_;
ShmSocket sock_client_;
ShmSocket sock_server_;
ShmSocket sock_sub_;
- std::atomic<bool> registered_;
- std::atomic<bool> registered_ever_;
+ enum State {
+ eStateUnregistered,
+ eStateOnline,
+ eStateOffline // heartbeat fail.
+ };
+ void state(const State st) { state_.store(st); }
+ void state_cas(State expected, const State val) { state_.compare_exchange_strong(expected, val); }
+ State state() const { return state_.load(); }
+ bool IsOnline() const { return state() == eStateOnline; }
+ std::atomic<State> state_;
TopicQueryCache topic_query_cache_;
};
--
Gitblit v1.8.0