From 72bffb0807925a156b076b71f78c848a08d27b87 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期四, 29 四月 2021 10:55:35 +0800 Subject: [PATCH] refactor mutex. --- src/robust.h | 73 +++++++++----- src/shm.h | 2 utest/api_test.cpp | 85 ----------------- utest/robust_test.cpp | 104 ++++++++++++++++++-- src/robust.cpp | 2 5 files changed, 140 insertions(+), 126 deletions(-) diff --git a/src/robust.cpp b/src/robust.cpp index 38d5d28..006ea5f 100644 --- a/src/robust.cpp +++ b/src/robust.cpp @@ -26,7 +26,7 @@ { static_assert(sizeof(steady_clock::duration) == sizeof(int64_t)); static_assert(sizeof(RobustReqRep) == 24); -static_assert(sizeof(CasMutex<false>) == 8); +static_assert(sizeof(Mutex) == 8); static_assert(sizeof(CircularBuffer<int>) == 48); auto Now() { return steady_clock::now().time_since_epoch(); } diff --git a/src/robust.h b/src/robust.h index 19e9bda..983567f 100644 --- a/src/robust.h +++ b/src/robust.h @@ -32,6 +32,7 @@ using namespace std::chrono; using namespace std::chrono_literals; +constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; } void QuickSleep(); @@ -102,20 +103,41 @@ char buf[4]; }; -template <bool isRobust = false> -class CasMutex +class PidLocker { - static pid_t pid() +public: + typedef int locker_t; + static locker_t this_locker() { - static pid_t val = getpid(); + static locker_t val = getpid(); return val; } - static bool Killed(pid_t pid) + static bool is_alive(locker_t locker) { return true; } +}; + +class RobustPidLocker +{ +public: + typedef int locker_t; + static locker_t this_locker() + { + static locker_t val = getpid(); + return val; + } + static bool is_alive(locker_t locker) { char buf[64] = {0}; - snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", pid); - return access(buf, F_OK) != 0; + snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker); + return access(buf, F_OK) == 0; } +}; + +template <class LockerT> +class CasMutex +{ + typedef typename LockerT::locker_t locker_t; + static inline locker_t this_locker() { return LockerT::this_locker(); } + static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); } public: CasMutex() : @@ -126,11 +148,11 @@ auto old = meta_.load(); int r = 0; if (!Locked(old)) { - r = MetaCas(old, Meta(1, pid())); - } else if (isRobust && Killed(Pid(old))) { - r = static_cast<int>(MetaCas(old, Meta(1, pid()))) << 1; + r = MetaCas(old, Meta(1, this_locker())); + } else if (!is_alive(Locker(old))) { + r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1; if (r) { - printf("captured pid %d -> %d, r = %d\n", Pid(old), pid(), r); + printf("captured pid %d -> %d, r = %d\n", Locker(old), this_locker(), r); } } return r; @@ -146,19 +168,21 @@ void unlock() { auto old = meta_.load(); - if (Locked(old) && Pid(old) == pid()) { - MetaCas(old, Meta(0, pid())); + if (Locked(old) && Locker(old) == this_locker()) { + MetaCas(old, Meta(0, this_locker())); } } private: + static_assert(sizeof(locker_t) < sizeof(uint64_t), "locker size must be smaller than 64 bit!"); std::atomic<uint64_t> meta_; bool Locked(uint64_t meta) { return (meta >> 63) != 0; } - pid_t Pid(uint64_t meta) { return meta & ~(uint64_t(1) << 63); } - uint64_t Meta(uint64_t lk, pid_t pid) { return (lk << 63) | pid; } + locker_t Locker(uint64_t meta) { return meta & MaskBits(sizeof(locker_t) * 8); } + uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; } bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); } - static_assert(sizeof(pid_t) < sizeof(uint64_t)); }; + +typedef CasMutex<RobustPidLocker> Mutex; template <class Lock> class Guard @@ -193,19 +217,17 @@ state_(0), capacity_(cap), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(cap)) { if (!buf) { - throw("error allocate buffer: out of mem!"); + throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!"); } } - ~CircularBuffer() - { - al_.deallocate(buf, capacity_); - } + ~CircularBuffer() { al_.deallocate(buf, capacity_); } + size_type size() const { return (capacity_ + tail() - head()) % capacity_; } bool full() const { return (capacity_ + tail() + 1 - head()) % capacity_ == 0; } bool empty() const { return head() == tail(); } bool push_back(Data d) { - Guard<MutexT> guard(mutex_); + Guard<Mutex> guard(mutex_); if (!full()) { auto old = mtail(); buf[Pos(old)] = d; @@ -216,7 +238,7 @@ } bool pop_front(Data &d) { - Guard<MutexT> guard(mutex_); + Guard<Mutex> guard(mutex_); if (!empty()) { auto old = mhead(); d = buf[Pos(old)]; @@ -233,8 +255,7 @@ CircularBuffer(CircularBuffer &&); CircularBuffer &operator=(const CircularBuffer &) = delete; CircularBuffer &operator=(CircularBuffer &&) = delete; - typedef CasMutex<true> MutexT; - // static_assert(sizeof(MutexT) == 16); + meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); } size_type head() const { return Pos(mhead()); } size_type tail() const { return Pos(mtail()); } @@ -244,7 +265,7 @@ enum { eStateReady = 0x19833891 }; std::atomic<uint32_t> state_; const size_type capacity_; - MutexT mutex_; + Mutex mutex_; std::atomic<meta_type> mhead_; std::atomic<meta_type> mtail_; Alloc al_; diff --git a/src/shm.h b/src/shm.h index 17352fe..515d856 100644 --- a/src/shm.h +++ b/src/shm.h @@ -90,7 +90,7 @@ } }; -typedef robust::CasMutex<true> Mutex; +typedef robust::Mutex Mutex; typedef robust::Guard<Mutex> Guard; class SharedMemory : public mshm_t diff --git a/utest/api_test.cpp b/utest/api_test.cpp index 6577b51..c6165e8 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -102,91 +102,6 @@ using namespace std::chrono; // using namespace std::chrono_literals; -BOOST_AUTO_TEST_CASE(MutexTest) -{ - // typedef robust::CasMutex<true> RobustMutex; - typedef MutexWithPidCheck RobustMutex; - - for (int i = 0; i < 20; ++i) { - int size = i; - int left = size & 7; - int rsize = size + ((8 - left) & 7); - printf("size: %3d, rsize: %3d\n", size, rsize); - } - SharedMemory &shm = TestShm(); - // shm.Remove(); - // return; - GlobalInit(shm); - - const std::string mtx_name("test_mutex"); - const std::string int_name("test_int"); - auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name); - auto pi = shm.FindOrCreate<int>(int_name, 100); - - std::mutex m; - typedef std::chrono::steady_clock Clock; - auto Now = []() { return Clock::now().time_since_epoch(); }; - if (pi) { - auto old = *pi; - printf("int : %d, add1: %d\n", old, ++*pi); - } - - { - boost::timer::auto_cpu_timer timer; - const int ntimes = 1000 * 1000; - printf("test lock/unlock %d times: ", ntimes); - RobustMutex mutex; - auto Lock = [&]() { - for (int i = 0; i < ntimes; ++i) { - mutex.lock(); - mutex.unlock(); - } - }; - std::thread t1(Lock), t2(Lock); - t1.join(); - t2.join(); - } - - auto MSFromNow = [](const int ms) { - using namespace boost::posix_time; - ptime cur = boost::posix_time::microsec_clock::universal_time(); - return cur + millisec(ms); - }; - - auto TryLock = [&]() { - if (mtx->try_lock()) { - printf("try_lock ok\n"); - return true; - } else { - printf("try_lock failed\n"); - return false; - } - }; - auto Unlock = [&]() { - mtx->unlock(); - printf("unlocked\n"); - }; - - if (mtx) { - printf("mtx exists\n"); - if (TryLock()) { - auto op = [&]() { - if (TryLock()) { - Unlock(); - } - }; - op(); - std::thread t(op); - t.join(); - // Unlock(); - } else { - // mtx->unlock(); - } - } else { - printf("mtx not exists\n"); - } -} - BOOST_AUTO_TEST_CASE(ApiTest) { auto max_time = std::chrono::steady_clock::time_point::max(); diff --git a/utest/robust_test.cpp b/utest/robust_test.cpp index 0d54b46..9384c10 100644 --- a/utest/robust_test.cpp +++ b/utest/robust_test.cpp @@ -3,8 +3,13 @@ using namespace robust; +enum { + eLockerBits = 32, + eLockerMask = MaskBits(sizeof(int) * 8), +}; + typedef CircularBuffer<int64_t, Allocator<int64_t>> Rcb; -Rcb *GetRCBImpl(SharedMemory &shm, const int nelem) +Rcb *GetRCB(SharedMemory &shm, const int nelem) { int cap = nelem + 1; typedef uint64_t Data; @@ -15,24 +20,13 @@ } return nullptr; } -Rcb *GetRCB(SharedMemory &shm, const int nelem) -{ - void **pStore = shm.FindOrCreate<void *>("test_rcb_pointer", nullptr); - if (pStore) { - if (!*pStore) { - *pStore = GetRCBImpl(shm, nelem); - } - return (Rcb *) *pStore; - } - return nullptr; -} void MySleep() { std::this_thread::sleep_for(2us); } -BOOST_AUTO_TEST_CASE(RobustTest) +BOOST_AUTO_TEST_CASE(QueueTest) { SharedMemory &shm = TestShm(); shm.Remove(); @@ -112,4 +106,88 @@ threads.WaitAll(); printf("total: %ld, expected: %ld\n", total.load(), correct_total); BOOST_CHECK_EQUAL(total.load(), correct_total); +} + +BOOST_AUTO_TEST_CASE(MutexTest) +{ + typedef robust::Mutex RobustMutex; + + for (int i = 0; i < 20; ++i) { + int size = i; + int left = size & 7; + int rsize = size + ((8 - left) & 7); + printf("size: %3d, rsize: %3d\n", size, rsize); + } + SharedMemory &shm = TestShm(); + // shm.Remove(); + // return; + GlobalInit(shm); + + const std::string mtx_name("test_mutex"); + const std::string int_name("test_int"); + auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name); + auto pi = shm.FindOrCreate<int>(int_name, 100); + + std::mutex m; + typedef std::chrono::steady_clock Clock; + auto Now = []() { return Clock::now().time_since_epoch(); }; + if (pi) { + auto old = *pi; + printf("int : %d, add1: %d\n", old, ++*pi); + } + + { + boost::timer::auto_cpu_timer timer; + const int ntimes = 1000 * 1000; + printf("test lock/unlock %d times: ", ntimes); + RobustMutex mutex; + auto Lock = [&]() { + for (int i = 0; i < ntimes; ++i) { + mutex.lock(); + mutex.unlock(); + } + }; + std::thread t1(Lock), t2(Lock); + t1.join(); + t2.join(); + } + + auto MSFromNow = [](const int ms) { + using namespace boost::posix_time; + ptime cur = boost::posix_time::microsec_clock::universal_time(); + return cur + millisec(ms); + }; + + auto TryLock = [&]() { + if (mtx->try_lock()) { + printf("try_lock ok\n"); + return true; + } else { + printf("try_lock failed\n"); + return false; + } + }; + auto Unlock = [&]() { + mtx->unlock(); + printf("unlocked\n"); + }; + + if (mtx) { + printf("mtx exists\n"); + if (TryLock()) { + auto op = [&]() { + if (TryLock()) { + Unlock(); + } + }; + op(); + std::thread t(op); + t.join(); + // Unlock(); + } else { + // mtx->unlock(); + } + } else { + printf("mtx not exists\n"); + } } \ No newline at end of file -- Gitblit v1.8.0