From 7ecd6323ffedbfef92c87c02b2a8680dd53b772c Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期四, 06 五月 2021 19:37:50 +0800 Subject: [PATCH] rename atomic queue io function. --- src/robust.h | 196 +++++++++++++++++++++++++++++++++++++------------ 1 files changed, 148 insertions(+), 48 deletions(-) diff --git a/src/robust.h b/src/robust.h index 19e9bda..3334bc0 100644 --- a/src/robust.h +++ b/src/robust.h @@ -19,6 +19,7 @@ #ifndef ROBUST_Q31RCWYU #define ROBUST_Q31RCWYU +#include "log.h" #include <atomic> #include <chrono> #include <memory> @@ -32,6 +33,7 @@ using namespace std::chrono; using namespace std::chrono_literals; +constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; } void QuickSleep(); @@ -102,20 +104,60 @@ char buf[4]; }; -template <bool isRobust = false> -class CasMutex +class PidLocker { - static pid_t pid() +public: + typedef int locker_t; + enum { eLockerBits = sizeof(locker_t) * 8 }; + static locker_t this_locker() { - static pid_t val = getpid(); + static locker_t val = getpid(); return val; } - static bool Killed(pid_t pid) + static bool is_alive(locker_t locker) { return true; } +}; + +class RobustPidLocker +{ +public: + typedef int locker_t; + enum { eLockerBits = sizeof(locker_t) * 8 }; + static locker_t this_locker() + { + static locker_t val = getpid(); + return val; + } + static bool is_alive(locker_t locker) { char buf[64] = {0}; - snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", pid); - return access(buf, F_OK) != 0; + snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker); + return access(buf, F_OK) == 0; } +}; + +class ExpiredLocker +{ +public: + typedef int64_t locker_t; + enum { eLockerBits = 63 }; + static locker_t this_locker() { return Now(); } + static bool is_alive(locker_t locker) + { + return Now() < locker + steady_clock::duration(10s).count(); + } + +private: + static locker_t Now() { return steady_clock::now().time_since_epoch().count(); } +}; + +template <class LockerT> +class CasMutex +{ + typedef typename LockerT::locker_t locker_t; + static inline locker_t this_locker() { return LockerT::this_locker(); } + static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); } + static const uint64_t kLockerMask = MaskBits(LockerT::eLockerBits); + static_assert(LockerT::eLockerBits < 64, "locker size must be smaller than 64 bit!"); public: CasMutex() : @@ -126,11 +168,11 @@ auto old = meta_.load(); int r = 0; if (!Locked(old)) { - r = MetaCas(old, Meta(1, pid())); - } else if (isRobust && Killed(Pid(old))) { - r = static_cast<int>(MetaCas(old, Meta(1, pid()))) << 1; + r = MetaCas(old, Meta(1, this_locker())); + } else if (!is_alive(Locker(old))) { + r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1; if (r) { - printf("captured pid %d -> %d, r = %d\n", Pid(old), pid(), r); + LOG_DEBUG() << "captured locker " << int64_t(Locker(old)) << " -> " << int64_t(this_locker()) << ", locker = " << r; } } return r; @@ -146,19 +188,20 @@ void unlock() { auto old = meta_.load(); - if (Locked(old) && Pid(old) == pid()) { - MetaCas(old, Meta(0, pid())); + if (Locked(old) && Locker(old) == this_locker()) { + MetaCas(old, Meta(0, this_locker())); } } private: std::atomic<uint64_t> meta_; - bool Locked(uint64_t meta) { return (meta >> 63) != 0; } - pid_t Pid(uint64_t meta) { return meta & ~(uint64_t(1) << 63); } - uint64_t Meta(uint64_t lk, pid_t pid) { return (lk << 63) | pid; } + bool Locked(uint64_t meta) { return (meta >> 63) == 1; } + locker_t Locker(uint64_t meta) { return meta & kLockerMask; } + uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; } bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); } - static_assert(sizeof(pid_t) < sizeof(uint64_t)); }; + +typedef CasMutex<RobustPidLocker> Mutex; template <class Lock> class Guard @@ -182,7 +225,7 @@ typedef uint64_t meta_type; static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; } static count_type Count(meta_type meta) { return meta >> 32; } - static size_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; } + static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; } public: typedef D Data; @@ -190,66 +233,123 @@ CircularBuffer(const size_type cap) : CircularBuffer(cap, Alloc()) {} CircularBuffer(const size_type cap, Alloc const &al) : - state_(0), capacity_(cap), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(cap)) + capacity_(cap + 1), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(capacity_)) { if (!buf) { - throw("error allocate buffer: out of mem!"); - } - } - ~CircularBuffer() - { - al_.deallocate(buf, capacity_); - } - size_type size() const { return (capacity_ + tail() - head()) % capacity_; } - bool full() const { return (capacity_ + tail() + 1 - head()) % capacity_ == 0; } - bool empty() const { return head() == tail(); } - bool push_back(Data d) - { - Guard<MutexT> guard(mutex_); - if (!full()) { - auto old = mtail(); - buf[Pos(old)] = d; - return mtail_.compare_exchange_strong(old, next(old)); + throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!"); } else { - return false; + memset(&buf[0], 0, sizeof(D) * capacity_); } + } + ~CircularBuffer() { al_.deallocate(buf, capacity_); } + + bool push_back(const Data d) + { + Guard<Mutex> guard(mutex_); + auto old = mtail(); + auto pos = Pos(old); + auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0); + if (!full) { + buf[pos] = d; + return mtail_.compare_exchange_strong(old, next(old)); + } + return false; } bool pop_front(Data &d) { - Guard<MutexT> guard(mutex_); - if (!empty()) { - auto old = mhead(); - d = buf[Pos(old)]; + Guard<Mutex> guard(mutex_); + auto old = mhead(); + auto pos = Pos(old); + if (!(pos == tail())) { + d = buf[pos]; return mhead_.compare_exchange_strong(old, next(old)); } else { return false; } } - bool Ready() const { return state_.load() == eStateReady; } - void PutReady() { state_.store(eStateReady); } private: CircularBuffer(const CircularBuffer &); CircularBuffer(CircularBuffer &&); CircularBuffer &operator=(const CircularBuffer &) = delete; CircularBuffer &operator=(CircularBuffer &&) = delete; - typedef CasMutex<true> MutexT; - // static_assert(sizeof(MutexT) == 16); + meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); } size_type head() const { return Pos(mhead()); } size_type tail() const { return Pos(mtail()); } meta_type mhead() const { return mhead_.load(); } meta_type mtail() const { return mtail_.load(); } // data - enum { eStateReady = 0x19833891 }; - std::atomic<uint32_t> state_; const size_type capacity_; - MutexT mutex_; + Mutex mutex_; std::atomic<meta_type> mhead_; std::atomic<meta_type> mtail_; Alloc al_; typename Alloc::pointer buf = nullptr; }; +template <unsigned PowerSize = 4, class Int = int64_t> +class AtomicQueue +{ +public: + typedef uint32_t size_type; + typedef Int Data; + typedef std::atomic<Data> AData; + static_assert(sizeof(Data) == sizeof(AData)); + enum { + power = PowerSize, + capacity = (1 << power), + mask = capacity - 1, + }; + + AtomicQueue() { memset(this, 0, sizeof(*this)); } + size_type head() const { return head_.load(); } + size_type tail() const { return tail_.load(); } + bool like_empty() const { return head() == tail() && Empty(buf[head()]); } + bool like_full() const { return head() == tail() && !Empty(buf[head()]); } + bool push(const Data d, bool try_more = false) + { + bool r = false; + size_type i = 0; + do { + auto pos = tail(); + if (tail_.compare_exchange_strong(pos, Next(pos))) { + auto cur = buf[pos].load(); + r = Empty(cur) && buf[pos].compare_exchange_strong(cur, Enc(d)); + } + } while (try_more && !r && ++i < capacity); + return r; + } + bool pop(Data &d, bool try_more = false) + { + bool r = false; + Data cur; + size_type i = 0; + do { + auto pos = head(); + if (head_.compare_exchange_strong(pos, Next(pos))) { + cur = buf[pos].load(); + r = !Empty(cur) && buf[pos].compare_exchange_strong(cur, 0); + } + } while (try_more && !r && ++i < capacity); + if (r) { d = Dec(cur); } + return r; + } + +private: + static_assert(std::is_integral<Data>::value, "Data must be integral type!"); + static_assert(std::is_signed<Data>::value, "Data must be signed type!"); + static_assert(PowerSize < 10, "RobustQ63 max size is 2^10!"); + + static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok. + static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok. + static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok. + static size_type Next(const size_type index) { return (index + 1) & mask; } + + std::atomic<size_type> head_; + std::atomic<size_type> tail_; + AData buf[capacity]; +}; + } // namespace robust #endif // end of include guard: ROBUST_Q31RCWYU -- Gitblit v1.8.0