lichao
2021-04-25 5b6ced44157b6e7fab519ae48f5cffcdc2b3cd7c
box/center.cpp
@@ -52,6 +52,11 @@
   struct ProcState {
      int64_t timestamp_ = 0;
      uint32_t flag_ = 0; // reserved
      void PutOffline(const int64_t offline_time)
      {
         timestamp_ = NowSec() - offline_time;
         flag_ = kStateOffline;
      }
      void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
      {
         auto diff = now - timestamp_;
@@ -106,6 +111,10 @@
      }
      try {
         MQId ssn = head.ssn_id();
         // use src_addr as session id.
         // when node restart, src_addr will change,
         // and old node will be removed after timeout.
         auto UpdateRegInfo = [&](Node &node) {
            node->addrs_.insert(SrcAddr(head));
            for (auto &addr : msg.addrs()) {
@@ -116,19 +125,24 @@
            node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
         };
         auto pos = nodes_.find(head.proc_id());
         if (pos != nodes_.end()) { // new client
         auto pos = nodes_.find(ssn);
         if (pos != nodes_.end()) { // update
            Node &node = pos->second;
            if (node->addrs_.find(SrcAddr(head)) == node->addrs_.end()) {
               // node restarted, release old mq.
               RemoveNode(node);
               node.reset(new NodeInfo);
            }
            UpdateRegInfo(node);
         } else {
            Node node(new NodeInfo);
            UpdateRegInfo(node);
            nodes_[node->proc_.proc_id()] = node;
            nodes_[ssn] = node;
            auto old = node_addr_map_.find(head.proc_id());
            if (old != node_addr_map_.end()) { // old session
               auto &old_ssn = old->second;
               nodes_[old_ssn]->state_.PutOffline(offline_time_);
               printf("put %s %ld offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second);
               old_ssn = ssn;
            } else {
               node_addr_map_.emplace(head.proc_id(), ssn);
            }
         }
         return MakeReply(eSuccess);
      } catch (...) {
@@ -140,7 +154,7 @@
   Reply HandleMsg(const BHMsgHead &head, Func const &op)
   {
      try {
         auto pos = nodes_.find(head.proc_id());
         auto pos = nodes_.find(head.ssn_id());
         if (pos == nodes_.end()) {
            return MakeReply<Reply>(eNotRegistered, "Node is not registered.");
         } else {
@@ -171,9 +185,7 @@
      return HandleMsg(
          head, [&](Node node) -> MsgCommonReply {
             NodeInfo &ni = *node;
             auto now = NowSec(); // just set to offline.
             ni.state_.timestamp_ = now - offline_time_;
             ni.state_.UpdateState(now, offline_time_, kill_time_);
             ni.state_.PutOffline(offline_time_);
             return MakeReply(eSuccess);
          });
   }
@@ -375,6 +387,7 @@
      };
      EraseMapRec(service_map_, node->services_);
      EraseMapRec(subscribe_map_, node->subscriptions_);
      node_addr_map_.erase(node->proc_.proc_id());
      for (auto &addr : node->addrs_) {
         cleaner_(addr);
@@ -385,7 +398,8 @@
   std::unordered_map<Topic, Clients> service_map_;
   std::unordered_map<Topic, Clients> subscribe_map_;
   std::unordered_map<ProcId, Node> nodes_;
   std::unordered_map<Address, Node> nodes_;
   std::unordered_map<std::string, Address> node_addr_map_;
   Cleaner cleaner_; // remove mqs.
   int64_t offline_time_;
   int64_t kill_time_;
@@ -425,7 +439,7 @@
   auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 2);
   auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
      return [&](auto &&rep_body) {
         auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
         auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id()));
         auto remote = head.route(0).mq_id();
         socket.Send(remote, reply_head, rep_body);
      };