| | |
| | | |
| | | namespace |
| | | { |
| | | // Short aliases for the steady_clock time-point and duration types. |
| | | using TimePoint = steady_clock::time_point; |
| | | using Duration = steady_clock::duration; |
| | | // Current time as reported by steady_clock. |
| | | inline TimePoint Now() |
| | | { |
| | |     return steady_clock::now(); |
| | | } |
| | | // Number of whole seconds in d (duration_cast truncates toward zero). |
| | | inline int64_t Seconds(const Duration &d) |
| | | { |
| | |     const auto whole = duration_cast<seconds>(d); |
| | |     return whole.count(); |
| | | } |
| | | |
| | | //TODO check proc_id |
| | | class NodeCenter |
| | |
| | | }; |
| | | |
| | | struct ProcState { |
| | | TimePoint timestamp_; |
| | | int64_t timestamp_; |
| | | uint32_t flag_ = 0; // reserved |
| | | void UpdateState(TimePoint now, const Duration &offline_time, const Duration &kill_time) |
| | | void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) |
| | | { |
| | | auto diff = now - timestamp_; |
| | | #ifndef NDEBUG |
| | | printf("diff: %ld\n", Seconds(diff)); |
| | | printf("state %p diff: %ld\n", this, diff); |
| | | #endif |
| | | if (diff < offline_time) { |
| | | flag_ = kStateNormal; |
| | |
| | | // Source address of a message: the mq_id of route entry 0 in its head. |
| | | // NOTE(review): index 0 is accessed unchecked; assumes head has at least one route entry -- confirm with callers. |
| | | inline const std::string &SrcAddr(const BHMsgHead &head) |
| | | { |
| | |     const auto &first_route = head.route(0); |
| | |     return first_route.mq_id(); |
| | | } |
| | | // True when addr is an element of addrs. |
| | | inline bool MatchAddr(std::set<Address> const &addrs, const Address &addr) |
| | | { |
| | |     return addrs.count(addr) != 0; |
| | | } |
| | | |
| | | // Stores the id, the cleaner and the integer offline/kill times as given; |
| | | // last_check_time_ starts at 0. |
| | | NodeCenter(const std::string &id, |
| | |            const Cleaner &cleaner, |
| | |            const int64_t offline_time, |
| | |            const int64_t kill_time) : |
| | |     id_(id), |
| | |     cleaner_(cleaner), |
| | |     offline_time_(offline_time), |
| | |     kill_time_(kill_time), |
| | |     last_check_time_(0) |
| | | { |
| | | } |
| | | |
| | | public: |
| | | typedef std::set<TopicDest> Clients; |
| | | |
| | | // Stores the id, the cleaner and the offline/kill durations as given; |
| | | // last_check_time_ is initialised to the current time from Now(). |
| | | NodeCenter(const std::string &id, |
| | |            const Cleaner &cleaner, |
| | |            const Duration &offline_time, |
| | |            const Duration &kill_time) : |
| | |     id_(id), |
| | |     cleaner_(cleaner), |
| | |     offline_time_(offline_time), |
| | |     kill_time_(kill_time), |
| | |     last_check_time_(Now()) |
| | | { |
| | | } |
| | | // Read-only access to this center's id; no need to lock. |
| | | const std::string &id() const |
| | | { |
| | |     return id_; |
| | | } |
| | | // Duration overload: converts both steady_clock durations to whole-second counts |
| | | // and delegates to the constructor taking integer times. |
| | | NodeCenter(const std::string &id, |
| | |            const Cleaner &cleaner, |
| | |            const steady_clock::duration offline_time, |
| | |            const steady_clock::duration kill_time) : |
| | |     NodeCenter(id, |
| | |                cleaner, |
| | |                duration_cast<seconds>(offline_time).count(), |
| | |                duration_cast<seconds>(kill_time).count()) |
| | | { |
| | | } |
| | | |
| | | // Read-only access to this center's id; no need to lock. |
| | | const std::string &id() const { return id_; } |
| | | |
| | | //TODO maybe just return serialized string. |
| | | MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg) |
| | |
| | | node->addrs_.insert(addr.mq_id()); |
| | | } |
| | | node->proc_.Swap(msg.mutable_proc()); |
| | | node->state_.timestamp_ = Now(); |
| | | node->state_.flag_ = kStateNormal; |
| | | node->state_.timestamp_ = head.timestamp(); |
| | | node->state_.UpdateState(NowSec(), offline_time_, kill_time_); |
| | | nodes_[node->proc_.proc_id()] = node; |
| | | return MakeReply(eSuccess); |
| | | } catch (...) { |
| | |
| | | { |
| | | return HandleMsg(head, [&](Node node) { |
| | | NodeInfo &ni = *node; |
| | | auto now = Now(); |
| | | ni.state_.timestamp_ = now; |
| | | ni.state_.flag_ = kStateNormal; |
| | | ni.state_.timestamp_ = head.timestamp(); |
| | | ni.state_.UpdateState(NowSec(), offline_time_, kill_time_); |
| | | |
| | | auto &info = msg.proc(); |
| | | if (!info.public_info().empty()) { |
| | |
| | | private: |
| | | void CheckNodes() |
| | | { |
| | | auto now = Now(); |
| | | if (Seconds(now - last_check_time_) < 1) { return; } |
| | | |
| | | auto now = NowSec(); |
| | | if (now - last_check_time_ < 1) { return; } |
| | | last_check_time_ = now; |
| | | |
| | | auto it = nodes_.begin(); |
| | |
| | | std::unordered_map<Topic, Clients> subscribe_map_; |
| | | std::unordered_map<ProcId, Node> nodes_; |
| | | Cleaner cleaner_; // remove mqs. |
| | | Duration offline_time_; |
| | | Duration kill_time_; |
| | | TimePoint last_check_time_; |
| | | int64_t offline_time_; |
| | | int64_t kill_time_; |
| | | int64_t last_check_time_; |
| | | }; |
| | | |
| | | template <class Body, class OnMsg, class Replyer> |
| | |
| | | |
| | | bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner) |
| | | { |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 3); |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 5s, 10s); |
| | | auto center_failed_q = std::make_shared<FailedMsgQ>(); |
| | | auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) { |
| | | return [&](auto &&rep_body) { |