lichao
2021-05-20 68c7bef33e74f23aa0136ccd6f7faa654d671ebc
box/node_center.cpp
@@ -16,7 +16,22 @@
 * =====================================================================================
 */
#include "node_center.h"
#include "json.h"
#include "log.h"
using ssjson::Json;
namespace
{
std::string Join(const std::string &parent, const std::string &child)
{
   return parent + kTopicSep + child;
}
const std::string kTopicCenterRoot = "#center";
const std::string kTopicNode = Join(kTopicCenterRoot, "node");
const std::string kTopicNodeOnline = Join(kTopicNode, "online");
const std::string kTopicNodeOffline = Join(kTopicNode, "offline");
} // namespace
ProcIndex ProcRecords::Put(const ProcId &proc_id, const MQId ssn)
{
@@ -82,7 +97,7 @@
void MsgRecords::DebugPrint() const
{
   LOG_DEBUG() << "msgs : " << size();
   LOG_TRACE() << "msgs : " << size();
   int i = 0;
   int total_count = 0;
   for (auto &kv : msgs_) {
@@ -90,26 +105,44 @@
      total_count += msg.Count();
      LOG_TRACE() << "  " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size();
   }
   LOG_DEBUG() << "total count: " << total_count;
   LOG_TRACE() << "total count: " << total_count;
}
// NodeCenter::ProcState
void NodeCenter::ProcState::PutOffline(const int64_t offline_time)
void NodeCenter::NodeInfo::PutOffline(const int64_t offline_time)
{
   timestamp_ = NowSec() - offline_time;
   flag_ = kStateOffline;
   state_.timestamp_ = NowSec() - offline_time;
   state_.flag_ = kStateOffline;
   Json json;
   json.put("proc_id", proc_.proc_id());
   center_.Publish(kTopicNodeOffline, json.dump());
}
void NodeCenter::ProcState::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
{
   auto diff = now - timestamp_;
   LOG_DEBUG() << "state " << this << " diff: " << diff;
   auto old = state_.flag_;
   auto diff = now - state_.timestamp_;
   auto publish = [this](const std::string &topic) {
      if (proc_.proc_id().empty()) { return; } // node init, ignore.
      Json json;
      json.put("proc_id", proc_.proc_id());
      center_.Publish(topic, json.dump());
   };
   LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff;
   if (diff < offline_time) {
      flag_ = kStateNormal;
      state_.flag_ = kStateNormal;
      if (old != state_.flag_) {
         publish(kTopicNodeOnline);
      }
   } else if (diff < kill_time) {
      flag_ = kStateOffline;
      state_.flag_ = kStateOffline;
      if (old != state_.flag_) {
         publish(kTopicNodeOffline);
      }
   } else {
      flag_ = kStateKillme;
      state_.flag_ = kStateKillme;
   }
}
@@ -126,7 +159,7 @@
   auto UpdateRegInfo = [&](Node &node) {
      node->state_.timestamp_ = NowSec() - offline_time_;
      node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
      node->UpdateState(NowSec(), offline_time_, kill_time_);
      // create sockets.
      try {
@@ -149,7 +182,7 @@
             SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg);
   };
   Node node(new NodeInfo);
   Node node(new NodeInfo(*this));
   if (UpdateRegInfo(node) && PrepareProcInit(node)) {
      reply |= (node->addrs_[ssn] << 4);
      nodes_[ssn] = node;
@@ -271,33 +304,33 @@
      MQId ssn = head.ssn_id();
      // when node restart, ssn will change,
      // and old node will be removed after timeout.
      auto UpdateRegInfo = [&](Node &node) {
         node->proc_.Swap(msg.mutable_proc());
         node->state_.timestamp_ = head.timestamp();
         node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
      };
      auto pos = nodes_.find(ssn);
      if (pos == nodes_.end()) {
         return MakeReply(eInvalidInput, "invalid session.");
      }
      // update proc info
      Node &node = pos->second;
      UpdateRegInfo(node);
      LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")";
      // try to remove old session
      auto old = online_node_addr_map_.find(head.proc_id());
      if (old != online_node_addr_map_.end()) { // old session
         auto &old_ssn = old->second;
         if (old_ssn != ssn) {
            nodes_[old_ssn]->state_.PutOffline(offline_time_);
            nodes_[old_ssn]->PutOffline(offline_time_);
            LOG_DEBUG() << "put node (" << nodes_[old_ssn]->proc_.proc_id() << ") ssn (" << old->second << ") offline";
            old_ssn = ssn;
         }
      } else {
         online_node_addr_map_.emplace(head.proc_id(), ssn);
      }
      // update proc info
      Node &node = pos->second;
      node->proc_.Swap(msg.mutable_proc());
      node->state_.timestamp_ = head.timestamp();
      node->UpdateState(NowSec(), offline_time_, kill_time_);
      LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")";
      return MakeReply(eSuccess);
   } catch (...) {
      return MakeReply(eError, "register node error.");
@@ -309,7 +342,7 @@
   return HandleMsg(
       head, [&](Node node) -> MsgCommonReply {
          NodeInfo &ni = *node;
          ni.state_.PutOffline(offline_time_);
          ni.PutOffline(offline_time_);
          return MakeReply(eSuccess);
       });
}
@@ -338,7 +371,7 @@
   return HandleMsg(head, [&](Node node) {
      NodeInfo &ni = *node;
      ni.state_.timestamp_ = head.timestamp();
      ni.state_.UpdateState(NowSec(), offline_time_, kill_time_);
      ni.UpdateState(NowSec(), offline_time_, kill_time_);
      auto &info = msg.proc();
      if (!info.public_info().empty()) {
@@ -469,8 +502,8 @@
NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic)
{
   Clients dests;
   auto Find1 = [&](const std::string &t) {
      auto pos = subscribe_map_.find(topic);
   auto Find1 = [&](const std::string &exact) {
      auto pos = subscribe_map_.find(exact);
      if (pos != subscribe_map_.end()) {
         auto &clients = pos->second;
         for (auto &cli : clients) {
@@ -489,7 +522,7 @@
         // Find1(std::string()); // sub all.
         break;
      } else {
         Find1(topic.substr(0, pos));
         Find1(topic.substr(0, pos - 1));
      }
   }
   return dests;
@@ -521,7 +554,7 @@
   auto it = nodes_.begin();
   while (it != nodes_.end()) {
      auto &cli = *it->second;
      cli.state_.UpdateState(now, offline_time_, kill_time_);
      cli.UpdateState(now, offline_time_, kill_time_);
      if (cli.state_.flag_ == kStateKillme) {
         RemoveNode(it->second);
         it = nodes_.erase(it);
@@ -565,3 +598,35 @@
   node->addrs_.clear();
}
void NodeCenter::Publish(const Topic &topic, const std::string &content)
{
   try {
      // LOG_DEBUG() << "center publish: " << topic << ": " << content;
      Clients clients(DoFindClients(topic));
      if (clients.empty()) { return; }
      MsgPublish pub;
      pub.set_topic(topic);
      pub.set_data(content);
      BHMsgHead head(InitMsgHead(GetType(pub), id(), 0));
      MsgI msg;
      if (msg.Make(head, pub)) {
         DEFER1(msg.Release());
         RecordMsg(msg);
         auto &mq = GetCenterInfo(BHomeShm())->mq_sender_;
         ShmSocket sender(mq.offset_, BHomeShm(), mq.id_);
         for (auto &cli : clients) {
            auto node = cli.weak_node_.lock();
            if (node && node->state_.flag_ == kStateNormal) {
               sender.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
            }
         }
      }
   } catch (...) {
      LOG_ERROR() << "center publish error.";
   }
}