| | |
// Topic names for node events. kTopicNode is built from kTopicCenterRoot and
// the segment "node"; every other constant below is built from kTopicNode plus
// one more segment.
// NOTE(review): Join is defined outside this view - presumably it concatenates
// the parts with a topic separator; confirm against its definition.
| | | const std::string kTopicNode = Join(kTopicCenterRoot, "node"); |
// Passed to Notify/Publish when a node's state flag changes to kStateNormal.
| | | const std::string kTopicNodeOnline = Join(kTopicNode, "online"); |
// Passed to Notify/Publish when a node's state flag changes to kStateOffline.
| | | const std::string kTopicNodeOffline = Join(kTopicNode, "offline"); |
// Passed to Notify after a node's service topic list has been logged.
| | | const std::string kTopicNodeService = Join(kTopicNode, "service"); |
// Passed to Notify when a node subscribes with msg.network() set.
| | | const std::string kTopicNodeSub = Join(kTopicNode, "subscribe"); |
// Passed to Notify when a node unsubscribes with msg.network() set.
| | | const std::string kTopicNodeUnsub = Join(kTopicNode, "unsubscribe"); |
| | | } // namespace |
| | | |
| | | ProcIndex ProcRecords::Put(const ProcId &proc_id, const MQId ssn) |
| | |
| | | { |
| | | state_.timestamp_ = NowSec() - offline_time; |
| | | state_.flag_ = kStateOffline; |
| | | |
| | | Json json; |
| | | json.put("proc_id", proc_.proc_id()); |
| | | center_.Publish(shm_, kTopicNodeOffline, json.dump()); |
| | | center_.Notify(kTopicNodeOffline, *this); |
| | | } |
| | | |
| | | void NodeCenter::Notify(const Topic &topic, NodeInfo &node) |
| | | { |
| | | if (node.proc_.proc_id().empty()) { return; } // node init, ignore. |
| | | Json json; |
| | | json.put("proc_id", node.proc_.proc_id()); |
| | | Publish(node.shm_, topic, json.dump()); |
| | | } |
| | | void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) |
| | | { |
| | | auto old = state_.flag_; |
| | | auto diff = now - state_.timestamp_; |
| | | auto publish = [this](const std::string &topic) { |
| | | if (proc_.proc_id().empty()) { return; } // node init, ignore. |
| | | Json json; |
| | | json.put("proc_id", proc_.proc_id()); |
| | | center_.Publish(shm_, topic, json.dump()); |
| | | }; |
| | | |
| | | LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff; |
| | | if (diff < offline_time) { |
| | | state_.flag_ = kStateNormal; |
| | | if (old != state_.flag_) { |
| | | publish(kTopicNodeOnline); |
| | | center_.Notify(kTopicNodeOnline, *this); |
| | | } |
| | | } else if (diff < kill_time) { |
| | | state_.flag_ = kStateOffline; |
| | | if (old != state_.flag_) { |
| | | publish(kTopicNodeOffline); |
| | | center_.Notify(kTopicNodeOffline, *this); |
| | | } |
| | | } else { |
| | | state_.flag_ = kStateKillme; |
| | |
| | | for (auto &topic : topics) { |
| | | LOG_DEBUG() << "\t" << topic; |
| | | } |
| | | Notify(kTopicNodeService, *node); |
| | | return MakeReply(eSuccess); |
| | | }); |
| | | } |
| | |
| | | LOG_DEBUG() << "subscribe net : " << msg.network(); |
| | | if (msg.network()) { |
| | | Sub(net_sub_, center_.net_sub_map_); |
| | | center_.Notify(kTopicNodeSub, *this); |
| | | } else { |
| | | Sub(local_sub_, center_.local_sub_map_); |
| | | } |
| | |
| | | }; |
| | | if (msg.network()) { |
| | | Unsub(net_sub_, center_.net_sub_map_); |
| | | center_.Notify(kTopicNodeUnsub, *this); |
| | | } else { |
| | | Unsub(local_sub_, center_.local_sub_map_); |
| | | } |