lichao
2021-05-19 34cd75f77d0ca94dbdba4e6cc9451fe4d33e78b3
utest/speed_test.cpp
@@ -24,16 +24,8 @@
{
   SharedMemory &shm = TestShm();
   GlobalInit(shm);
   auto InitSem = [](auto id) {
      auto sem_id = semget(id, 1, 0666 | IPC_CREAT);
      union semun init_val;
      init_val.val = 1;
      semctl(sem_id, 0, SETVAL, init_val);
      return;
   };
   MQId id = ShmMsgQueue::NewId();
   InitSem(id);
   MQId server_id = ShmMsgQueue::NewId();
   ShmMsgQueue server(server_id, shm, 1000);
   const int timeout = 1000;
   const uint32_t data_size = 1001;
@@ -44,7 +36,6 @@
   std::string str(data_size, 'a');
   auto Writer = [&](int writer_id, uint64_t n) {
      MQId cli_id = ShmMsgQueue::NewId();
      InitSem(cli_id);
      ShmMsgQueue mq(cli_id, shm, 64);
      MsgI msg;
@@ -57,12 +48,13 @@
      DEFER1(msg.Release(););
      for (uint64_t i = 0; i < n; ++i) {
         while (!mq.TrySend(id, msg)) {}
         msg.AddRef();
         while (!mq.TrySend({server.Id(), server.AbsAddr()}, msg.Offset())) {}
         ++nwrite;
      }
   };
   auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) {
      ShmMsgQueue mq(id, shm, 1000);
      ShmMsgQueue &mq = server;
      auto now = []() { return steady_clock::now(); };
      auto tm = now();
      while (*run) {
@@ -157,8 +149,8 @@
   auto Avail = [&]() { return shm.get_free_memory(); };
   auto init_avail = Avail();
   ShmSocket srv(shm, qlen);
   ShmSocket cli(shm, qlen);
   ShmSocket srv(shm, ShmMsgQueue::NewId(), qlen);
   ShmSocket cli(shm, ShmMsgQueue::NewId(), qlen);
   int ncli = 1;
   uint64_t nmsg = 1000 * 1000 * 1;
@@ -188,43 +180,33 @@
            req_body.set_topic("topic");
            req_body.set_data(msg_content);
            auto req_head(InitMsgHead(GetType(req_body), client_proc_id, cli.id()));
            req_head.add_route()->set_mq_id(cli.id());
            return cli.Send(srv.id(), req_head, req_body);
            auto route = req_head.add_route();
            route->set_mq_id(cli.id());
            route->set_abs_addr(cli.AbsAddr());
            return cli.Send({srv.id(), srv.AbsAddr()}, req_head, req_body);
         };
         Req();
      }
   };
   auto onRequest = [&](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
      if (head.type() == kMsgTypeRequestTopic) {
         MQInfo src_mq = {head.route()[0].mq_id(), head.route()[0].abs_addr()};
   std::atomic<bool> stop(false);
   auto Server = [&]() {
      MsgI req;
      BHMsgHead req_head;
      while (!stop) {
         if (srv.SyncRecv(req, req_head, 10)) {
            DEFER1(req.Release());
            if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) {
               auto src_id = req_head.route()[0].mq_id();
               auto Reply = [&]() {
                  MsgRequestTopic reply_body;
                  reply_body.set_topic("topic");
                  reply_body.set_data(msg_content);
                  auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), req_head.msg_id()));
                  return srv.Send(src_id, reply_head, reply_body);
               };
               Reply();
            }
         }
         MsgRequestTopic reply_body;
         reply_body.set_topic("topic");
         reply_body.set_data(msg_content);
         auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), head.msg_id()));
         srv.Send(src_mq, reply_head, reply_body);
      }
   };
   srv.Start(onRequest);
   boost::timer::auto_cpu_timer timer;
   DEFER1(printf("Request Reply Test:"););
   ThreadManager clients, servers;
   for (int i = 0; i < 2; ++i) { servers.Launch(Server); }
   ThreadManager clients;
   printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
   for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
   clients.WaitAll();
@@ -233,7 +215,6 @@
      std::this_thread::sleep_for(100ms);
   } while (count.load() < ncli * nmsg);
   PrintStatus(NowSec());
   stop = true;
   servers.WaitAll();
   srv.Stop();
   // BOOST_CHECK_THROW(reply.Count(), int);
}