/*
 * =====================================================================================
 *
 *       Filename:  reqrep_center.cpp
 *
 *    Description:  topic request/reply center
 *
 *        Version:  1.0
 *        Created:  2021-04-01 14:08:50
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "reqrep_center.h"
#include "bh_util.h"
#include "msg.h"
// NOTE(review): the original system header names were lost; the list below is
// what the code in this file actually uses.
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <memory>
#include <string>
#include <unordered_map>

using namespace bhome_shm;

namespace
{
// Current wall-clock time in seconds, as returned by time().
auto Now = []() { time_t t; return time(&t); };

// Registry of processes (nodes) and of the topics they registered.
//
// nodes_ owns the node records, keyed by ProcInfo::id(). topic_map_ only holds
// weak references, so a topic entry expires once its node record is replaced
// or dropped from nodes_.
class NodeCenter
{
public:
    typedef std::string ProcAddr;
    typedef bhome::msg::ProcInfo ProcInfo;

    // Registers (or re-registers) a process and binds each topic in
    // [topics_begin, topics_end) to it.
    //
    // info    : process description; its content is swapped into the stored
    //           record, so the caller's object must not be relied on afterwards.
    // src_mq  : address of the queue the registration came from.
    // Returns true on success, false if any exception was thrown.
    template <class Iter>
    bool Register(ProcInfo &info, const ProcAddr &src_mq, Iter topics_begin, Iter topics_end)
    {
        try {
            Node node(new NodeInfo);
            node->addr_ = src_mq;
            node->proc_.Swap(&info);
            node->state_.timestamp_ = Now();
            // Replacing an existing entry releases the old record, which in turn
            // expires every topic_map_ entry that still pointed at it.
            nodes_[node->proc_.id()] = node;
            for (auto it = topics_begin; it != topics_end; ++it) {
                topic_map_[*it] = node;
            }
            return true;
        } catch (...) {
            return false;
        }
    }

    // Refreshes the timestamp of an already registered process and updates its
    // public/private info when the heartbeat carries non-empty values.
    // Heartbeats from unknown ids, or from a queue other than the one used at
    // registration, are ignored.
    void Heartbeat(ProcInfo &info, const ProcAddr &src_mq)
    {
        // Look up by id: that is the key Register() stores the node under.
        auto pos = nodes_.find(info.id());
        if (pos == nodes_.end() || pos->second->addr_ != src_mq) {
            return; // both id and mq must match the registration.
        }
        NodeInfo &ni = *pos->second;
        ni.state_.timestamp_ = Now();
        if (!info.public_info().empty()) {
            ni.proc_.set_public_info(info.public_info());
        }
        if (!info.private_info().empty()) {
            ni.proc_.set_private_info(info.private_info());
        }
    }

    // Looks up the address of the node serving `topic`.
    // On success writes it to `addr` and returns true. Returns false when the
    // topic is unknown or its node is gone; a stale entry is erased on the way.
    bool QueryTopic(const Topic &topic, ProcAddr &addr)
    {
        auto pos = topic_map_.find(topic);
        if (pos == topic_map_.end()) {
            return false;
        }
        Node node(pos->second.lock());
        if (!node) {
            // dead, remove record.
            topic_map_.erase(pos);
            return false;
        }
        addr = node->addr_;
        return true;
    }

private:
    struct ProcState {
        time_t timestamp_ = 0; // time of registration or of the last accepted heartbeat
        uint32_t flag_ = 0;    // reserved
    };
    typedef std::string ProcId;
    struct NodeInfo {
        ProcState state_; // state
        ProcAddr addr_;   // registered_mqid.
        ProcInfo proc_;   // process description taken over in Register()
    };
    typedef std::shared_ptr<NodeInfo> Node;
    typedef std::weak_ptr<NodeInfo> WeakNode;
    std::unordered_map<Topic, WeakNode> topic_map_;
    std::unordered_map<ProcId, Node> nodes_;
};

} // namespace

// Builds the message handler of the request/reply center.
//
// The returned handler shares one NodeCenter between all of its copies and
// handles register, heartbeat and query-topic messages. It returns true when
// the message type is one of those three (even if the message was then
// rejected as malformed) and false for every other type.
BHCenter::MsgHandler MakeReqRepCenter()
{
    // NOTE(review): the template arguments were lost in the garbled source;
    // Synced<NodeCenter> is reconstructed from the `center->` usage below
    // (presumably a locking wrapper from bh_util.h) -- confirm.
    auto center_ptr = std::make_shared<Synced<NodeCenter>>();
    return [center_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) {
        auto &center = *center_ptr;
        auto &shm = socket.shm();

#ifndef NDEBUG
        // Print the pending queue size whenever the clock has advanced past the
        // last recorded second.
        static std::atomic<time_t> last(0);
        time_t now = 0;
        time(&now);
        if (last.exchange(now) < now) {
            printf("center queue size: %ld\n", socket.Pending());
        }
#endif

        // Queue id of the sender; only valid when the message has a route entry,
        // which every caller checks first.
        auto SrcMQ = [&]() { return msg.route(0).mq_id(); };

        auto OnRegister = [&]() {
            if (msg.route_size() != 1) { return; }

            MsgRegister reg;
            if (reg.ParseFromString(msg.body()) && reg.has_proc()) {
                center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end());
            }
        };

        auto OnHeartbeat = [&]() {
            if (msg.route_size() != 1) { return; }

            MsgHeartbeat hb;
            if (hb.ParseFromString(msg.body()) && hb.has_proc()) {
                center->Heartbeat(*hb.mutable_proc(), SrcMQ());
            }
        };

        auto OnQueryTopic = [&]() {
            if (msg.route_size() != 1) { return; }

            MsgQueryTopic query;
            NodeCenter::ProcAddr dest;
            if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) {
                // The sender id arrives in a message; make sure it holds enough
                // bytes before copying an MQId out of it.
                const NodeCenter::ProcAddr src = SrcMQ();
                if (src.size() < sizeof(MQId)) { return; }
                MQId remote;
                memcpy(&remote, src.data(), sizeof(MQId));

                MsgI reply;
                if (!reply.Make(shm, MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
                // Send is given 100 as its last argument (presumably a timeout);
                // on failure the reply is released here.
                if (!ShmMsgQueue::Send(shm, remote, reply, 100)) {
                    reply.Release(shm);
                }
            }
        };

        switch (msg.type()) {
        case kMsgTypeRegister: OnRegister(); return true;
        case kMsgTypeHeartbeat: OnHeartbeat(); return true;
        case kMsgTypeQueryTopic: OnQueryTopic(); return true;
        default: return false;
        }
    };
}

// Starts the center socket with the request/reply handler.
// nworker <= 0 selects 2 workers; the count is capped at 16.
// Returns whatever socket_.Start() returns.
bool ReqRepCenter::Start(const int nworker)
{
    auto handler = MakeReqRepCenter();
    printf("sizeof(rep/req handler) = %zu\n", sizeof(handler));

    const int kMaxWorker = 16;
    return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
}