| | |
| | | namespace |
| | | { |
| | | // Time-point and duration types of steady_clock, aliased for brevity. |
| | | using TimePoint = steady_clock::time_point; |
| | | using Duration = steady_clock::duration; |
| | | // Returns the current reading of steady_clock. |
| | | inline TimePoint Now() |
| | | { |
| | | return steady_clock::now(); |
| | | } |
| | | // Converts a Duration to a count of seconds via duration_cast<seconds>. |
| | | inline int64_t Seconds(const Duration &d) |
| | | { |
| | | const auto as_seconds = duration_cast<seconds>(d); |
| | | return as_seconds.count(); |
| | | } |
| | | |
| | | //TODO check proc_id |
| | | class NodeCenter |
| | |
| | | struct ProcState { |
| | | TimePoint timestamp_; |
| | | uint32_t flag_ = 0; // reserved |
| | | void UpdateState(TimePoint now) |
| | | void UpdateState(TimePoint now, const Duration &offline_time, const Duration &kill_time) |
| | | { |
| | | const auto kOfflineTime = 60 * 10s; |
| | | const auto kKillTime = 60 * 20s; |
| | | |
| | | auto diff = now - timestamp_; |
| | | if (diff < kOfflineTime) { |
| | | #ifndef NDEBUG |
| | | printf("diff: %ld\n", Seconds(diff)); |
| | | #endif |
| | | if (diff < offline_time) { |
| | | flag_ = kStateNormal; |
| | | } else if (diff < kKillTime) { |
| | | } else if (diff < kill_time) { |
| | | flag_ = kStateOffline; |
| | | } else { |
| | | flag_ = kStateKillme; |
| | |
| | | public: |
| | | // Set of TopicDest entries; the mapped type of subscribe_map_.
| | | using Clients = std::set<TopicDest>;
| | | |
| | | // Two-argument form kept for existing callers. It forwards to the four-argument
| | | // constructor so that offline_time_ and kill_time_ are always initialized before
| | | // CheckNodes() hands them to ProcState::UpdateState(); the values used are the
| | | // 10- and 20-minute constants hard-coded in UpdateState (kOfflineTime / kKillTime).
| | | NodeCenter(const std::string &id, const Cleaner &cleaner) :
| | | NodeCenter(id, cleaner, 60 * 10s, 60 * 20s) {}
| | | // Builds a center named `id` with the given cleaner. offline_time and kill_time are
| | | // stored and later passed to ProcState::UpdateState() by CheckNodes(): a node whose
| | | // timestamp is younger than offline_time is normal, younger than kill_time is
| | | // offline, and otherwise is flagged kStateKillme. The check clock starts at Now().
| | | NodeCenter(const std::string &id,
| | | const Cleaner &cleaner,
| | | const Duration &offline_time,
| | | const Duration &kill_time) :
| | | id_(id),
| | | cleaner_(cleaner),
| | | offline_time_(offline_time),
| | | kill_time_(kill_time),
| | | last_check_time_(Now())
| | | {
| | | }
| | | // Returns the identifier passed to the constructor.
| | | const std::string &id() const
| | | {
| | | return id_; // no need to lock.
| | | }
| | | |
| | | //TODO maybe just return serialized string. |
| | |
| | | auto node = pos->second; |
| | | if (!MatchAddr(node->addrs_, SrcAddr(head))) { |
| | | return MakeReply<Reply>(eAddressNotMatch, "Node address error."); |
| | | } else if (head.type() == kMsgTypeHeartbeat && CanHeartbeat(*node)) { |
| | | return op(node); |
| | | } else if (!Valid(*node)) { |
| | | return MakeReply<Reply>(eNoRespond, "Node is not alive."); |
| | | } else { |
| | |
| | | { |
| | | return HandleMsg(head, [&](Node node) { |
| | | NodeInfo &ni = *node; |
| | | ni.state_.timestamp_ = Now(); |
| | | auto now = Now(); |
| | | ni.state_.timestamp_ = now; |
| | | ni.state_.flag_ = kStateNormal; |
| | | |
| | | auto &info = msg.proc(); |
| | | if (!info.public_info().empty()) { |
| | |
| | | private: |
| | | void CheckNodes() |
| | | { |
| | | auto now = Now(); |
| | | if (Seconds(now - last_check_time_) < 1) { return; } |
| | | |
| | | last_check_time_ = now; |
| | | |
| | | auto it = nodes_.begin(); |
| | | while (it != nodes_.end()) { |
| | | auto &cli = *it->second; |
| | | cli.state_.UpdateState(Now()); |
| | | cli.state_.UpdateState(now, offline_time_, kill_time_); |
| | | if (cli.state_.flag_ == kStateKillme) { |
| | | if (cleaner_) { |
| | | for (auto &addr : cli.addrs_) { |
| | |
| | | ++it; |
| | | } |
| | | } |
| | | } |
| | | // A heartbeat is accepted from a node that passes Valid() or whose state flag
| | | // is kStateOffline.
| | | bool CanHeartbeat(const NodeInfo &node)
| | | {
| | | if (Valid(node)) { return true; }
| | | return node.state_.flag_ == kStateOffline;
| | | }
| | | bool Valid(const NodeInfo &node) |
| | | { |
| | |
| | | // Maps a Topic to its Clients set (std::set<TopicDest>).
| | | std::unordered_map<Topic, Clients> subscribe_map_;
| | | // Nodes keyed by ProcId; iterated by CheckNodes().
| | | std::unordered_map<ProcId, Node> nodes_;
| | | Cleaner cleaner_; // remove mqs.
| | | // Timestamp age below which UpdateState() sets kStateNormal; at or above it the
| | | // node becomes kStateOffline (until kill_time_ is reached).
| | | Duration offline_time_;
| | | // Timestamp age at or above which UpdateState() sets kStateKillme.
| | | Duration kill_time_;
| | | // Time of the last CheckNodes() pass; CheckNodes() returns early when less than
| | | // one second has elapsed since this value.
| | | TimePoint last_check_time_;
| | | }; |
| | | |
| | | template <class Body, class OnMsg, class Replyer> |
| | |
| | | |
| | | bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner) |
| | | { |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner); |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 3); |
| | | auto center_failed_q = std::make_shared<FailedMsgQ>(); |
| | | auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) { |
| | | return [&](auto &&rep_body) { |