lichao
2021-04-06 bb9a7e348892eb5c4fccb063380aa6fcd9612b71
src/reqrep_center.cpp
@@ -100,12 +100,6 @@
   std::unordered_map<ProcId, Node> nodes_;
};
Synced<NodeCenter> &Center()
{
   static Synced<NodeCenter> s;
   return s;
}
} // namespace
BHCenter::MsgHandler MakeReqRepCenter()
@@ -120,7 +114,7 @@
      time_t now = 0;
      time(&now);
      if (last.exchange(now) < now) {
         printf("bus queue size: %ld\n", socket.Pending());
         printf("center queue size: %ld\n", socket.Pending());
      }
#endif
      auto SrcMQ = [&]() { return msg.route(0).mq_id(); };
@@ -128,7 +122,7 @@
      auto OnRegister = [&]() {
         if (msg.route_size() != 1) { return; }
         DataProcRegister reg;
         MsgRegister reg;
         if (reg.ParseFromString(msg.body()) && reg.has_proc()) {
            center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end());
         }
@@ -138,7 +132,7 @@
         if (msg.route_size() != 1) { return; }
         auto &src_mq = msg.route(0).mq_id();
         DataProcHeartbeat hb;
         MsgHeartbeat hb;
         if (hb.ParseFromString(msg.body()) && hb.has_proc()) {
            center->Heartbeat(*hb.mutable_proc(), SrcMQ());
         }
@@ -147,7 +141,7 @@
      auto OnQueryTopic = [&]() {
         if (msg.route_size() != 1) { return; }
         DataProcQueryTopic query;
         MsgQueryTopic query;
         NodeCenter::ProcAddr dest;
         if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) {
            MQId remote;
@@ -161,9 +155,9 @@
      };
      switch (msg.type()) {
      case kMsgTypeProcRegisterTopics: OnRegister(); return true;
      case kMsgTypeProcHeartbeat: OnHeartbeat(); return true;
      case kMsgTypeProcQueryTopic: OnQueryTopic(); return true;
      case kMsgTypeRegister: OnRegister(); return true;
      case kMsgTypeHeartbeat: OnHeartbeat(); return true;
      case kMsgTypeQueryTopic: OnQueryTopic(); return true;
      default: return false;
      }
   };
@@ -176,4 +170,4 @@
   const int kMaxWorker = 16;
   return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
}
}