// Unit tests for the robust:: primitives: a multi-threaded producer/consumer
// stress test over a shared-memory queue (QueueTest) and a mutex lock/unlock
// benchmark plus try_lock experiment (MutexTest).
//
// NOTE(review): the copy this was restored from had lost its line breaks and
// every `<...>` template-argument list. The arguments below were reconstructed
// from how each object is used and are marked individually; confirm them
// against version control.
#include "robust.h"
#include "util.h"
// NOTE(review): a third #include had lost its header name; restore it from
// version control. The headers below cover the standard/POSIX names this file
// visibly uses (std::atomic, std::thread, open, flock, close, ...).
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <string>
#include <thread>
#include <vector>

#include <fcntl.h>
#include <sys/file.h>
#include <unistd.h>

using namespace robust;

enum {
    eLockerBits = 32,
    eLockerMask = MaskBits(sizeof(int) * 8),
};

/// Back-off helper for the spin loops below: sleeps for 2 microseconds.
/// Currently only referenced from commented-out call sites.
void MySleep()
{
    std::this_thread::sleep_for(2us);
}

/////////////////////////////////////////////////////////////////////////////////////////

/// Stress test: `nthread` writer threads push the integers [0, nmsg) into a
/// shared-memory queue while `nthread` reader threads pop and sum them. The
/// test passes when the sum equals nmsg * (nmsg - 1) / 2.
BOOST_AUTO_TEST_CASE(QueueTest)
{
    const int nthread = 100;
    const uint64_t nmsg = 1000 * 1000 * 10;

    SharedMemory &shm = TestShm();
    shm.Remove();
    // return;
    /////////////////////////////////////////////////

    // Sanity check: a char widened to int64_t survives a shift left/right
    // round trip, and masking a uint64_t with 255 recovers the loop index.
    int64_t i64 = 0;
    char c = 0;
    for (int i = 0; i < 256; ++i) {
        c = i;
        i64 = int64_t(c) << 1;
        BOOST_CHECK_EQUAL(c, (i64 >> 1));
        uint64_t u64 = i;
        BOOST_CHECK_EQUAL((u64 & 255), i);
    }

    uint64_t correct_total = nmsg * (nmsg - 1) / 2;
    // NOTE(review): <uint64_t> reconstructed -- these counters are compared
    // against nmsg / correct_total, which are uint64_t.
    std::atomic<uint64_t> total(0);
    std::atomic<uint64_t> nwrite(0);
    std::atomic<uint64_t> writedone(0);

#if 0
    // Disabled variant: lock-free AtomicQueue instead of Circular + mutex.
    typedef AtomicQueue<4> Rcb;

    Rcb tmp;
    BOOST_CHECK(tmp.like_empty());
    BOOST_CHECK(tmp.push(1));
    BOOST_CHECK(tmp.tail() == 1);
    BOOST_CHECK(tmp.head() == 0);
    int64_t d;
    BOOST_CHECK(tmp.pop(d));
    BOOST_CHECK(tmp.like_empty());
    BOOST_CHECK(tmp.head() == 1);
    BOOST_CHECK(tmp.tail() == 1);

    ShmObject<Rcb> rcb(shm, "test_rcb"); // NOTE(review): <Rcb> reconstructed
    bool try_more = true;

    auto Writer = [&]() {
        uint64_t n = 0;
        while ((n = nwrite++) < nmsg) {
            while (!rcb->push(n, try_more)) {
                // MySleep();
            }
            ++writedone;
        }
    };
    std::atomic<uint64_t> nread(0); // NOTE(review): <uint64_t> reconstructed
    auto Reader = [&]() {
        while (nread.load() < nmsg) {
            int64_t d;
            if (rcb->pop(d, try_more)) {
                ++nread;
                total += d;
            } else {
                // MySleep();
            }
        }
    };

#else
    // NOTE(review): <int64_t> reconstructed -- elements are read into an
    // int64_t by the Reader below.
    typedef Circular<int64_t> Rcb;
    ShmObject<Rcb> rcb(shm, "test_rcb", 16, shm.get_segment_manager()); // NOTE(review): <Rcb> reconstructed

    typedef FMutex Mutex;
    // typedef SemMutex Mutex;
    Mutex mtx(123);

    // Claims the next sequence number and spins until it fits in the queue.
    auto Writer = [&]() {
        uint64_t n = 0;
        while ((n = nwrite++) < nmsg) {
            auto Write = [&]() {
                robust::Guard<Mutex> lk(mtx); // NOTE(review): <Mutex> reconstructed
                if (rcb->full()) {
                    return false;
                } else {
                    rcb->push_back(n);
                    return true;
                }
                // return rcb->push_back(n);
            };
            while (!Write()) {
                // MySleep();
            }
            ++writedone;
        }
    };

    std::atomic<uint64_t> nread(0); // NOTE(review): <uint64_t> reconstructed
    // Pops one element at a time under the mutex until nmsg have been read
    // across all reader threads.
    auto Reader = [&]() {
        while (nread.load() < nmsg) {
            int64_t d = 0;
            auto Read = [&]() {
                robust::Guard<Mutex> lk(mtx); // NOTE(review): <Mutex> reconstructed
                if (rcb->empty()) {
                    return false;
                } else {
                    d = rcb->front();
                    rcb->pop_front();
                    return true;
                }
                // return rcb->pop_front(d);
            };
            if (Read()) {
                ++nread;
                total += d;
            } else {
                // MySleep();
            }
        }
    };
#endif

    // Progress reporter: once per second prints the cumulative write/read
    // counts and the delta since the previous report, until all reads finish.
    auto status = [&]() {
        auto next = steady_clock::now();
        // 64-bit baselines so the per-second delta never truncates.
        uint64_t lw = 0;
        uint64_t lr = 0;
        do {
            std::this_thread::sleep_until(next);
            next += 1s;
            const uint64_t w = writedone.load();
            const uint64_t r = nread.load();
            // Arguments are cast to long so they match the %ld conversions.
            printf("write: %6ld, spd: %6ld,  read: %6ld, spd: %6ld\n",
                   static_cast<long>(w), static_cast<long>(w - lw),
                   static_cast<long>(r), static_cast<long>(r - lr));
            lw = w;
            lr = r;
        } while (nread.load() < nmsg);
    };

    std::thread st(status);
    {
        ThreadManager threads;
        boost::timer::auto_cpu_timer timer;
        // printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, Rcb::capacity, nthread);
        printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n",
               static_cast<long>(nmsg), 16, nthread);
        for (int i = 0; i < nthread; ++i) {
            threads.Launch(Reader);
            threads.Launch(Writer);
        }
        threads.WaitAll();
    }
    st.join();

    printf("total: %ld, expected: %ld\n",
           static_cast<long>(total.load()), static_cast<long>(correct_total));
    BOOST_CHECK_EQUAL(total.load(), correct_total);
}

