lichao
2021-06-29 6c07fe29a5185835f28059f627a1d30e462da28b
add notify node change.
2个文件已修改
29 ■■■■■ 已修改文件
box/node_center.cpp 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/node_center.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/node_center.cpp
@@ -31,6 +31,9 @@
const std::string kTopicNode = Join(kTopicCenterRoot, "node");
const std::string kTopicNodeOnline = Join(kTopicNode, "online");
const std::string kTopicNodeOffline = Join(kTopicNode, "offline");
const std::string kTopicNodeService = Join(kTopicNode, "service");
const std::string kTopicNodeSub = Join(kTopicNode, "subscribe");
const std::string kTopicNodeUnsub = Join(kTopicNode, "unsubscribe");
} // namespace
ProcIndex ProcRecords::Put(const ProcId &proc_id, const MQId ssn)
@@ -114,33 +117,31 @@
{
    state_.timestamp_ = NowSec() - offline_time;
    state_.flag_ = kStateOffline;
    Json json;
    json.put("proc_id", proc_.proc_id());
    center_.Publish(shm_, kTopicNodeOffline, json.dump());
    center_.Notify(kTopicNodeOffline, *this);
}
void NodeCenter::Notify(const Topic &topic, NodeInfo &node)
{
    if (node.proc_.proc_id().empty()) { return; } // node init, ignore.
    Json json;
    json.put("proc_id", node.proc_.proc_id());
    Publish(node.shm_, topic, json.dump());
}
void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
{
    auto old = state_.flag_;
    auto diff = now - state_.timestamp_;
    auto publish = [this](const std::string &topic) {
        if (proc_.proc_id().empty()) { return; } // node init, ignore.
        Json json;
        json.put("proc_id", proc_.proc_id());
        center_.Publish(shm_, topic, json.dump());
    };
    LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff;
    if (diff < offline_time) {
        state_.flag_ = kStateNormal;
        if (old != state_.flag_) {
            publish(kTopicNodeOnline);
            center_.Notify(kTopicNodeOnline, *this);
        }
    } else if (diff < kill_time) {
        state_.flag_ = kStateOffline;
        if (old != state_.flag_) {
            publish(kTopicNodeOffline);
            center_.Notify(kTopicNodeOffline, *this);
        }
    } else {
        state_.flag_ = kStateKillme;
@@ -476,6 +477,7 @@
            for (auto &topic : topics) {
                LOG_DEBUG() << "\t" << topic;
            }
            Notify(kTopicNodeService, *node);
            return MakeReply(eSuccess);
        });
}
@@ -588,6 +590,7 @@
    LOG_DEBUG() << "subscribe net : " << msg.network();
    if (msg.network()) {
        Sub(net_sub_, center_.net_sub_map_);
        center_.Notify(kTopicNodeSub, *this);
    } else {
        Sub(local_sub_, center_.local_sub_map_);
    }
@@ -632,6 +635,7 @@
    };
    if (msg.network()) {
        Unsub(net_sub_, center_.net_sub_map_);
        center_.Notify(kTopicNodeUnsub, *this);
    } else {
        Unsub(local_sub_, center_.local_sub_map_);
    }
box/node_center.h
@@ -194,6 +194,7 @@
    void CheckNodes();
    bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; }
    void Publish(SharedMemory &shm, const Topic &topic, const std::string &content);
    void Notify(const Topic &topic, NodeInfo &node);
    void DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg);
    Clients DoFindClients(const std::string &topic, bool from_remote);
    bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; }