lichao
2021-06-29 6c07fe29a5185835f28059f627a1d30e462da28b
box/node_center.cpp
@@ -31,6 +31,9 @@
const std::string kTopicNode = Join(kTopicCenterRoot, "node");
const std::string kTopicNodeOnline = Join(kTopicNode, "online");
const std::string kTopicNodeOffline = Join(kTopicNode, "offline");
const std::string kTopicNodeService = Join(kTopicNode, "service");
const std::string kTopicNodeSub = Join(kTopicNode, "subscribe");
const std::string kTopicNodeUnsub = Join(kTopicNode, "unsubscribe");
} // namespace
ProcIndex ProcRecords::Put(const ProcId &proc_id, const MQId ssn)
@@ -114,33 +117,31 @@
{
   state_.timestamp_ = NowSec() - offline_time;
   state_.flag_ = kStateOffline;
   Json json;
   json.put("proc_id", proc_.proc_id());
   center_.Publish(shm_, kTopicNodeOffline, json.dump());
   center_.Notify(kTopicNodeOffline, *this);
}
void NodeCenter::Notify(const Topic &topic, NodeInfo &node)
{
   if (node.proc_.proc_id().empty()) { return; } // node init, ignore.
   Json json;
   json.put("proc_id", node.proc_.proc_id());
   Publish(node.shm_, topic, json.dump());
}
void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
{
   auto old = state_.flag_;
   auto diff = now - state_.timestamp_;
   auto publish = [this](const std::string &topic) {
      if (proc_.proc_id().empty()) { return; } // node init, ignore.
      Json json;
      json.put("proc_id", proc_.proc_id());
      center_.Publish(shm_, topic, json.dump());
   };
   LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff;
   if (diff < offline_time) {
      state_.flag_ = kStateNormal;
      if (old != state_.flag_) {
         publish(kTopicNodeOnline);
         center_.Notify(kTopicNodeOnline, *this);
      }
   } else if (diff < kill_time) {
      state_.flag_ = kStateOffline;
      if (old != state_.flag_) {
         publish(kTopicNodeOffline);
         center_.Notify(kTopicNodeOffline, *this);
      }
   } else {
      state_.flag_ = kStateKillme;
@@ -476,6 +477,7 @@
          for (auto &topic : topics) {
             LOG_DEBUG() << "\t" << topic;
          }
          Notify(kTopicNodeService, *node);
          return MakeReply(eSuccess);
       });
}
@@ -588,6 +590,7 @@
   LOG_DEBUG() << "subscribe net : " << msg.network();
   if (msg.network()) {
      Sub(net_sub_, center_.net_sub_map_);
      center_.Notify(kTopicNodeSub, *this);
   } else {
      Sub(local_sub_, center_.local_sub_map_);
   }
@@ -632,6 +635,7 @@
   };
   if (msg.network()) {
      Unsub(net_sub_, center_.net_sub_map_);
      center_.Notify(kTopicNodeUnsub, *this);
   } else {
      Unsub(local_sub_, center_.local_sub_map_);
   }