| | |
| | | std::unordered_map<ProcId, Node> nodes_; |
| | | }; |
| | | |
| | | // Accessor for the shared Synced<NodeCenter> object. |
| | | // The object is a function-local static, so it is created the first time |
| | | // control reaches its declaration and every call returns a reference to |
| | | // that same instance. |
| | | Synced<NodeCenter> &Center() |
| | | { |
| | | static Synced<NodeCenter> instance; |
| | | return instance; |
| | | } |
| | | |
| | | } // namespace |
| | | |
| | | BHCenter::MsgHandler MakeReqRepCenter() |
| | |
| | | time_t now = 0; |
| | | time(&now); |
| | | if (last.exchange(now) < now) { |
| | | printf("bus queue size: %ld\n", socket.Pending()); |
| | | printf("center queue size: %ld\n", socket.Pending()); |
| | | } |
| | | #endif |
| | | auto SrcMQ = [&]() { return msg.route(0).mq_id(); }; |
| | |
| | | auto OnRegister = [&]() { |
| | | if (msg.route_size() != 1) { return; } |
| | | |
| | | DataProcRegister reg; |
| | | MsgRegister reg; |
| | | if (reg.ParseFromString(msg.body()) && reg.has_proc()) { |
| | | center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end()); |
| | | } |
| | |
| | | if (msg.route_size() != 1) { return; } |
| | | auto &src_mq = msg.route(0).mq_id(); |
| | | |
| | | DataProcHeartbeat hb; |
| | | MsgHeartbeat hb; |
| | | if (hb.ParseFromString(msg.body()) && hb.has_proc()) { |
| | | center->Heartbeat(*hb.mutable_proc(), SrcMQ()); |
| | | } |
| | |
| | | auto OnQueryTopic = [&]() { |
| | | if (msg.route_size() != 1) { return; } |
| | | |
| | | DataProcQueryTopic query; |
| | | MsgQueryTopic query; |
| | | NodeCenter::ProcAddr dest; |
| | | if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) { |
| | | MQId remote; |
| | |
| | | }; |
| | | |
| | | switch (msg.type()) { |
| | | case kMsgTypeProcRegisterTopics: OnRegister(); return true; |
| | | case kMsgTypeProcHeartbeat: OnHeartbeat(); return true; |
| | | case kMsgTypeProcQueryTopic: OnQueryTopic(); return true; |
| | | case kMsgTypeRegister: OnRegister(); return true; |
| | | case kMsgTypeHeartbeat: OnHeartbeat(); return true; |
| | | case kMsgTypeQueryTopic: OnQueryTopic(); return true; |
| | | default: return false; |
| | | } |
| | | }; |
| | |
| | | |
| | | const int kMaxWorker = 16; |
| | | return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); |
| | | } |
| | | } |