/* * ===================================================================================== * * Filename: reqrep_center.cpp * * Description: topic request/reply center * * Version: 1.0 * Created: 2021年04月01日 14时08分50秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), * Organization: * * ===================================================================================== */ #include "reqrep_center.h" #include "bh_util.h" #include "msg.h" #include #include #include #include using namespace bhome_shm; namespace { auto Now = []() { time_t t; return time(&t); }; class NodeCenter { public: typedef std::string ProcAddr; typedef bhome::msg::ProcInfo ProcInfo; template bool Register(ProcInfo &info, const ProcAddr &src_mq, Iter topics_begin, Iter topics_end) { try { Node node(new NodeInfo); node->addr_ = src_mq; node->proc_.Swap(&info); node->state_.timestamp_ = Now(); nodes_[node->proc_.id()] = node; for (auto it = topics_begin; it != topics_end; ++it) { topic_map_[*it] = node; } return true; } catch (...) { return false; } } void Heartbeat(ProcInfo &info, const ProcAddr &src_mq) { auto pos = nodes_.find(info.name()); if (pos != nodes_.end() && pos->second->addr_ == src_mq) { // both name and mq should be the same. NodeInfo &ni = *pos->second; ni.state_.timestamp_ = Now(); if (!info.public_info().empty()) { ni.proc_.set_public_info(info.public_info()); } if (!info.private_info().empty()) { ni.proc_.set_private_info(info.private_info()); } } } bool QueryTopic(const Topic &topic, ProcAddr &addr) { auto pos = topic_map_.find(topic); if (pos != topic_map_.end()) { Node node(pos->second.lock()); if (node) { addr = node->addr_; return true; } else { // dead, remove record. topic_map_.erase(pos); return false; } } else { return false; } } private: struct ProcState { time_t timestamp_ = 0; uint32_t flag_ = 0; // reserved }; typedef std::string ProcId; struct NodeInfo { ProcState state_; // state ProcAddr addr_; // registered_mqid. ProcInfo proc_; // }; typedef std::shared_ptr Node; typedef std::weak_ptr WeakNode; std::unordered_map topic_map_; std::unordered_map nodes_; }; } // namespace bool ReqRepCenter::Start(const int nworker) { auto center_ptr = std::make_shared>(); auto onRecv = [center_ptr, this](BHMsg &msg) { auto ¢er = *center_ptr; #ifndef NDEBUG static std::atomic last(0); time_t now = 0; time(&now); if (last.exchange(now) < now) { printf("bus queue size: %ld\n", socket_.Pending()); } #endif if (msg.route_size() == 0) { return; } auto &src_mq = msg.route(0).mq_id(); auto OnRegister = [&]() { DataProcRegister reg; if (reg.ParseFromString(msg.body()) && reg.has_proc()) { center->Register(*reg.mutable_proc(), src_mq, reg.topics().begin(), reg.topics().end()); } }; auto OnHeartbeat = [&]() { DataProcHeartbeat hb; if (hb.ParseFromString(msg.body()) && hb.has_proc()) { center->Heartbeat(*hb.mutable_proc(), src_mq); } }; auto OnQueryTopic = [&]() { DataProcQueryTopic query; NodeCenter::ProcAddr dest; if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) { MQId remote; memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote)); MsgI imsg; if (!imsg.Make(shm(), MakeQueryTopicReply(dest, msg.msg_id()))) { return; } if (!ShmMsgQueue::Send(shm(), remote, imsg, 100)) { imsg.Release(shm()); } } }; switch (msg.type()) { case kMsgTypeProcRegisterTopics: OnRegister(); break; case kMsgTypeProcHeartbeat: OnHeartbeat(); break; case kMsgTypeProcQueryTopic: OnQueryTopic(); break; default: break; } }; const int kMaxWorker = 16; return socket_.Start(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); }