/// Mutex experiments. As written, only the flock() probe in the first scope
/// runs: it ends with `return`, so everything after it is unreachable until
/// that scope is removed or disabled.
BOOST_AUTO_TEST_CASE(MutexTest)
{
    {
        // Takes LOCK_EX twice on the same descriptor, then LOCK_UN twice,
        // pausing 10s after each step.
        // NOTE(review): presumably meant to observe from a second process
        // whether flock() nests -- confirm intent.
        int fd = open("/tmp/test_fmutex", O_CREAT | O_RDONLY, 0666);
        BOOST_CHECK(fd >= 0);
        if (fd < 0) {
            return; // nothing to lock; flock() on a bad descriptor would only fail
        }
        flock(fd, LOCK_EX);
        printf("lock 1");
        Sleep(10s);
        flock(fd, LOCK_EX);
        printf("lock 2");
        Sleep(10s);
        flock(fd, LOCK_UN);
        printf("un lock 2");
        Sleep(10s);
        flock(fd, LOCK_UN);
        printf("un lock 1");
        close(fd); // release the descriptor instead of leaking it
        return;
    }

    // ---- unreachable while the scope above returns ----

    // typedef robust::MFMutex RobustMutex;
    typedef robust::SemMutex RobustMutex;

    // Prints each size in [0, 20) next to that size rounded up to a multiple of 8.
    for (int i = 0; i < 20; ++i) {
        int size = i;
        int left = size & 7;
        int rsize = size + ((8 - left) & 7);
        printf("size: %3d, rsize: %3d\n", size, rsize);
    }

    SharedMemory &shm = TestShm();
    // shm.Remove();
    // return;
    GlobalInit(shm);

    const std::string mtx_name("test_mutex");
    const std::string int_name("test_int");
    // auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name, 12345);
    RobustMutex rmtx(12345);
    auto mtx = &rmtx;
    // NOTE(review): <int> reconstructed -- the value is printed with %d below.
    auto pi = shm.FindOrCreate<int>(int_name, 100);

    if (pi) {
        auto old = *pi;
        printf("int : %d, add1: %d\n", old, ++*pi);
    }

    // Times `ntimes` lock()/unlock() pairs on `mutex`, first on the calling
    // thread and then concurrently on 4, 16, 100 and 1000 threads.
    auto LockSpeed = [](auto &mutex, const std::string &name) {
        const int ntimes = 1000 * 1;
        auto Lock = [&]() {
            for (int i = 0; i < ntimes; ++i) {
                mutex.lock();
                mutex.unlock();
            }
        };
        printf("\nTesting %s lock/unlock %d times\n", name.c_str(), ntimes);
        {
            boost::timer::auto_cpu_timer timer;
            printf("1 thread: ");
            Lock();
        }
        auto InThread = [&](int nthread) {
            boost::timer::auto_cpu_timer timer;
            printf("%d threads: ", nthread);
            std::vector<std::thread> vt; // NOTE(review): <std::thread> reconstructed
            for (int i = 0; i < nthread; ++i) {
                vt.emplace_back(Lock);
            }
            for (auto &t : vt) {
                t.join();
            }
        };
        InThread(4);
        InThread(16);
        InThread(100);
        InThread(1000);
    };

    NullMutex null_mtx;
    std::mutex std_mtx;
    CasMutex cas_mtx;
    FMutex mfmtx(3);
    boost::interprocess::interprocess_mutex ipc_mutex;
    SemMutex sem_mtx(3);
    LockSpeed(null_mtx, "null mutex");
    LockSpeed(std_mtx, "std::mutex");
    // LockSpeed(cas_mtx, "CAS mutex");
    LockSpeed(ipc_mutex, "boost ipc mutex");
    LockSpeed(mfmtx, "mutex+flock");
    LockSpeed(sem_mtx, "sem mutex");

    auto TryLock = [&]() {
        if (mtx->try_lock()) {
            printf("try_lock ok\n");
            return true;
        } else {
            printf("try_lock failed\n");
            return false;
        }
    };
    auto Unlock = [&]() {
        mtx->unlock();
        printf("unlocked\n");
    };

    // `mtx` points at the local rmtx, so the else branch only matters for the
    // commented-out FindOrCreate variant above.
    if (mtx) {
        printf("mtx exists\n");
        if (TryLock()) {
            // Sleep(10s);
            // With the lock already held, try again on this thread and then
            // on a second thread; each attempt unlocks only if it succeeded.
            auto op = [&]() {
                if (TryLock()) {
                    Unlock();
                }
            };
            op();
            std::thread t(op);
            t.join();
            // Unlock();
        } else {
            // mtx->unlock();
        }
    } else {
        printf("mtx not exists\n");
    }
}