From bb9a7e348892eb5c4fccb063380aa6fcd9612b71 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 06 四月 2021 17:32:35 +0800 Subject: [PATCH] server resend failed; rename msgs; refactor. --- src/reqrep_center.cpp | 22 ++++++++-------------- 1 files changed, 8 insertions(+), 14 deletions(-) diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp index e52b0fd..ce35d1c 100644 --- a/src/reqrep_center.cpp +++ b/src/reqrep_center.cpp @@ -100,12 +100,6 @@ std::unordered_map<ProcId, Node> nodes_; }; -Synced<NodeCenter> &Center() -{ - static Synced<NodeCenter> s; - return s; -} - } // namespace BHCenter::MsgHandler MakeReqRepCenter() @@ -120,7 +114,7 @@ time_t now = 0; time(&now); if (last.exchange(now) < now) { - printf("bus queue size: %ld\n", socket.Pending()); + printf("center queue size: %ld\n", socket.Pending()); } #endif auto SrcMQ = [&]() { return msg.route(0).mq_id(); }; @@ -128,7 +122,7 @@ auto OnRegister = [&]() { if (msg.route_size() != 1) { return; } - DataProcRegister reg; + MsgRegister reg; if (reg.ParseFromString(msg.body()) && reg.has_proc()) { center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end()); } @@ -138,7 +132,7 @@ if (msg.route_size() != 1) { return; } auto &src_mq = msg.route(0).mq_id(); - DataProcHeartbeat hb; + MsgHeartbeat hb; if (hb.ParseFromString(msg.body()) && hb.has_proc()) { center->Heartbeat(*hb.mutable_proc(), SrcMQ()); } @@ -147,7 +141,7 @@ auto OnQueryTopic = [&]() { if (msg.route_size() != 1) { return; } - DataProcQueryTopic query; + MsgQueryTopic query; NodeCenter::ProcAddr dest; if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) { MQId remote; @@ -161,9 +155,9 @@ }; switch (msg.type()) { - case kMsgTypeProcRegisterTopics: OnRegister(); return true; - case kMsgTypeProcHeartbeat: OnHeartbeat(); return true; - case kMsgTypeProcQueryTopic: OnQueryTopic(); return true; + case kMsgTypeRegister: OnRegister(); return true; + case kMsgTypeHeartbeat: OnHeartbeat(); return true; + case kMsgTypeQueryTopic: OnQueryTopic(); return true; default: return false; } }; @@ -176,4 +170,4 @@ const int kMaxWorker = 16; return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); -} \ No newline at end of file +} -- Gitblit v1.8.0