| | |
| | | struct ProcState { |
| | | int64_t timestamp_ = 0; |
| | | uint32_t flag_ = 0; // reserved |
| | | void PutOffline(const int64_t offline_time) |
| | | { |
| | | timestamp_ = NowSec() - offline_time; |
| | | flag_ = kStateOffline; |
| | | } |
| | | void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) |
| | | { |
| | | auto diff = now - timestamp_; |
| | |
| | | } |
| | | |
| | | try { |
| | | MQId ssn = head.ssn_id(); |
| | | // use src_addr as session id. |
| | | // when node restart, src_addr will change, |
| | | // and old node will be removed after timeout. |
| | | auto UpdateRegInfo = [&](Node &node) { |
| | | node->addrs_.insert(SrcAddr(head)); |
| | | for (auto &addr : msg.addrs()) { |
| | |
| | | node->state_.UpdateState(NowSec(), offline_time_, kill_time_); |
| | | }; |
| | | |
| | | auto pos = nodes_.find(head.proc_id()); |
| | | if (pos != nodes_.end()) { // new client |
| | | auto pos = nodes_.find(ssn); |
| | | if (pos != nodes_.end()) { // update |
| | | Node &node = pos->second; |
| | | if (node->addrs_.find(SrcAddr(head)) == node->addrs_.end()) { |
| | | // node restarted, release old mq. |
| | | RemoveNode(node); |
| | | node.reset(new NodeInfo); |
| | | } |
| | | UpdateRegInfo(node); |
| | | } else { |
| | | Node node(new NodeInfo); |
| | | UpdateRegInfo(node); |
| | | nodes_[node->proc_.proc_id()] = node; |
| | | nodes_[ssn] = node; |
| | | |
| | | auto old = node_addr_map_.find(head.proc_id()); |
| | | if (old != node_addr_map_.end()) { // old session |
| | | auto &old_ssn = old->second; |
| | | nodes_[old_ssn]->state_.PutOffline(offline_time_); |
| | | printf("put %s %ld offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second); |
| | | old_ssn = ssn; |
| | | } else { |
| | | node_addr_map_.emplace(head.proc_id(), ssn); |
| | | } |
| | | } |
| | | return MakeReply(eSuccess); |
| | | } catch (...) { |
| | |
| | | Reply HandleMsg(const BHMsgHead &head, Func const &op) |
| | | { |
| | | try { |
| | | auto pos = nodes_.find(head.proc_id()); |
| | | auto pos = nodes_.find(head.ssn_id()); |
| | | if (pos == nodes_.end()) { |
| | | return MakeReply<Reply>(eNotRegistered, "Node is not registered."); |
| | | } else { |
| | |
| | | return HandleMsg( |
| | | head, [&](Node node) -> MsgCommonReply { |
| | | NodeInfo &ni = *node; |
| | | auto now = NowSec(); // just set to offline. |
| | | ni.state_.timestamp_ = now - offline_time_; |
| | | ni.state_.UpdateState(now, offline_time_, kill_time_); |
| | | ni.state_.PutOffline(offline_time_); |
| | | return MakeReply(eSuccess); |
| | | }); |
| | | } |
| | |
| | | }; |
| | | EraseMapRec(service_map_, node->services_); |
| | | EraseMapRec(subscribe_map_, node->subscriptions_); |
| | | node_addr_map_.erase(node->proc_.proc_id()); |
| | | |
| | | for (auto &addr : node->addrs_) { |
| | | cleaner_(addr); |
| | |
| | | |
| | | std::unordered_map<Topic, Clients> service_map_; |
| | | std::unordered_map<Topic, Clients> subscribe_map_; |
| | | std::unordered_map<ProcId, Node> nodes_; |
| | | std::unordered_map<Address, Node> nodes_; |
| | | std::unordered_map<std::string, Address> node_addr_map_; |
| | | Cleaner cleaner_; // remove mqs. |
| | | int64_t offline_time_; |
| | | int64_t kill_time_; |
| | |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 2); |
| | | auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { |
| | | return [&](auto &&rep_body) { |
| | | auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); |
| | | auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id())); |
| | | auto remote = head.route(0).mq_id(); |
| | | socket.Send(remote, reply_head, rep_body); |
| | | }; |