#include "robust.h"
#include "util.h"

using namespace robust;

// Ring buffer type under test. The element type is int64_t (see the
// pop_front(int64_t &) calls below).
// NOTE(review): the allocator argument is reconstructed; confirm it against
// the CircularBuffer declaration in robust.h.
typedef CircularBuffer<int64_t, Allocator<int64_t>> Rcb;

/// Allocates raw storage from `shm` and placement-constructs an Rcb in it.
///
/// The requested capacity is `nelem + 1` (presumably the ring keeps one slot
/// unused to tell "full" from "empty" -- confirm against robust.h).
///
/// @param shm    shared memory segment that provides the storage.
/// @param nelem  number of elements the caller wants to be able to queue.
/// @return pointer to the new buffer, or nullptr when `shm.Alloc` fails.
Rcb *GetRCBImpl(SharedMemory &shm, const int nelem)
{
    int cap = nelem + 1;
    typedef uint64_t Data;
    auto size = sizeof(Rcb) + sizeof(Data) * cap;
    void *p = shm.Alloc(size);
    if (p) {
        return new (p) Rcb(cap, shm.get_segment_manager());
    }
    return nullptr;
}

/// Returns the buffer published under the name "test_rcb_pointer" in `shm`,
/// creating it through GetRCBImpl the first time the slot is found empty.
///
/// NOTE(review): the check-then-store on the slot is not synchronized here;
/// the test only calls this once, from a single thread.
///
/// @return the buffer, or nullptr when the named slot cannot be obtained or
///         the buffer cannot be allocated.
Rcb *GetRCB(SharedMemory &shm, const int nelem)
{
    void **pStore = shm.FindOrCreate<void *>("test_rcb_pointer", nullptr);
    if (!pStore) {
        return nullptr;
    }
    if (!*pStore) {
        *pStore = GetRCBImpl(shm, nelem);
    }
    return static_cast<Rcb *>(*pStore);
}

/// Short back-off used by readers when the queue is empty.
void MySleep()
{
    std::this_thread::sleep_for(2us);
}

/// Multi-producer / multi-consumer stress test of the circular buffer.
///
/// Ten writers push the values 0 .. nmsg-1 (each value exactly once, handed
/// out through the `nwrite` counter) while ten readers pop and sum them. The
/// sum must equal nmsg * (nmsg - 1) / 2.
BOOST_AUTO_TEST_CASE(RobustTest)
{
    SharedMemory &shm = TestShm();
    shm.Remove();

    pid_t pid = getpid();
    printf("pid : %d\n", pid);

    // Prints whether /proc/<pid>/stat exists (0) or not (-1).
    auto Access = [](pid_t pid) {
        char buf[100] = {0};
        // Bounded write: never run past the end of buf.
        snprintf(buf, sizeof(buf), "/proc/%d/stat", pid);
        int r = access(buf, F_OK);
        printf("access %d\n", r);
    };
    Access(pid);
    Access(pid + 1);
    // Sleep(10s);
    // return;

    int nelement = 640;
    auto rcb = GetRCB(shm, nelement);
    // REQUIRE, not CHECK: everything below dereferences rcb.
    BOOST_REQUIRE(rcb != nullptr);
    BOOST_CHECK(rcb->empty());
    BOOST_CHECK(rcb->push_back(1));
    BOOST_CHECK(rcb->size() == 1);
    int64_t d;
    BOOST_CHECK(rcb->pop_front(d));
    BOOST_CHECK(rcb->empty());

    const uint64_t nmsg = 1000 * 1000 * 1;
    uint64_t correct_total = nmsg * (nmsg - 1) / 2;

    // 64-bit counters: the expected sum (~5e11) does not fit in an int.
    std::atomic<uint64_t> total(0);
    std::atomic<uint64_t> nwrite(0);
    std::atomic<uint64_t> writedone(0);

    // Claims the next unsent value and spins until the buffer accepts it.
    auto Writer = [&]() {
        uint64_t n = 0;
        while ((n = nwrite++) < nmsg) {
            while (!rcb->push_back(n)) {
                // MySleep();
            }
            ++writedone;
        }
    };

    std::atomic<uint64_t> nread(0);

    // Pops values until all nmsg messages have been consumed by some reader.
    auto Reader = [&]() {
        while (nread.load() < nmsg) {
            int64_t d;
            if (rcb->pop_front(d)) {
                ++nread;
                total += d;
            } else {
                MySleep();
            }
        }
    };

    // Prints cumulative and per-second write/read counts once per second.
    auto status = [&]() {
        auto next = steady_clock::now();
        uint64_t lw = 0;
        uint64_t lr = 0;
        do {
            std::this_thread::sleep_until(next);
            next += 1s;
            auto w = writedone.load();
            auto r = nread.load();
            // Arguments are cast so they match the %ld / %d conversions.
            printf("write: %6ld, spd: %6ld, read: %6ld, spd: %6ld , queue size: %d\n",
                   static_cast<long>(w), static_cast<long>(w - lw),
                   static_cast<long>(r), static_cast<long>(r - lr),
                   static_cast<int>(rcb->size()));
            lw = w;
            lr = r;
        } while (nread.load() < nmsg);
    };

    ThreadManager threads;
    boost::timer::auto_cpu_timer timer;
    printf("Testing Robust Buffer, msgs %ld, queue size: %d \n",
           static_cast<long>(nmsg), nelement);
    threads.Launch(status);
    for (int i = 0; i < 10; ++i) {
        threads.Launch(Reader);
        threads.Launch(Writer);
    }
    threads.WaitAll();

    printf("total: %ld, expected: %ld\n",
           static_cast<long>(total.load()), static_cast<long>(correct_total));
    BOOST_CHECK_EQUAL(total.load(), correct_total);
}