#include "robust.h" #include "util.h" using namespace robust; enum { eLockerBits = 32, eLockerMask = MaskBits(sizeof(int) * 8), }; void MySleep() { std::this_thread::sleep_for(2us); } ///////////////////////////////////////////////////////////////////////////////////////// BOOST_AUTO_TEST_CASE(QueueTest) { const int nthread = 100; const uint64_t nmsg = 1000 * 1000 * 10; SharedMemory &shm = TestShm(); shm.Remove(); // return; ///////////////////////////////////////////////// int64_t i64 = 0; char c = 0; for (int i = 0; i < 256; ++i) { c = i; i64 = int64_t(c) << 1; BOOST_CHECK_EQUAL(c, (i64 >> 1)); uint64_t u64 = i; BOOST_CHECK_EQUAL((u64 & 255), i); } #if 1 typedef AtomicQueue<3> Rcb; Rcb tmp; BOOST_CHECK(tmp.like_empty()); BOOST_CHECK(tmp.push_back(1)); BOOST_CHECK(tmp.tail() == 1); BOOST_CHECK(tmp.head() == 0); int64_t d; BOOST_CHECK(tmp.pop_front(d)); BOOST_CHECK(tmp.like_empty()); BOOST_CHECK(tmp.head() == 1); BOOST_CHECK(tmp.tail() == 1); ShmObject rcb(shm, "test_rcb"); #else typedef Circular Rcb; ShmObject rcb(shm, "test_rcb", 64, shm.get_segment_manager()); #endif const int nsize = sizeof(Rcb); bool try_more = false; uint64_t correct_total = nmsg * (nmsg - 1) / 2; std::atomic total(0); std::atomic nwrite(0); std::atomic writedone(0); auto Writer = [&]() { uint64_t n = 0; while ((n = nwrite++) < nmsg) { while (!rcb->push_back(n, try_more)) { // MySleep(); } ++writedone; } }; std::atomic nread(0); auto Reader = [&]() { while (nread.load() < nmsg) { int64_t d; if (rcb->pop_front(d, try_more)) { ++nread; total += d; } else { // MySleep(); } } }; auto status = [&]() { auto next = steady_clock::now(); uint32_t lw = 0; uint32_t lr = 0; do { std::this_thread::sleep_until(next); next += 1s; auto w = writedone.load(); auto r = nread.load(); printf("write: %6ld, spd: %6ld, read: %6ld, spd: %6ld\n", w, w - lw, r, r - lr); lw = w; lr = r; } while (nread.load() < nmsg); }; std::thread st(status); { ThreadManager threads; boost::timer::auto_cpu_timer timer; printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, Rcb::capacity, nthread); for (int i = 0; i < nthread; ++i) { threads.Launch(Reader); threads.Launch(Writer); } threads.WaitAll(); } st.join(); printf("total: %ld, expected: %ld\n", total.load(), correct_total); BOOST_CHECK_EQUAL(total.load(), correct_total); } BOOST_AUTO_TEST_CASE(MutexTest) { typedef robust::Mutex RobustMutex; for (int i = 0; i < 20; ++i) { int size = i; int left = size & 7; int rsize = size + ((8 - left) & 7); printf("size: %3d, rsize: %3d\n", size, rsize); } SharedMemory &shm = TestShm(); // shm.Remove(); // return; GlobalInit(shm); const std::string mtx_name("test_mutex"); const std::string int_name("test_int"); auto mtx = shm.FindOrCreate(mtx_name); auto pi = shm.FindOrCreate(int_name, 100); std::mutex m; typedef std::chrono::steady_clock Clock; auto Now = []() { return Clock::now().time_since_epoch(); }; if (pi) { auto old = *pi; printf("int : %d, add1: %d\n", old, ++*pi); } { const int ntimes = 1000 * 1000; RobustMutex mutex; auto Lock = [&]() { for (int i = 0; i < ntimes; ++i) { mutex.lock(); mutex.unlock(); } }; { boost::timer::auto_cpu_timer timer; printf("test lock/unlock %d times: ", ntimes); Lock(); } { boost::timer::auto_cpu_timer timer; printf("test lock/unlock %d times, 2 thread: ", ntimes); std::thread t1(Lock), t2(Lock); t1.join(); t2.join(); } } auto TryLock = [&]() { if (mtx->try_lock()) { printf("try_lock ok\n"); return true; } else { printf("try_lock failed\n"); return false; } }; auto Unlock = [&]() { mtx->unlock(); printf("unlocked\n"); }; if (mtx) { printf("mtx exists\n"); if (TryLock()) { auto op = [&]() { if (TryLock()) { Unlock(); } }; op(); std::thread t(op); t.join(); // Unlock(); } else { // mtx->unlock(); } } else { printf("mtx not exists\n"); } }