#include "robust.h"
#include "util.h"
// NOTE(review): the original file had a third #include at this position whose
// header name is not recoverable from this view; restore it if the build needs
// it. The standard headers below cover what this file uses directly.
#include <atomic>
#include <chrono>
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <thread>

using namespace robust;

enum {
    eLockerBits = 32,
    eLockerMask = MaskBits(sizeof(int) * 8),
};

/// Very short back-off used (optionally) inside the spin loops below.
void MySleep()
{
    std::this_thread::sleep_for(2us);
}

/////////////////////////////////////////////////////////////////////////////////////////

/// Two client threads each issue requests 0..4 through a shared AtomicReqRep
/// while one server thread services them with a handler returning req + 100.
BOOST_AUTO_TEST_CASE(InitTest)
{
    AtomicReqRep rr;

    auto client = [&]() {
        for (int i = 0; i < 5; ++i) {
            int64_t reply = 0;
            const bool r = rr.ClientRequest(i, reply);
            // reply is int64_t, so it must be printed with PRId64 (was "%d").
            printf("init request %d, %s, reply %" PRId64 "\n", i, (r ? "ok" : "failed"), reply);
        }
    };

    // Written by this thread and polled by the server thread, so it has to be
    // atomic; a plain bool here is a data race.
    std::atomic<bool> run{true};
    auto server = [&]() {
        auto onReq = [](int64_t req) { return req + 100; };
        while (run.load()) {
            rr.ServerProcess(onReq);
        }
    };

    ThreadManager clients;
    ThreadManager servers;
    servers.Launch(server);
    for (int i = 0; i < 2; ++i) {
        clients.Launch(client);
    }
    clients.WaitAll();

    // All clients are done; let the server loop exit and wait for it.
    run.store(false);
    servers.WaitAll();
}

/// Stress test: nthread writers push the integers 0..nmsg-1 into a queue placed
/// in shared memory while nthread readers pop them; the sum of everything
/// popped must equal nmsg * (nmsg - 1) / 2.
BOOST_AUTO_TEST_CASE(QueueTest)
{
    const int nthread = 100;
    const uint64_t nmsg = 1000 * 1000 * 10;

    SharedMemory &shm = TestShm();
    shm.Remove();
    // return;
    /////////////////////////////////////////////////

    // Sanity check: each 8-bit value survives a shift-left-by-one / shift-right
    // round trip through int64_t, and masking a uint64_t with 255 gives it back.
    int64_t i64 = 0;
    char c = 0;
    for (int i = 0; i < 256; ++i) {
        c = i;
        i64 = int64_t(c) << 1;
        BOOST_CHECK_EQUAL(c, (i64 >> 1));
        uint64_t u64 = i;
        BOOST_CHECK_EQUAL((u64 & 255), i);
    }

    const uint64_t correct_total = nmsg * (nmsg - 1) / 2;

    // Explicit 64-bit counters: the expected sum (~5e13) does not fit in int,
    // and every counter is compared against / printed as a uint64_t.
    std::atomic<uint64_t> total(0);
    std::atomic<uint64_t> nwrite(0);
    std::atomic<uint64_t> writedone(0);

    typedef AtomicQ63 Rcb;

    // Single-threaded smoke test on a local queue: one push, one pop.
    Rcb tmp;
    BOOST_CHECK(tmp.push(1));
    int64_t d = 0;
    BOOST_CHECK(tmp.pop(d));

    // NOTE(review): element type restored as Rcb -- the object is used through
    // rcb->push / rcb->pop exactly like `tmp`; confirm against the declaration
    // of NamedShmObject.
    NamedShmObject<Rcb> rcb(shm, "test_rcb", eOpenOrCreate);

    const bool try_more = true;

    // Claims the next unsent value via nwrite and spins until the push succeeds.
    auto Writer = [&]() {
        uint64_t n = 0;
        while ((n = nwrite++) < nmsg) {
            while (!rcb->push(n, try_more)) {
                // MySleep();
            }
            ++writedone;
        }
    };

    std::atomic<uint64_t> nread(0);

    // Pops until nmsg values have been read in total, accumulating their sum.
    auto Reader = [&]() {
        while (nread.load() < nmsg) {
            int64_t d;
            if (rcb->pop(d, try_more)) {
                ++nread;
                total += d;
            } else {
                // MySleep();
            }
        }
    };

    // Prints cumulative write/read counts and the per-interval delta once per
    // second until every message has been read.
    auto status = [&]() {
        auto next = steady_clock::now();
        // 64-bit like the counters they track (were uint32_t).
        uint64_t lw = 0;
        uint64_t lr = 0;
        do {
            std::this_thread::sleep_until(next);
            next += 1s;
            const uint64_t w = writedone.load();
            const uint64_t r = nread.load();
            printf("write: %6" PRIu64 ", spd: %6" PRIu64 ", read: %6" PRIu64 ", spd: %6" PRIu64 "\n",
                   w, w - lw, r, r - lr);
            lw = w;
            lr = r;
        } while (nread.load() < nmsg);
    };

    std::thread st(status);
    {
        ThreadManager threads;
        boost::timer::auto_cpu_timer timer;
        // printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, Rcb::capacity, nthread);
        printf("Testing Robust Buffer, msgs %" PRIu64 ", queue size: %d, threads: %d \n", nmsg, 16, nthread);
        for (int i = 0; i < nthread; ++i) {
            threads.Launch(Reader);
            threads.Launch(Writer);
        }
        threads.WaitAll();
    }
    st.join();

    printf("total: %" PRIu64 ", expected: %" PRIu64 "\n", total.load(), correct_total);
    BOOST_CHECK_EQUAL(total.load(), correct_total);
}