lichao
2021-04-13 7f307880a58012077833061b5ff18ba63c1a2269
box/center.cpp
@@ -33,10 +33,6 @@
namespace
{
typedef steady_clock::time_point TimePoint;
typedef steady_clock::duration Duration;
inline TimePoint Now() { return steady_clock::now(); };
inline int64_t Seconds(const Duration &d) { return duration_cast<seconds>(d).count(); };
//TODO check proc_id
class NodeCenter
@@ -56,13 +52,13 @@
   };
   struct ProcState {
      TimePoint timestamp_;
      int64_t timestamp_;
      uint32_t flag_ = 0; // reserved
      void UpdateState(TimePoint now, const Duration &offline_time, const Duration &kill_time)
      void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
      {
         auto diff = now - timestamp_;
#ifndef NDEBUG
         printf("diff: %ld\n", Seconds(diff));
         printf("state %p diff: %ld\n", this, diff);
#endif
         if (diff < offline_time) {
            flag_ = kStateNormal;
@@ -93,12 +89,19 @@
   inline const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
   inline bool MatchAddr(std::set<Address> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); }
   NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time, const int64_t kill_time) :
       id_(id), cleaner_(cleaner), offline_time_(offline_time), kill_time_(kill_time), last_check_time_(0) {}
public:
   typedef std::set<TopicDest> Clients;
   NodeCenter(const std::string &id, const Cleaner &cleaner, const Duration &offline_time, const Duration &kill_time) :
       id_(id), cleaner_(cleaner), offline_time_(offline_time), kill_time_(kill_time), last_check_time_(Now()) {}
   const std::string &id() const { return id_; } // no need to lock.
   NodeCenter(const std::string &id, const Cleaner &cleaner, const steady_clock::duration offline_time, const steady_clock::duration kill_time) :
       NodeCenter(id, cleaner, duration_cast<seconds>(offline_time).count(), duration_cast<seconds>(kill_time).count()) {}
   const std::string &id() const
   {
      return id_;
   } // no need to lock.
   //TODO maybe just return serialized string.
   MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
@@ -114,8 +117,8 @@
            node->addrs_.insert(addr.mq_id());
         }
         node->proc_.Swap(msg.mutable_proc());
         node->state_.timestamp_ = Now();
         node->state_.flag_ = kStateNormal;
         node->state_.timestamp_ = head.timestamp();
         node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
         nodes_[node->proc_.proc_id()] = node;
         return MakeReply(eSuccess);
      } catch (...) {
@@ -172,9 +175,8 @@
   {
      return HandleMsg(head, [&](Node node) {
         NodeInfo &ni = *node;
         auto now = Now();
         ni.state_.timestamp_ = now;
         ni.state_.flag_ = kStateNormal;
         ni.state_.timestamp_ = head.timestamp();
         ni.state_.UpdateState(NowSec(), offline_time_, kill_time_);
         auto &info = msg.proc();
         if (!info.public_info().empty()) {
@@ -307,9 +309,8 @@
private:
   void CheckNodes()
   {
      auto now = Now();
      if (Seconds(now - last_check_time_) < 1) { return; }
      auto now = NowSec();
      if (now - last_check_time_ < 1) { return; }
      last_check_time_ = now;
      auto it = nodes_.begin();
@@ -348,9 +349,9 @@
   std::unordered_map<Topic, Clients> subscribe_map_;
   std::unordered_map<ProcId, Node> nodes_;
   Cleaner cleaner_; // remove mqs.
   Duration offline_time_;
   Duration kill_time_;
   TimePoint last_check_time_;
   int64_t offline_time_;
   int64_t kill_time_;
   int64_t last_check_time_;
};
template <class Body, class OnMsg, class Replyer>
@@ -383,7 +384,7 @@
bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
{
   auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 3);
   auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 5s, 10s);
   auto center_failed_q = std::make_shared<FailedMsgQ>();
   auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) {
      return [&](auto &&rep_body) {