| | |
| | | std::unordered_map<Topic, WeakNode> topic_map_; |
| | | std::unordered_map<ProcId, Node> nodes_; |
| | | }; |
| | | |
| | | /// Accessor for the one function-local static Synced<NodeCenter>. |
| | | /// Every call returns a reference to that same object. |
| | | Synced<NodeCenter> &Center() |
| | | { |
| | |     static Synced<NodeCenter> instance; |
| | |     return instance; |
| | | } |
| | | |
| | | } // namespace |
| | | |
| | | /// Builds the message handler used by the request/reply center. |
| | | /// |
| | | /// The returned callable captures a shared_ptr to one Synced<NodeCenter> by value, |
| | | /// so every copy of the handler operates on the same registry and keeps it alive. |
| | | /// |
| | | /// The handler returns true when msg.type() is one of the three types dispatched |
| | | /// in the switch below (even if the message was then ignored because it was |
| | | /// malformed) and false for any other type. Its MsgI parameter is not used. |
| | | BHCenter::MsgHandler MakeReqRepCenter() |
| | | { |
| | |     auto center_ptr = std::make_shared<Synced<NodeCenter>>(); |
| | |     return [center_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) { |
| | |         auto &center = *center_ptr; |
| | |         auto &shm = socket.shm(); |
| | | |
| | | #ifndef NDEBUG |
| | |         // Debug builds only: print the pending count when the stored timestamp is |
| | |         // older than the current second, so the output is rate limited. |
| | |         static std::atomic<time_t> last(0); |
| | |         time_t now = 0; |
| | |         time(&now); |
| | |         if (last.exchange(now) < now) { |
| | |             printf("bus queue size: %ld\n", socket.Pending()); |
| | |         } |
| | | #endif |
| | |         // Only call after route_size() has been checked; route(0) must exist. |
| | |         auto SrcMQ = [&]() { return msg.route(0).mq_id(); }; |
| | | |
| | |         // Registers the sending process together with the topics it lists. |
| | |         auto OnRegister = [&]() { |
| | |             if (msg.route_size() != 1) { return; } |
| | | |
| | |             DataProcRegister reg; |
| | |             if (reg.ParseFromString(msg.body()) && reg.has_proc()) { |
| | |                 center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end()); |
| | |             } |
| | |         }; |
| | | |
| | |         // Forwards a heartbeat from the sending process to the registry. |
| | |         auto OnHeartbeat = [&]() { |
| | |             if (msg.route_size() != 1) { return; } |
| | | |
| | |             DataProcHeartbeat hb; |
| | |             if (hb.ParseFromString(msg.body()) && hb.has_proc()) { |
| | |                 center->Heartbeat(*hb.mutable_proc(), SrcMQ()); |
| | |             } |
| | |         }; |
| | | |
| | |         // Looks up the queried topic and, on success, sends the reply back to the |
| | |         // queue the query came from. |
| | |         auto OnQueryTopic = [&]() { |
| | |             if (msg.route_size() != 1) { return; } |
| | | |
| | |             DataProcQueryTopic query; |
| | |             NodeCenter::ProcAddr dest; |
| | |             if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) { |
| | |                 const auto src = SrcMQ(); |
| | |                 // The id arrives inside the message; never copy more bytes than it holds. |
| | |                 if (src.size() < sizeof(MQId)) { return; } |
| | |                 MQId remote; |
| | |                 memcpy(&remote, src.data(), sizeof(MQId)); |
| | |                 // Named reply so it does not shadow the handler's imsg parameter. |
| | |                 MsgI reply; |
| | |                 if (!reply.Make(shm, MakeQueryTopicReply(dest, msg.msg_id()))) { return; } |
| | |                 if (!ShmMsgQueue::Send(shm, remote, reply, 100)) { |
| | |                     // Send failed, so the message is still ours to release. |
| | |                     reply.Release(shm); |
| | |                 } |
| | |             } |
| | |         }; |
| | | |
| | |         switch (msg.type()) { |
| | |         case kMsgTypeProcRegisterTopics: OnRegister(); return true; |
| | |         case kMsgTypeProcHeartbeat: OnHeartbeat(); return true; |
| | |         case kMsgTypeProcQueryTopic: OnQueryTopic(); return true; |
| | |         default: return false; |
| | |         } |
| | |     }; |
| | | } |
| | | |
| | | /// Starts socket_ with the handler produced by MakeReqRepCenter(). |
| | | /// |
| | | /// @param nworker requested worker count; values <= 0 fall back to 2, and the |
| | | ///                count is capped at kMaxWorker (16). |
| | | /// @return the result of socket_.Start(). |
| | | bool ReqRepCenter::Start(const int nworker) |
| | | { |
| | |     auto handler = MakeReqRepCenter(); |
| | |     // sizeof yields size_t, which needs %zu rather than %ld. |
| | |     printf("sizeof(rep/req handler) = %zu\n", sizeof(handler)); |
| | | |
| | |     const int kMaxWorker = 16; |
| | |     return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); |
| | | } |