| | |
| | | |
| | | try { |
| | | MQId ssn = head.ssn_id(); |
| | | // Use src_addr as the session id. |
| | | // When a node restarts, src_addr will change, |
| | | // when a node restarts, ssn will change, |
| | | // and the old node will be removed after a timeout. |
| | | auto UpdateRegInfo = [&](Node &node) { |
| | | node->addrs_.insert(SrcAddr(head)); |
| | |
| | | UpdateRegInfo(node); |
| | | nodes_[ssn] = node; |
| | | |
| | | auto old = node_addr_map_.find(head.proc_id()); |
| | | if (old != node_addr_map_.end()) { // old session |
| | | printf("new ssn %ld\n", ssn); |
| | | auto old = online_node_addr_map_.find(head.proc_id()); |
| | | if (old != online_node_addr_map_.end()) { // old session |
| | | auto &old_ssn = old->second; |
| | | nodes_[old_ssn]->state_.PutOffline(offline_time_); |
| | | printf("put %s %ld offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second); |
| | | old_ssn = ssn; |
| | | } else { |
| | | node_addr_map_.emplace(head.proc_id(), ssn); |
| | | online_node_addr_map_.emplace(head.proc_id(), ssn); |
| | | } |
| | | } |
| | | return MakeReply(eSuccess); |
| | |
| | | }; |
| | | EraseMapRec(service_map_, node->services_); |
| | | EraseMapRec(subscribe_map_, node->subscriptions_); |
| | | node_addr_map_.erase(node->proc_.proc_id()); |
| | | |
| | | for (auto &addr : node->addrs_) { |
| | | cleaner_(addr); |
| | |
| | | std::unordered_map<Topic, Clients> service_map_; |
| | | std::unordered_map<Topic, Clients> subscribe_map_; |
| | | std::unordered_map<Address, Node> nodes_; |
| | | std::unordered_map<std::string, Address> node_addr_map_; |
| | | std::unordered_map<std::string, Address> online_node_addr_map_; |
| | | Cleaner cleaner_; // remove mqs. |
| | | int64_t offline_time_; |
| | | int64_t kill_time_; |