lichao
2021-04-21 faa795164368ec444410374699548bb27b0d95ce
add node state, test change node mq length.
2个文件已修改
47 ■■■■■ 已修改文件
src/topic_node.cpp 33 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp
@@ -32,10 +32,12 @@
    std::string msg_id;
};
const int kMqLen = 700;
} // namespace
TopicNode::TopicNode(SharedMemory &shm) :
    shm_(shm), sock_node_(shm), sock_client_(shm), sock_server_(shm), sock_sub_(shm), registered_(false), registered_ever_(false)
    shm_(shm), sock_node_(shm), sock_client_(shm, kMqLen), sock_server_(shm, kMqLen), sock_sub_(shm, kMqLen), state_(eStateUnregistered)
{
    // recv msgs to avoid memory leak.
    auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
@@ -49,7 +51,7 @@
{
    Stop();
    SockNode().Stop();
    if (!registered_ever_) {
    if (state() == eStateUnregistered) {
        SockNode().Remove();
        SockClient().Remove();
        SockServer().Remove();
@@ -98,9 +100,10 @@
                  msg.ParseBody(rbody) &&
                  IsSuccess(rbody.errmsg().errcode());
        if (ok) {
            registered_ever_.store(true);
            state(eStateOnline);
        } else {
            state_cas(eStateOnline, eStateOffline);
        }
        registered_.store(ok);
    };
    if (timeout_ms == 0) {
@@ -117,13 +120,13 @@
        if (r) {
            CheckResult(reply, reply_head, reply_body);
        }
        return IsRegistered();
        return IsOnline();
    }
}
bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
    if (!IsRegistered()) {
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        return false;
    }
@@ -156,7 +159,7 @@
bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
    if (!IsRegistered()) {
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        return false;
    }
@@ -223,7 +226,7 @@
bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
{
    if (!IsRegistered()) {
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        return false;
    }
@@ -247,7 +250,7 @@
bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body)
{
    if (!IsRegistered()) {
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        return false;
    }
@@ -285,7 +288,7 @@
bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb)
{
    if (!IsRegistered()) {
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        return false;
    }
@@ -351,7 +354,7 @@
bool TopicNode::ClientSyncRequest(const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms)
{
    if (!IsRegistered()) {
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        return false;
    }
@@ -386,7 +389,7 @@
bool TopicNode::ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms)
{
    if (!IsRegistered()) {
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        return false;
    }
@@ -428,7 +431,7 @@
bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms)
{
    if (!IsRegistered()) {
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        return false;
    }
@@ -459,7 +462,7 @@
bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
    if (!IsRegistered()) {
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        return false;
    }
@@ -508,7 +511,7 @@
bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms)
{
    if (!IsRegistered()) {
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        return false;
    }
src/topic_node.h
@@ -20,6 +20,7 @@
#include "msg.h"
#include "socket.h"
#include <atomic>
#include <memory>
using namespace bhome_shm;
@@ -112,14 +113,21 @@
    ShmSocket &SockSub() { return sock_sub_; }
    ShmSocket &SockClient() { return sock_client_; }
    ShmSocket &SockServer() { return sock_server_; }
    bool IsRegistered() const { return registered_.load(); }
    ShmSocket sock_node_;
    ShmSocket sock_client_;
    ShmSocket sock_server_;
    ShmSocket sock_sub_;
    std::atomic<bool> registered_;
    std::atomic<bool> registered_ever_;
    enum State {
        eStateUnregistered,
        eStateOnline,
        eStateOffline // heartbeat fail.
    };
    void state(const State st) { state_.store(st); }
    void state_cas(State expected, const State val) { state_.compare_exchange_strong(expected, val); }
    State state() const { return state_.load(); }
    bool IsOnline() const { return state() == eStateOnline; }
    std::atomic<State> state_;
    TopicQueryCache topic_query_cache_;
};