src/robust.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/robust.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/shm.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
utest/api_test.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
utest/robust_test.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
src/robust.cpp
@@ -26,7 +26,7 @@ { static_assert(sizeof(steady_clock::duration) == sizeof(int64_t)); static_assert(sizeof(RobustReqRep) == 24); static_assert(sizeof(CasMutex<false>) == 8); static_assert(sizeof(Mutex) == 8); static_assert(sizeof(CircularBuffer<int>) == 48); auto Now() { return steady_clock::now().time_since_epoch(); } src/robust.h
@@ -32,6 +32,7 @@ using namespace std::chrono; using namespace std::chrono_literals; constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; } void QuickSleep(); @@ -102,20 +103,41 @@ char buf[4]; }; template <bool isRobust = false> class CasMutex class PidLocker { static pid_t pid() public: typedef int locker_t; static locker_t this_locker() { static pid_t val = getpid(); static locker_t val = getpid(); return val; } static bool Killed(pid_t pid) static bool is_alive(locker_t locker) { return true; } }; class RobustPidLocker { public: typedef int locker_t; static locker_t this_locker() { static locker_t val = getpid(); return val; } static bool is_alive(locker_t locker) { char buf[64] = {0}; snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", pid); return access(buf, F_OK) != 0; snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker); return access(buf, F_OK) == 0; } }; template <class LockerT> class CasMutex { typedef typename LockerT::locker_t locker_t; static inline locker_t this_locker() { return LockerT::this_locker(); } static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); } public: CasMutex() : @@ -126,11 +148,11 @@ auto old = meta_.load(); int r = 0; if (!Locked(old)) { r = MetaCas(old, Meta(1, pid())); } else if (isRobust && Killed(Pid(old))) { r = static_cast<int>(MetaCas(old, Meta(1, pid()))) << 1; r = MetaCas(old, Meta(1, this_locker())); } else if (!is_alive(Locker(old))) { r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1; if (r) { printf("captured pid %d -> %d, r = %d\n", Pid(old), pid(), r); printf("captured pid %d -> %d, r = %d\n", Locker(old), this_locker(), r); } } return r; @@ -146,19 +168,21 @@ void unlock() { auto old = meta_.load(); if (Locked(old) && Pid(old) == pid()) { MetaCas(old, Meta(0, pid())); if (Locked(old) && Locker(old) == this_locker()) { MetaCas(old, Meta(0, this_locker())); } } private: static_assert(sizeof(locker_t) < sizeof(uint64_t), "locker size must be smaller than 64 bit!"); std::atomic<uint64_t> meta_; bool Locked(uint64_t meta) { return (meta >> 63) != 0; } pid_t Pid(uint64_t meta) { return meta & ~(uint64_t(1) << 63); } uint64_t Meta(uint64_t lk, pid_t pid) { return (lk << 63) | pid; } locker_t Locker(uint64_t meta) { return meta & MaskBits(sizeof(locker_t) * 8); } uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; } bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); } static_assert(sizeof(pid_t) < sizeof(uint64_t)); }; typedef CasMutex<RobustPidLocker> Mutex; template <class Lock> class Guard @@ -193,19 +217,17 @@ state_(0), capacity_(cap), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(cap)) { if (!buf) { throw("error allocate buffer: out of mem!"); throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!"); } } ~CircularBuffer() { al_.deallocate(buf, capacity_); } ~CircularBuffer() { al_.deallocate(buf, capacity_); } size_type size() const { return (capacity_ + tail() - head()) % capacity_; } bool full() const { return (capacity_ + tail() + 1 - head()) % capacity_ == 0; } bool empty() const { return head() == tail(); } bool push_back(Data d) { Guard<MutexT> guard(mutex_); Guard<Mutex> guard(mutex_); if (!full()) { auto old = mtail(); buf[Pos(old)] = d; @@ -216,7 +238,7 @@ } bool pop_front(Data &d) { Guard<MutexT> guard(mutex_); Guard<Mutex> guard(mutex_); if (!empty()) { auto old = mhead(); d = buf[Pos(old)]; @@ -233,8 +255,7 @@ CircularBuffer(CircularBuffer &&); CircularBuffer &operator=(const CircularBuffer &) = delete; CircularBuffer &operator=(CircularBuffer &&) = delete; typedef CasMutex<true> MutexT; // static_assert(sizeof(MutexT) == 16); meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); } size_type head() const { return Pos(mhead()); } size_type tail() const { return Pos(mtail()); } @@ -244,7 +265,7 @@ enum { eStateReady = 0x19833891 }; std::atomic<uint32_t> state_; const size_type capacity_; MutexT mutex_; Mutex mutex_; std::atomic<meta_type> mhead_; std::atomic<meta_type> mtail_; Alloc al_; src/shm.h
@@ -90,7 +90,7 @@ } }; typedef robust::CasMutex<true> Mutex; typedef robust::Mutex Mutex; typedef robust::Guard<Mutex> Guard; class SharedMemory : public mshm_t utest/api_test.cpp
@@ -102,91 +102,6 @@ using namespace std::chrono; // using namespace std::chrono_literals; BOOST_AUTO_TEST_CASE(MutexTest) { // typedef robust::CasMutex<true> RobustMutex; typedef MutexWithPidCheck RobustMutex; for (int i = 0; i < 20; ++i) { int size = i; int left = size & 7; int rsize = size + ((8 - left) & 7); printf("size: %3d, rsize: %3d\n", size, rsize); } SharedMemory &shm = TestShm(); // shm.Remove(); // return; GlobalInit(shm); const std::string mtx_name("test_mutex"); const std::string int_name("test_int"); auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name); auto pi = shm.FindOrCreate<int>(int_name, 100); std::mutex m; typedef std::chrono::steady_clock Clock; auto Now = []() { return Clock::now().time_since_epoch(); }; if (pi) { auto old = *pi; printf("int : %d, add1: %d\n", old, ++*pi); } { boost::timer::auto_cpu_timer timer; const int ntimes = 1000 * 1000; printf("test lock/unlock %d times: ", ntimes); RobustMutex mutex; auto Lock = [&]() { for (int i = 0; i < ntimes; ++i) { mutex.lock(); mutex.unlock(); } }; std::thread t1(Lock), t2(Lock); t1.join(); t2.join(); } auto MSFromNow = [](const int ms) { using namespace boost::posix_time; ptime cur = boost::posix_time::microsec_clock::universal_time(); return cur + millisec(ms); }; auto TryLock = [&]() { if (mtx->try_lock()) { printf("try_lock ok\n"); return true; } else { printf("try_lock failed\n"); return false; } }; auto Unlock = [&]() { mtx->unlock(); printf("unlocked\n"); }; if (mtx) { printf("mtx exists\n"); if (TryLock()) { auto op = [&]() { if (TryLock()) { Unlock(); } }; op(); std::thread t(op); t.join(); // Unlock(); } else { // mtx->unlock(); } } else { printf("mtx not exists\n"); } } BOOST_AUTO_TEST_CASE(ApiTest) { auto max_time = std::chrono::steady_clock::time_point::max(); utest/robust_test.cpp
@@ -3,8 +3,13 @@ using namespace robust; enum { eLockerBits = 32, eLockerMask = MaskBits(sizeof(int) * 8), }; typedef CircularBuffer<int64_t, Allocator<int64_t>> Rcb; Rcb *GetRCBImpl(SharedMemory &shm, const int nelem) Rcb *GetRCB(SharedMemory &shm, const int nelem) { int cap = nelem + 1; typedef uint64_t Data; @@ -15,24 +20,13 @@ } return nullptr; } Rcb *GetRCB(SharedMemory &shm, const int nelem) { void **pStore = shm.FindOrCreate<void *>("test_rcb_pointer", nullptr); if (pStore) { if (!*pStore) { *pStore = GetRCBImpl(shm, nelem); } return (Rcb *) *pStore; } return nullptr; } void MySleep() { std::this_thread::sleep_for(2us); } BOOST_AUTO_TEST_CASE(RobustTest) BOOST_AUTO_TEST_CASE(QueueTest) { SharedMemory &shm = TestShm(); shm.Remove(); @@ -112,4 +106,88 @@ threads.WaitAll(); printf("total: %ld, expected: %ld\n", total.load(), correct_total); BOOST_CHECK_EQUAL(total.load(), correct_total); } BOOST_AUTO_TEST_CASE(MutexTest) { typedef robust::Mutex RobustMutex; for (int i = 0; i < 20; ++i) { int size = i; int left = size & 7; int rsize = size + ((8 - left) & 7); printf("size: %3d, rsize: %3d\n", size, rsize); } SharedMemory &shm = TestShm(); // shm.Remove(); // return; GlobalInit(shm); const std::string mtx_name("test_mutex"); const std::string int_name("test_int"); auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name); auto pi = shm.FindOrCreate<int>(int_name, 100); std::mutex m; typedef std::chrono::steady_clock Clock; auto Now = []() { return Clock::now().time_since_epoch(); }; if (pi) { auto old = *pi; printf("int : %d, add1: %d\n", old, ++*pi); } { boost::timer::auto_cpu_timer timer; const int ntimes = 1000 * 1000; printf("test lock/unlock %d times: ", ntimes); RobustMutex mutex; auto Lock = [&]() { for (int i = 0; i < ntimes; ++i) { mutex.lock(); mutex.unlock(); } }; std::thread t1(Lock), t2(Lock); t1.join(); t2.join(); } auto MSFromNow = [](const int ms) { using namespace boost::posix_time; ptime cur = boost::posix_time::microsec_clock::universal_time(); return cur + millisec(ms); }; auto TryLock = [&]() { if (mtx->try_lock()) { printf("try_lock ok\n"); return true; } else { printf("try_lock failed\n"); return false; } }; auto Unlock = [&]() { mtx->unlock(); printf("unlocked\n"); }; if (mtx) { printf("mtx exists\n"); if (TryLock()) { auto op = [&]() { if (TryLock()) { Unlock(); } }; op(); std::thread t(op); t.join(); // Unlock(); } else { // mtx->unlock(); } } else { printf("mtx not exists\n"); } }