From 83085f2ce99cca05d40a19482151873a55e6393a Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 02 四月 2021 19:32:21 +0800 Subject: [PATCH] refactor center; add async request no cb. --- src/reqrep_center.cpp | 54 ++++++++++++++++++++++++++++++++++++------------------ 1 files changed, 36 insertions(+), 18 deletions(-) diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp index 2356ebc..e52b0fd 100644 --- a/src/reqrep_center.cpp +++ b/src/reqrep_center.cpp @@ -99,63 +99,81 @@ std::unordered_map<Topic, WeakNode> topic_map_; std::unordered_map<ProcId, Node> nodes_; }; + +Synced<NodeCenter> &Center() +{ + static Synced<NodeCenter> s; + return s; +} + } // namespace -bool ReqRepCenter::Start(const int nworker) +BHCenter::MsgHandler MakeReqRepCenter() { auto center_ptr = std::make_shared<Synced<NodeCenter>>(); - auto onRecv = [center_ptr, this](BHMsg &msg) { + return [center_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) { auto ¢er = *center_ptr; + auto &shm = socket.shm(); #ifndef NDEBUG static std::atomic<time_t> last(0); time_t now = 0; time(&now); if (last.exchange(now) < now) { - printf("bus queue size: %ld\n", socket_.Pending()); + printf("bus queue size: %ld\n", socket.Pending()); } #endif - if (msg.route_size() == 0) { - return; - } - auto &src_mq = msg.route(0).mq_id(); + auto SrcMQ = [&]() { return msg.route(0).mq_id(); }; auto OnRegister = [&]() { + if (msg.route_size() != 1) { return; } + DataProcRegister reg; if (reg.ParseFromString(msg.body()) && reg.has_proc()) { - center->Register(*reg.mutable_proc(), src_mq, reg.topics().begin(), reg.topics().end()); + center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end()); } }; auto OnHeartbeat = [&]() { + if (msg.route_size() != 1) { return; } + auto &src_mq = msg.route(0).mq_id(); + DataProcHeartbeat hb; if (hb.ParseFromString(msg.body()) && hb.has_proc()) { - center->Heartbeat(*hb.mutable_proc(), src_mq); + center->Heartbeat(*hb.mutable_proc(), SrcMQ()); } }; auto OnQueryTopic = [&]() { + if (msg.route_size() != 1) { return; } + DataProcQueryTopic query; NodeCenter::ProcAddr dest; if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) { MQId remote; - memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote)); + memcpy(&remote, SrcMQ().data(), sizeof(MQId)); MsgI imsg; - if (!imsg.Make(shm(), MakeQueryTopicReply(dest, msg.msg_id()))) { return; } - if (!ShmMsgQueue::Send(shm(), remote, imsg, 100)) { - imsg.Release(shm()); + if (!imsg.Make(shm, MakeQueryTopicReply(dest, msg.msg_id()))) { return; } + if (!ShmMsgQueue::Send(shm, remote, imsg, 100)) { + imsg.Release(shm); } } }; switch (msg.type()) { - case kMsgTypeProcRegisterTopics: OnRegister(); break; - case kMsgTypeProcHeartbeat: OnHeartbeat(); break; - case kMsgTypeProcQueryTopic: OnQueryTopic(); break; - default: break; + case kMsgTypeProcRegisterTopics: OnRegister(); return true; + case kMsgTypeProcHeartbeat: OnHeartbeat(); return true; + case kMsgTypeProcQueryTopic: OnQueryTopic(); return true; + default: return false; } }; +} + +bool ReqRepCenter::Start(const int nworker) +{ + auto handler = MakeReqRepCenter(); + printf("sizeof(rep/req handler) = %ld\n", sizeof(handler)); const int kMaxWorker = 16; - return socket_.Start(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); + return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); } \ No newline at end of file -- Gitblit v1.8.0