lichao
2021-04-02 83085f2ce99cca05d40a19482151873a55e6393a
src/reqrep_center.cpp
@@ -99,63 +99,81 @@
   std::unordered_map<Topic, WeakNode> topic_map_;
   std::unordered_map<ProcId, Node> nodes_;
};
Synced<NodeCenter> &Center()
{
   static Synced<NodeCenter> s;
   return s;
}
} // namespace
bool ReqRepCenter::Start(const int nworker)
BHCenter::MsgHandler MakeReqRepCenter()
{
   auto center_ptr = std::make_shared<Synced<NodeCenter>>();
   auto onRecv = [center_ptr, this](BHMsg &msg) {
   return [center_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) {
      auto &center = *center_ptr;
      auto &shm = socket.shm();
#ifndef NDEBUG
      static std::atomic<time_t> last(0);
      time_t now = 0;
      time(&now);
      if (last.exchange(now) < now) {
         printf("bus queue size: %ld\n", socket_.Pending());
         printf("bus queue size: %ld\n", socket.Pending());
      }
#endif
      if (msg.route_size() == 0) {
         return;
      }
      auto &src_mq = msg.route(0).mq_id();
      auto SrcMQ = [&]() { return msg.route(0).mq_id(); };
      auto OnRegister = [&]() {
         if (msg.route_size() != 1) { return; }
         DataProcRegister reg;
         if (reg.ParseFromString(msg.body()) && reg.has_proc()) {
            center->Register(*reg.mutable_proc(), src_mq, reg.topics().begin(), reg.topics().end());
            center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end());
         }
      };
      auto OnHeartbeat = [&]() {
         if (msg.route_size() != 1) { return; }
         auto &src_mq = msg.route(0).mq_id();
         DataProcHeartbeat hb;
         if (hb.ParseFromString(msg.body()) && hb.has_proc()) {
            center->Heartbeat(*hb.mutable_proc(), src_mq);
            center->Heartbeat(*hb.mutable_proc(), SrcMQ());
         }
      };
      auto OnQueryTopic = [&]() {
         if (msg.route_size() != 1) { return; }
         DataProcQueryTopic query;
         NodeCenter::ProcAddr dest;
         if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) {
            MQId remote;
            memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote));
            memcpy(&remote, SrcMQ().data(), sizeof(MQId));
            MsgI imsg;
            if (!imsg.Make(shm(), MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
            if (!ShmMsgQueue::Send(shm(), remote, imsg, 100)) {
               imsg.Release(shm());
            if (!imsg.Make(shm, MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
            if (!ShmMsgQueue::Send(shm, remote, imsg, 100)) {
               imsg.Release(shm);
            }
         }
      };
      switch (msg.type()) {
      case kMsgTypeProcRegisterTopics: OnRegister(); break;
      case kMsgTypeProcHeartbeat: OnHeartbeat(); break;
      case kMsgTypeProcQueryTopic: OnQueryTopic(); break;
      default: break;
      case kMsgTypeProcRegisterTopics: OnRegister(); return true;
      case kMsgTypeProcHeartbeat: OnHeartbeat(); return true;
      case kMsgTypeProcQueryTopic: OnQueryTopic(); return true;
      default: return false;
      }
   };
}
bool ReqRepCenter::Start(const int nworker)
{
   auto handler = MakeReqRepCenter();
   printf("sizeof(rep/req handler) = %ld\n", sizeof(handler));
   const int kMaxWorker = 16;
   return socket_.Start(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
   return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
}