lichao
2021-04-26 150e192a7ddf19783a0f4a81b87d270c0812277a
box/center.cpp
@@ -112,8 +112,7 @@
      try {
         MQId ssn = head.ssn_id();
         // use src_addr as session id.
         // when node restart, src_addr will change,
         // when node restart, ssn will change,
         // and old node will be removed after timeout.
         auto UpdateRegInfo = [&](Node &node) {
            node->addrs_.insert(SrcAddr(head));
@@ -134,14 +133,15 @@
            UpdateRegInfo(node);
            nodes_[ssn] = node;
            auto old = node_addr_map_.find(head.proc_id());
            if (old != node_addr_map_.end()) { // old session
            printf("new ssn %ld\n", ssn);
            auto old = online_node_addr_map_.find(head.proc_id());
            if (old != online_node_addr_map_.end()) { // old session
               auto &old_ssn = old->second;
               nodes_[old_ssn]->state_.PutOffline(offline_time_);
               printf("put %s %ld offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second);
               old_ssn = ssn;
            } else {
               node_addr_map_.emplace(head.proc_id(), ssn);
               online_node_addr_map_.emplace(head.proc_id(), ssn);
            }
         }
         return MakeReply(eSuccess);
@@ -387,7 +387,6 @@
      };
      EraseMapRec(service_map_, node->services_);
      EraseMapRec(subscribe_map_, node->subscriptions_);
      node_addr_map_.erase(node->proc_.proc_id());
      for (auto &addr : node->addrs_) {
         cleaner_(addr);
@@ -399,7 +398,7 @@
   std::unordered_map<Topic, Clients> service_map_;
   std::unordered_map<Topic, Clients> subscribe_map_;
   std::unordered_map<Address, Node> nodes_;
   std::unordered_map<std::string, Address> node_addr_map_;
   std::unordered_map<std::string, Address> online_node_addr_map_;
   Cleaner cleaner_; // remove mqs.
   int64_t offline_time_;
   int64_t kill_time_;