From 34cd75f77d0ca94dbdba4e6cc9451fe4d33e78b3 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 19 五月 2021 19:14:13 +0800 Subject: [PATCH] add api BHQueryProcs. --- utest/robust_test.cpp | 262 +++++++++++++++++++++++++++++++++++++++------------- 1 files changed, 197 insertions(+), 65 deletions(-) diff --git a/utest/robust_test.cpp b/utest/robust_test.cpp index 9384c10..ea6144c 100644 --- a/utest/robust_test.cpp +++ b/utest/robust_test.cpp @@ -1,5 +1,6 @@ #include "robust.h" #include "util.h" +#include <boost/circular_buffer.hpp> using namespace robust; @@ -8,60 +9,91 @@ eLockerMask = MaskBits(sizeof(int) * 8), }; -typedef CircularBuffer<int64_t, Allocator<int64_t>> Rcb; -Rcb *GetRCB(SharedMemory &shm, const int nelem) -{ - int cap = nelem + 1; - typedef uint64_t Data; - auto size = sizeof(Rcb) + sizeof(Data) * cap; - void *p = shm.Alloc(size); - if (p) { - return new (p) Rcb(cap, shm.get_segment_manager()); - } - return nullptr; -} - void MySleep() { std::this_thread::sleep_for(2us); } +///////////////////////////////////////////////////////////////////////////////////////// + +BOOST_AUTO_TEST_CASE(InitTest) +{ + AtomicReqRep rr; + auto client = [&]() { + for (int i = 0; i < 20; ++i) { + int64_t reply = 0; + bool r = rr.ClientRequest(i, reply); + printf("init request %d, %s, reply %d\n", i, (r ? "ok" : "failed"), reply); + } + }; + + bool run = true; + auto server = [&]() { + auto onReq = [](int64_t req) { return req + 100; }; + while (run) { + rr.ServerProcess(onReq); + } + }; + + ThreadManager clients, servers; + servers.Launch(server); + for (int i = 0; i < 2; ++i) { + clients.Launch(client); + } + clients.WaitAll(); + run = false; + servers.WaitAll(); +} + BOOST_AUTO_TEST_CASE(QueueTest) { + const int nthread = 100; + const uint64_t nmsg = 1000 * 1000 * 10; + SharedMemory &shm = TestShm(); shm.Remove(); - pid_t pid = getpid(); - printf("pid : %d\n", pid); - auto Access = [](pid_t pid) { - char buf[100] = {0}; - sprintf(buf, "/proc/%d/stat", pid); - int r = access(buf, F_OK); - printf("access %d\n", r); - }; - Access(pid); - Access(pid + 1); - // Sleep(10s); - // return; + // return; ///////////////////////////////////////////////// + int64_t i64 = 0; + char c = 0; + for (int i = 0; i < 256; ++i) { + c = i; + i64 = int64_t(c) << 1; + BOOST_CHECK_EQUAL(c, (i64 >> 1)); + uint64_t u64 = i; + BOOST_CHECK_EQUAL((u64 & 255), i); + } - int nelement = 640; - auto rcb = GetRCB(shm, nelement); - BOOST_CHECK(rcb != nullptr); - BOOST_CHECK(rcb->empty()); - BOOST_CHECK(rcb->push_back(1)); - BOOST_CHECK(rcb->size() == 1); - int64_t d; - BOOST_CHECK(rcb->pop_front(d)); - BOOST_CHECK(rcb->empty()); - - const uint64_t nmsg = 1000 * 1000 * 1; uint64_t correct_total = nmsg * (nmsg - 1) / 2; std::atomic<uint64_t> total(0); std::atomic<uint64_t> nwrite(0); std::atomic<uint64_t> writedone(0); + +#if 1 + const int kPower = 0; + typedef AtomicQueue<kPower> Rcb; + + Rcb tmp; + // BOOST_CHECK(tmp.like_empty()); + BOOST_CHECK(tmp.push(1)); + if (kPower != 0) { + BOOST_CHECK(tmp.tail() == 1); + } + BOOST_CHECK(tmp.head() == 0); + int64_t d; + BOOST_CHECK(tmp.pop(d)); + if (kPower != 0) { + // BOOST_CHECK(tmp.like_empty()); + BOOST_CHECK(tmp.head() == 1); + BOOST_CHECK(tmp.tail() == 1); + } + + ShmObject<Rcb> rcb(shm, "test_rcb"); + bool try_more = true; + auto Writer = [&]() { uint64_t n = 0; while ((n = nwrite++) < nmsg) { - while (!rcb->push_back(n)) { + while (!rcb->push(n, try_more)) { // MySleep(); } ++writedone; @@ -71,14 +103,66 @@ auto Reader = [&]() { while (nread.load() < nmsg) { int64_t d; - if (rcb->pop_front(d)) { + if (rcb->pop(d, try_more)) { ++nread; total += d; } else { - MySleep(); + // MySleep(); } } }; + +#else + typedef Circular<int64_t> Rcb; + ShmObject<Rcb> rcb(shm, "test_rcb", 16, shm.get_segment_manager()); + + typedef FMutex Mutex; + // typedef SemMutex Mutex; + Mutex mtx(123); + auto Writer = [&]() { + uint64_t n = 0; + while ((n = nwrite++) < nmsg) { + auto Write = [&]() { + robust::Guard<Mutex> lk(mtx); + if (rcb->full()) { + return false; + } else { + rcb->push_back(n); + return true; + } + // return rcb->push_back(n); + }; + while (!Write()) { + // MySleep(); + } + ++writedone; + } + }; + std::atomic<uint64_t> nread(0); + auto Reader = [&]() { + while (nread.load() < nmsg) { + int64_t d; + auto Read = [&]() { + robust::Guard<Mutex> lk(mtx); + if (rcb->empty()) { + return false; + } else { + d = rcb->front(); + rcb->pop_front(); + return true; + } + // return rcb->pop_front(d); + }; + if (Read()) { + ++nread; + total += d; + } else { + // MySleep(); + } + } + }; + +#endif auto status = [&]() { auto next = steady_clock::now(); @@ -89,28 +173,52 @@ next += 1s; auto w = writedone.load(); auto r = nread.load(); - printf("write: %6ld, spd: %6ld, read: %6ld, spd: %6ld , queue size: %d\n", w, w - lw, r, r - lr, rcb->size()); + printf("write: %6ld, spd: %6ld, read: %6ld, spd: %6ld\n", + w, w - lw, r, r - lr); lw = w; lr = r; } while (nread.load() < nmsg); }; - ThreadManager threads; - boost::timer::auto_cpu_timer timer; - printf("Testing Robust Buffer, msgs %ld, queue size: %d \n", nmsg, nelement); - threads.Launch(status); - for (int i = 0; i < 10; ++i) { - threads.Launch(Reader); - threads.Launch(Writer); + std::thread st(status); + { + ThreadManager threads; + boost::timer::auto_cpu_timer timer; + // printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, Rcb::capacity, nthread); + printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, 16, nthread); + for (int i = 0; i < nthread; ++i) { + threads.Launch(Reader); + threads.Launch(Writer); + } + threads.WaitAll(); } - threads.WaitAll(); + st.join(); printf("total: %ld, expected: %ld\n", total.load(), correct_total); BOOST_CHECK_EQUAL(total.load(), correct_total); } BOOST_AUTO_TEST_CASE(MutexTest) { - typedef robust::Mutex RobustMutex; + { + int sem_id = semget(100, 1, 0666 | IPC_CREAT); + auto P = [&]() { + sembuf op = {0, -1, SEM_UNDO}; + semop(sem_id, &op, 1); + }; + auto V = [&]() { + sembuf op = {0, 1, SEM_UNDO}; + semop(sem_id, &op, 1); + }; + for (int i = 0; i < 10; ++i) { + V(); + } + Sleep(10s); + + return; + } + + // typedef robust::MFMutex RobustMutex; + typedef robust::SemMutex RobustMutex; for (int i = 0; i < 20; ++i) { int size = i; @@ -125,38 +233,61 @@ const std::string mtx_name("test_mutex"); const std::string int_name("test_int"); - auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name); + // auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name, 12345); + RobustMutex rmtx(12345); + auto mtx = &rmtx; auto pi = shm.FindOrCreate<int>(int_name, 100); std::mutex m; typedef std::chrono::steady_clock Clock; - auto Now = []() { return Clock::now().time_since_epoch(); }; + if (pi) { auto old = *pi; printf("int : %d, add1: %d\n", old, ++*pi); } - { - boost::timer::auto_cpu_timer timer; - const int ntimes = 1000 * 1000; - printf("test lock/unlock %d times: ", ntimes); - RobustMutex mutex; + auto LockSpeed = [](auto &mutex, const std::string &name) { + const int ntimes = 1000 * 1; auto Lock = [&]() { for (int i = 0; i < ntimes; ++i) { mutex.lock(); mutex.unlock(); } }; - std::thread t1(Lock), t2(Lock); - t1.join(); - t2.join(); - } - - auto MSFromNow = [](const int ms) { - using namespace boost::posix_time; - ptime cur = boost::posix_time::microsec_clock::universal_time(); - return cur + millisec(ms); + printf("\nTesting %s lock/unlock %d times\n", name.c_str(), ntimes); + { + boost::timer::auto_cpu_timer timer; + printf("1 thread: "); + Lock(); + } + auto InThread = [&](int nthread) { + boost::timer::auto_cpu_timer timer; + printf("%d threads: ", nthread); + std::vector<std::thread> vt; + for (int i = 0; i < nthread; ++i) { + vt.emplace_back(Lock); + } + for (auto &t : vt) { + t.join(); + } + }; + InThread(4); + InThread(16); + InThread(100); + InThread(1000); }; + NullMutex null_mtx; + std::mutex std_mtx; + CasMutex cas_mtx; + FMutex mfmtx(3); + boost::interprocess::interprocess_mutex ipc_mutex; + SemMutex sem_mtx(3); + LockSpeed(null_mtx, "null mutex"); + LockSpeed(std_mtx, "std::mutex"); + // LockSpeed(cas_mtx, "CAS mutex"); + LockSpeed(ipc_mutex, "boost ipc mutex"); + LockSpeed(mfmtx, "mutex+flock"); + LockSpeed(sem_mtx, "sem mutex"); auto TryLock = [&]() { if (mtx->try_lock()) { @@ -175,6 +306,7 @@ if (mtx) { printf("mtx exists\n"); if (TryLock()) { + // Sleep(10s); auto op = [&]() { if (TryLock()) { Unlock(); -- Gitblit v1.8.0