From 4deeafbd502dc3c57dab8ad6ca601a38a9e7f074 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 06 四月 2021 19:10:49 +0800 Subject: [PATCH] add uni center. --- src/reqrep_center.cpp | 56 ++++++++++++++++++++++++++++++++++---------------------- 1 files changed, 34 insertions(+), 22 deletions(-) diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp index 2356ebc..ce35d1c 100644 --- a/src/reqrep_center.cpp +++ b/src/reqrep_center.cpp @@ -99,63 +99,75 @@ std::unordered_map<Topic, WeakNode> topic_map_; std::unordered_map<ProcId, Node> nodes_; }; + } // namespace -bool ReqRepCenter::Start(const int nworker) +BHCenter::MsgHandler MakeReqRepCenter() { auto center_ptr = std::make_shared<Synced<NodeCenter>>(); - auto onRecv = [center_ptr, this](BHMsg &msg) { + return [center_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) { auto ¢er = *center_ptr; + auto &shm = socket.shm(); #ifndef NDEBUG static std::atomic<time_t> last(0); time_t now = 0; time(&now); if (last.exchange(now) < now) { - printf("bus queue size: %ld\n", socket_.Pending()); + printf("center queue size: %ld\n", socket.Pending()); } #endif - if (msg.route_size() == 0) { - return; - } - auto &src_mq = msg.route(0).mq_id(); + auto SrcMQ = [&]() { return msg.route(0).mq_id(); }; auto OnRegister = [&]() { - DataProcRegister reg; + if (msg.route_size() != 1) { return; } + + MsgRegister reg; if (reg.ParseFromString(msg.body()) && reg.has_proc()) { - center->Register(*reg.mutable_proc(), src_mq, reg.topics().begin(), reg.topics().end()); + center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end()); } }; auto OnHeartbeat = [&]() { - DataProcHeartbeat hb; + if (msg.route_size() != 1) { return; } + auto &src_mq = msg.route(0).mq_id(); + + MsgHeartbeat hb; if (hb.ParseFromString(msg.body()) && hb.has_proc()) { - center->Heartbeat(*hb.mutable_proc(), src_mq); + center->Heartbeat(*hb.mutable_proc(), SrcMQ()); } }; auto OnQueryTopic = [&]() { - DataProcQueryTopic query; + if (msg.route_size() != 1) { return; } + + MsgQueryTopic query; NodeCenter::ProcAddr dest; if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) { MQId remote; - memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote)); + memcpy(&remote, SrcMQ().data(), sizeof(MQId)); MsgI imsg; - if (!imsg.Make(shm(), MakeQueryTopicReply(dest, msg.msg_id()))) { return; } - if (!ShmMsgQueue::Send(shm(), remote, imsg, 100)) { - imsg.Release(shm()); + if (!imsg.Make(shm, MakeQueryTopicReply(dest, msg.msg_id()))) { return; } + if (!ShmMsgQueue::Send(shm, remote, imsg, 100)) { + imsg.Release(shm); } } }; switch (msg.type()) { - case kMsgTypeProcRegisterTopics: OnRegister(); break; - case kMsgTypeProcHeartbeat: OnHeartbeat(); break; - case kMsgTypeProcQueryTopic: OnQueryTopic(); break; - default: break; + case kMsgTypeRegister: OnRegister(); return true; + case kMsgTypeHeartbeat: OnHeartbeat(); return true; + case kMsgTypeQueryTopic: OnQueryTopic(); return true; + default: return false; } }; +} + +bool ReqRepCenter::Start(const int nworker) +{ + auto handler = MakeReqRepCenter(); + printf("sizeof(rep/req handler) = %ld\n", sizeof(handler)); const int kMaxWorker = 16; - return socket_.Start(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); -} \ No newline at end of file + return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); +} -- Gitblit v1.8.0