| | |
| | | */ |
| | | #include "reqrep_center.h" |
| | | #include "bh_util.h" |
| | | using namespace bhome_shm; |
| | | #include "msg.h" |
| | | #include <chrono> |
| | | #include <memory> |
| | | #include <mutex> |
| | | #include <unordered_map> |
| | | |
/// Minimal placeholder type with a single no-op member function.
struct A {
	/// Accepts an int argument and ignores it.
	void F(int) {}
};
| | | using namespace bhome_shm; |
| | | |
| | | namespace |
| | | { |
/// Returns the current calendar time as reported by time(), converted to uint64_t.
///
/// This is the only entity named `Now` in this scope: a function and a variable
/// with the same name cannot be declared side by side in one namespace.
inline uint64_t Now()
{
	// time() accepts a null pointer when only the return value is needed.
	return static_cast<uint64_t>(time(nullptr));
}
| | | |
| | | class NodeCenter |
| | | { |
| | | public: |
| | | typedef std::string ProcAddr; |
| | | typedef bhome::msg::ProcInfo ProcInfo; |
| | | |
| | | template <class Iter> |
| | | bool Register(ProcInfo &info, const ProcAddr &src_mq, Iter topics_begin, Iter topics_end) |
| | | { |
| | | try { |
| | | Node node(new NodeInfo); |
| | | node->addr_ = src_mq; |
| | | node->proc_.Swap(&info); |
| | | node->state_.timestamp_ = Now(); |
| | | nodes_[node->proc_.id()] = node; |
| | | for (auto it = topics_begin; it != topics_end; ++it) { |
| | | topic_map_[*it] = node; |
| | | } |
| | | return true; |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | | } |
| | | void Heartbeat(ProcInfo &info, const ProcAddr &src_mq) |
| | | { |
| | | auto pos = nodes_.find(info.name()); |
| | | if (pos != nodes_.end() && pos->second->addr_ == src_mq) { // both name and mq should be the same. |
| | | NodeInfo &ni = *pos->second; |
| | | ni.state_.timestamp_ = Now(); |
| | | if (!info.public_info().empty()) { |
| | | ni.proc_.set_public_info(info.public_info()); |
| | | } |
| | | if (!info.private_info().empty()) { |
| | | ni.proc_.set_private_info(info.private_info()); |
| | | } |
| | | } |
| | | } |
| | | bool QueryTopic(const std::string &topic, ProcAddr &addr) |
| | | { |
| | | auto pos = topic_map_.find(topic); |
| | | if (pos != topic_map_.end()) { |
| | | Node node(pos->second.lock()); |
| | | if (node) { |
| | | addr = node->addr_; |
| | | return true; |
| | | } else { // dead, remove record. |
| | | topic_map_.erase(pos); |
| | | return false; |
| | | } |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | private: |
| | | struct ProcState { |
| | | time_t timestamp_ = 0; |
| | | uint32_t flag_ = 0; // reserved |
| | | }; |
| | | typedef std::string ProcId; |
| | | struct NodeInfo { |
| | | ProcState state_; // state |
| | | ProcAddr addr_; // registered_mqid. |
| | | ProcInfo proc_; // |
| | | }; |
| | | typedef std::shared_ptr<NodeInfo> Node; |
| | | typedef std::weak_ptr<NodeInfo> WeakNode; |
| | | std::unordered_map<std::string, WeakNode> topic_map_; |
| | | std::unordered_map<ProcId, Node> nodes_; |
| | | }; |
| | | } // namespace |
| | | |
| | | bool ReqRepCenter::Start(const int nworker) |
| | | { |
| | | auto onRecv = [&](BHMsg &msg) { |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>(); |
| | | auto onRecv = [center_ptr, this](BHMsg &msg) { |
| | | auto ¢er = *center_ptr; |
| | | |
| | | #ifndef NDEBUG |
| | | static std::atomic<time_t> last(0); |
| | | time_t now = 0; |
| | |
| | | |
| | | auto OnRegister = [&]() { |
| | | DataProcRegister reg; |
| | | if (!reg.ParseFromString(msg.body())) { |
| | | return; |
| | | if (reg.ParseFromString(msg.body()) && reg.has_proc()) { |
| | | center->Register(*reg.mutable_proc(), src_mq, reg.topics().begin(), reg.topics().end()); |
| | | } |
| | | ProcInfo pi; |
| | | pi.server_mqid_ = src_mq; |
| | | pi.proc_id_ = reg.proc().name(); |
| | | pi.ext_info_ = reg.proc().info(); |
| | | pi.timestamp_ = Now(); |
| | | |
| | | std::lock_guard<std::mutex> lock(mutex_); |
| | | for (auto &t : reg.topics()) { |
| | | topic_mq_[t] = pi.server_mqid_; |
| | | } |
| | | procs_[pi.proc_id_] = pi; |
| | | }; |
| | | |
| | | auto OnHeartbeat = [&]() { |
| | | DataProcHeartbeat hb; |
| | | if (!hb.ParseFromString(msg.body())) { |
| | | return; |
| | | } |
| | | |
| | | std::lock_guard<std::mutex> lock(mutex_); |
| | | auto pos = procs_.find(hb.proc().name()); |
| | | if (pos != procs_.end() && pos->second.server_mqid_ == src_mq) { // both name and mq should be the same. |
| | | pos->second.timestamp_ = Now(); |
| | | pos->second.ext_info_ = hb.proc().info(); |
| | | if (hb.ParseFromString(msg.body()) && hb.has_proc()) { |
| | | center->Heartbeat(*hb.mutable_proc(), src_mq); |
| | | } |
| | | }; |
| | | |
| | | auto OnQueryTopic = [&]() { |
| | | DataProcQueryTopic query; |
| | | if (!query.ParseFromString(msg.body())) { |
| | | return; |
| | | } |
| | | |
| | | std::string dest; |
| | | auto FindDest = [&]() { |
| | | std::lock_guard<std::mutex> lock(mutex_); |
| | | auto pos = topic_mq_.find(query.topic()); |
| | | if (pos != topic_mq_.end()) { |
| | | dest = pos->second; |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | }; |
| | | if (FindDest()) { |
| | | NodeCenter::ProcAddr dest; |
| | | if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) { |
| | | MQId remote; |
| | | memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote)); |
| | | MsgI imsg; |