From 4deeafbd502dc3c57dab8ad6ca601a38a9e7f074 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 06 四月 2021 19:10:49 +0800 Subject: [PATCH] add uni center. --- src/reqrep_center.cpp | 178 ++++++++++++++++++++++++++++++++++++++--------------------- 1 files changed, 115 insertions(+), 63 deletions(-) diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp index 5f1873e..ce35d1c 100644 --- a/src/reqrep_center.cpp +++ b/src/reqrep_center.cpp @@ -17,105 +17,157 @@ */ #include "reqrep_center.h" #include "bh_util.h" -using namespace bhome_shm; +#include "msg.h" +#include <chrono> +#include <memory> +#include <mutex> +#include <unordered_map> -struct A { - void F(int){}; -}; +using namespace bhome_shm; namespace { -inline uint64_t Now() +auto Now = []() { time_t t; return time(&t); }; + +class NodeCenter { - time_t t; - return time(&t); -} +public: + typedef std::string ProcAddr; + typedef bhome::msg::ProcInfo ProcInfo; + + template <class Iter> + bool Register(ProcInfo &info, const ProcAddr &src_mq, Iter topics_begin, Iter topics_end) + { + try { + Node node(new NodeInfo); + node->addr_ = src_mq; + node->proc_.Swap(&info); + node->state_.timestamp_ = Now(); + nodes_[node->proc_.id()] = node; + for (auto it = topics_begin; it != topics_end; ++it) { + topic_map_[*it] = node; + } + return true; + } catch (...) { + return false; + } + } + void Heartbeat(ProcInfo &info, const ProcAddr &src_mq) + { + auto pos = nodes_.find(info.name()); + if (pos != nodes_.end() && pos->second->addr_ == src_mq) { // both name and mq should be the same. + NodeInfo &ni = *pos->second; + ni.state_.timestamp_ = Now(); + if (!info.public_info().empty()) { + ni.proc_.set_public_info(info.public_info()); + } + if (!info.private_info().empty()) { + ni.proc_.set_private_info(info.private_info()); + } + } + } + bool QueryTopic(const Topic &topic, ProcAddr &addr) + { + auto pos = topic_map_.find(topic); + if (pos != topic_map_.end()) { + Node node(pos->second.lock()); + if (node) { + addr = node->addr_; + return true; + } else { // dead, remove record. + topic_map_.erase(pos); + return false; + } + } else { + return false; + } + } + +private: + struct ProcState { + time_t timestamp_ = 0; + uint32_t flag_ = 0; // reserved + }; + typedef std::string ProcId; + struct NodeInfo { + ProcState state_; // state + ProcAddr addr_; // registered_mqid. + ProcInfo proc_; // + }; + typedef std::shared_ptr<NodeInfo> Node; + typedef std::weak_ptr<NodeInfo> WeakNode; + std::unordered_map<Topic, WeakNode> topic_map_; + std::unordered_map<ProcId, Node> nodes_; +}; } // namespace -bool ReqRepCenter::Start(const int nworker) + +BHCenter::MsgHandler MakeReqRepCenter() { - auto onRecv = [&](BHMsg &msg) { + auto center_ptr = std::make_shared<Synced<NodeCenter>>(); + return [center_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) { + auto ¢er = *center_ptr; + auto &shm = socket.shm(); + #ifndef NDEBUG static std::atomic<time_t> last(0); time_t now = 0; time(&now); if (last.exchange(now) < now) { - printf("bus queue size: %ld\n", socket_.Pending()); + printf("center queue size: %ld\n", socket.Pending()); } #endif - if (msg.route_size() == 0) { - return; - } - auto &src_mq = msg.route(0).mq_id(); + auto SrcMQ = [&]() { return msg.route(0).mq_id(); }; auto OnRegister = [&]() { - DataProcRegister reg; - if (!reg.ParseFromString(msg.body())) { - return; - } - ProcInfo pi; - pi.server_mqid_ = src_mq; - pi.proc_id_ = reg.proc().name(); - pi.ext_info_ = reg.proc().info(); - pi.timestamp_ = Now(); + if (msg.route_size() != 1) { return; } - std::lock_guard<std::mutex> lock(mutex_); - for (auto &t : reg.topics()) { - topic_mq_[t] = pi.server_mqid_; + MsgRegister reg; + if (reg.ParseFromString(msg.body()) && reg.has_proc()) { + center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end()); } - procs_[pi.proc_id_] = pi; }; auto OnHeartbeat = [&]() { - DataProcHeartbeat hb; - if (!hb.ParseFromString(msg.body())) { - return; - } + if (msg.route_size() != 1) { return; } + auto &src_mq = msg.route(0).mq_id(); - std::lock_guard<std::mutex> lock(mutex_); - auto pos = procs_.find(hb.proc().name()); - if (pos != procs_.end() && pos->second.server_mqid_ == src_mq) { // both name and mq should be the same. - pos->second.timestamp_ = Now(); - pos->second.ext_info_ = hb.proc().info(); + MsgHeartbeat hb; + if (hb.ParseFromString(msg.body()) && hb.has_proc()) { + center->Heartbeat(*hb.mutable_proc(), SrcMQ()); } }; auto OnQueryTopic = [&]() { - DataProcQueryTopic query; - if (!query.ParseFromString(msg.body())) { - return; - } + if (msg.route_size() != 1) { return; } - std::string dest; - auto FindDest = [&]() { - std::lock_guard<std::mutex> lock(mutex_); - auto pos = topic_mq_.find(query.topic()); - if (pos != topic_mq_.end()) { - dest = pos->second; - return true; - } else { - return false; - } - }; - if (FindDest()) { + MsgQueryTopic query; + NodeCenter::ProcAddr dest; + if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) { MQId remote; - memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote)); + memcpy(&remote, SrcMQ().data(), sizeof(MQId)); MsgI imsg; - if (!imsg.Make(shm(), MakeQueryTopicReply(dest, msg.msg_id()))) { return; } - if (!ShmMsgQueue::Send(shm(), remote, imsg, 100)) { - imsg.Release(shm()); + if (!imsg.Make(shm, MakeQueryTopicReply(dest, msg.msg_id()))) { return; } + if (!ShmMsgQueue::Send(shm, remote, imsg, 100)) { + imsg.Release(shm); } } }; switch (msg.type()) { - case kMsgTypeProcRegisterTopics: OnRegister(); break; - case kMsgTypeProcHeartbeat: OnHeartbeat(); break; - case kMsgTypeProcQueryTopic: OnQueryTopic(); break; - default: break; + case kMsgTypeRegister: OnRegister(); return true; + case kMsgTypeHeartbeat: OnHeartbeat(); return true; + case kMsgTypeQueryTopic: OnQueryTopic(); return true; + default: return false; } }; +} + +bool ReqRepCenter::Start(const int nworker) +{ + auto handler = MakeReqRepCenter(); + printf("sizeof(rep/req handler) = %ld\n", sizeof(handler)); const int kMaxWorker = 16; - return socket_.Start(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); -} \ No newline at end of file + return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); +} -- Gitblit v1.8.0