lichao
2021-03-31 4353f73ea0c30c776a3957dc674d750e51519ca3
utest/utest.cpp
@@ -66,20 +66,24 @@
BOOST_AUTO_TEST_CASE(PubSubTest)
{
   const std::string shm_name("ShmPubSub");
   ShmRemover auto_remove(shm_name); //remove twice? in case of killed?
   // ShmRemover auto_remove(shm_name); //remove twice? in case of killed?
   SharedMemory shm(shm_name, 1024 * 1024 * 50);
   DEFER1(shm.Remove());
   auto Avail = [&]() { return shm.get_free_memory(); };
   auto init_avail = Avail();
   int *flag = shm.find_or_construct<int>("flag")(123);
   printf("flag = %d\n", *flag);
   ++*flag;
   BusManager bus(shm);
   bus.Start(1);
   bus.Start();
   std::this_thread::sleep_for(100ms);
   std::atomic<uint64_t> count(0);
   std::atomic<ptime> last_time(Now() - seconds(1));
   std::atomic<uint64_t> last_count(0);
   const uint64_t nmsg = 100;
   const uint64_t nmsg = 100 * 2;
   const int timeout = 1000;
   auto Sub = [&](int id, const std::vector<std::string> &topics) {
      ShmSocket client(ShmSocket::eSockSubscribe, shm);
@@ -87,7 +91,7 @@
      std::mutex mutex;
      std::condition_variable cv;
      int i = 0;
      uint64_t i = 0;
      auto OnRecv = [&](BHMsg &msg) {
         if (msg.type() != kMsgTypePublish) {
            BOOST_CHECK(false);
@@ -117,7 +121,7 @@
   auto Pub = [&](const std::string &topic) {
      ShmSocket provider(ShmSocket::eSockPublish, shm);
      for (int i = 0; i < nmsg; ++i) {
      for (unsigned i = 0; i < nmsg; ++i) {
         std::string data = topic + std::to_string(i) + std::string(1000, '-');
         bool r = provider.Publish(topic, data.data(), data.size(), timeout);
@@ -134,7 +138,7 @@
      topics.push_back("t" + std::to_string(i));
   }
   Topics part;
   for (int i = 0; i < topics.size(); ++i) {
   for (size_t i = 0; i < topics.size(); ++i) {
      part.push_back(topics[i]);
      threads.Launch(Sub, i, topics);
   }