lichao
2021-04-13 65ef4d68321e56906920be75831b5e968f7abd7b
src/center.cpp
@@ -34,7 +34,9 @@
namespace
{
typedef steady_clock::time_point TimePoint;
typedef steady_clock::duration Duration;
inline TimePoint Now() { return steady_clock::now(); };
inline int64_t Seconds(const Duration &d) { return duration_cast<seconds>(d).count(); };
//TODO check proc_id
class NodeCenter
@@ -56,15 +58,15 @@
   struct ProcState {
      TimePoint timestamp_;
      uint32_t flag_ = 0; // reserved
      void UpdateState(TimePoint now)
      void UpdateState(TimePoint now, const Duration &offline_time, const Duration &kill_time)
      {
         const auto kOfflineTime = 60 * 10s;
         const auto kKillTime = 60 * 20s;
         auto diff = now - timestamp_;
         if (diff < kOfflineTime) {
#ifndef NDEBUG
         printf("diff: %ld\n", Seconds(diff));
#endif
         if (diff < offline_time) {
            flag_ = kStateNormal;
         } else if (diff < kKillTime) {
         } else if (diff < kill_time) {
            flag_ = kStateOffline;
         } else {
            flag_ = kStateKillme;
@@ -94,8 +96,8 @@
public:
   typedef std::set<TopicDest> Clients;
   NodeCenter(const std::string &id, const Cleaner &cleaner) :
       id_(id), cleaner_(cleaner) {}
   NodeCenter(const std::string &id, const Cleaner &cleaner, const Duration &offline_time, const Duration &kill_time) :
       id_(id), cleaner_(cleaner), offline_time_(offline_time), kill_time_(kill_time), last_check_time_(Now()) {}
   const std::string &id() const { return id_; } // no need to lock.
   //TODO maybe just return serialized string.
@@ -132,6 +134,8 @@
            auto node = pos->second;
            if (!MatchAddr(node->addrs_, SrcAddr(head))) {
               return MakeReply<Reply>(eAddressNotMatch, "Node address error.");
            } else if (head.type() == kMsgTypeHeartbeat && CanHeartbeat(*node)) {
               return op(node);
            } else if (!Valid(*node)) {
               return MakeReply<Reply>(eNoRespond, "Node is not alive.");
            } else {
@@ -168,7 +172,9 @@
   {
      return HandleMsg(head, [&](Node node) {
         NodeInfo &ni = *node;
         ni.state_.timestamp_ = Now();
         auto now = Now();
         ni.state_.timestamp_ = now;
         ni.state_.flag_ = kStateNormal;
         auto &info = msg.proc();
         if (!info.public_info().empty()) {
@@ -301,10 +307,15 @@
private:
   void CheckNodes()
   {
      auto now = Now();
      if (Seconds(now - last_check_time_) < 1) { return; }
      last_check_time_ = now;
      auto it = nodes_.begin();
      while (it != nodes_.end()) {
         auto &cli = *it->second;
         cli.state_.UpdateState(Now());
         cli.state_.UpdateState(now, offline_time_, kill_time_);
         if (cli.state_.flag_ == kStateKillme) {
            if (cleaner_) {
               for (auto &addr : cli.addrs_) {
@@ -316,6 +327,10 @@
            ++it;
         }
      }
   }
   bool CanHeartbeat(const NodeInfo &node)
   {
      return Valid(node) || node.state_.flag_ == kStateOffline;
   }
   bool Valid(const NodeInfo &node)
   {
@@ -333,6 +348,9 @@
   std::unordered_map<Topic, Clients> subscribe_map_;
   std::unordered_map<ProcId, Node> nodes_;
   Cleaner cleaner_; // remove mqs.
   Duration offline_time_;
   Duration kill_time_;
   TimePoint last_check_time_;
};
template <class Body, class OnMsg, class Replyer>
@@ -365,7 +383,7 @@
bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
{
   auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner);
   auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 3);
   auto center_failed_q = std::make_shared<FailedMsgQ>();
   auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) {
      return [&](auto &&rep_body) {