#include #include #include #include #include #include #include #include "pubsub.h" #include "defs.h" #include "util.h" template struct IsSameType { static const bool value = false; }; template struct IsSameType { static const bool value = true; }; BOOST_AUTO_TEST_CASE(ShmTest) { const int size = 1024*1024; const std::string shm_name("ShmRemoveTest"); SharedMemory m0(shm_name, size); int *p = m0.find_or_construct("abc")(100); if (p) { printf("*p = %d\n", *p); ++*p; } { SharedMemory m1(shm_name, size); m1.Remove(); // m1.Remove(); } if (p) { printf("again *p = %d\n", *p); // ++*p; } } BOOST_AUTO_TEST_CASE(PubSubTest) { const std::string shm_name("ShmPubSub"); ShmRemover auto_remove(shm_name); //remove twice? in case of killed? SharedMemory shm(shm_name, 1024*1024*50); auto Avail = [&]() { return shm.get_free_memory(); }; auto init_avail = Avail(); BusManager bus(shm); bus.Start(1); std::this_thread::sleep_for(100ms); std::atomic count(0); std::atomic last_time(Now() - seconds(1)); std::atomic last_count(0); const uint64_t nmsg = 1000 * 100; const int timeout = 1000; auto Sub = [&](int id, const std::vector &topics) { ShmMsgQueue client(shm, 8); client.Send(kBHBusQueueId, MakeSub(client.Id(), topics), timeout); for (int i = 0; i < nmsg * topics.size(); ++i) { BHMsg msg; if (client.Recv(msg, 1000)) { if (msg.type() != kMsgTypePublish) { BOOST_CHECK(false); } DataPub pub; if (!pub.ParseFromString(msg.body())) { BOOST_CHECK(false); } ++count; auto cur = Now(); if (last_time.exchange(cur) < cur) { std::cout << "time: " << cur; printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", count.load(), count - last_count.exchange(count), init_avail - Avail()); } // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str()); } else { printf("sub %2d recv timeout\n", id); } } }; auto Pub = [&](const std::string &topic) { ShmMsgQueue provider(shm, 0); for (int i = 0; i < nmsg; ++i) { std::string data = topic + std::to_string(i); bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout); if (!r) { printf("pub ret: %s\n", r ? "ok" : "fail"); } } }; ThreadManager threads; typedef std::vector Topics; Topics topics; topics.push_back("000-xxx"); topics.push_back("111-football"); topics.push_back("222-sport"); // topics.push_back("333-sport"); // topics.push_back("444-sport"); Topics part; for (int i = 0; i < topics.size(); ++i) { part.push_back(topics[i]); threads.Launch(Sub, i, part); } std::this_thread::sleep_for(100ms); for (auto &topic: topics) { threads.Launch(Pub, topic); } threads.Launch(Pub, "some_else"); threads.WaitAll(); bus.Stop(); } inline int MyMin(int a, int b) { printf("MyMin\n"); return a < b ? a : b; } int test_main(int argc, char *argv[]) { printf("test main\n"); int a = 0; int b = 0; BOOST_CHECK_EQUAL(a, b); int n = MyMin(4,6); for (int i = 0; i < n; ++i) { printf("i = %d\n", i); } return 0; }