| | |
| | | |
| | | using namespace std::chrono; |
| | | using namespace std::chrono_literals; |
| | | constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; } |
| | | |
| | | void QuickSleep(); |
| | | |
| | |
| | | char buf[4]; |
| | | }; |
| | | |
| | | template <bool isRobust = false> |
| | | class CasMutex |
| | | class PidLocker |
| | | { |
| | | static pid_t pid() |
| | | public: |
| | | typedef int locker_t; |
| | | static locker_t this_locker() |
| | | { |
| | | static pid_t val = getpid(); |
| | | static locker_t val = getpid(); |
| | | return val; |
| | | } |
| | | static bool Killed(pid_t pid) |
| | | static bool is_alive(locker_t locker) { return true; } |
| | | }; |
| | | |
| | | class RobustPidLocker |
| | | { |
| | | public: |
| | | typedef int locker_t; |
| | | static locker_t this_locker() |
| | | { |
| | | static locker_t val = getpid(); |
| | | return val; |
| | | } |
| | | static bool is_alive(locker_t locker) |
| | | { |
| | | char buf[64] = {0}; |
| | | snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", pid); |
| | | return access(buf, F_OK) != 0; |
| | | snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker); |
| | | return access(buf, F_OK) == 0; |
| | | } |
| | | }; |
| | | |
| | | template <class LockerT> |
| | | class CasMutex |
| | | { |
| | | typedef typename LockerT::locker_t locker_t; |
| | | static inline locker_t this_locker() { return LockerT::this_locker(); } |
| | | static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); } |
| | | |
| | | public: |
| | | CasMutex() : |
| | |
| | | auto old = meta_.load(); |
| | | int r = 0; |
| | | if (!Locked(old)) { |
| | | r = MetaCas(old, Meta(1, pid())); |
| | | } else if (isRobust && Killed(Pid(old))) { |
| | | r = static_cast<int>(MetaCas(old, Meta(1, pid()))) << 1; |
| | | r = MetaCas(old, Meta(1, this_locker())); |
| | | } else if (!is_alive(Locker(old))) { |
| | | r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1; |
| | | if (r) { |
| | | printf("captured pid %d -> %d, r = %d\n", Pid(old), pid(), r); |
| | | printf("captured pid %d -> %d, r = %d\n", Locker(old), this_locker(), r); |
| | | } |
| | | } |
| | | return r; |
| | |
| | | void unlock() |
| | | { |
| | | auto old = meta_.load(); |
| | | if (Locked(old) && Pid(old) == pid()) { |
| | | MetaCas(old, Meta(0, pid())); |
| | | if (Locked(old) && Locker(old) == this_locker()) { |
| | | MetaCas(old, Meta(0, this_locker())); |
| | | } |
| | | } |
| | | |
| | | private: |
| | | static_assert(sizeof(locker_t) < sizeof(uint64_t), "locker size must be smaller than 64 bit!"); |
| | | std::atomic<uint64_t> meta_; |
| | | bool Locked(uint64_t meta) { return (meta >> 63) != 0; } |
| | | pid_t Pid(uint64_t meta) { return meta & ~(uint64_t(1) << 63); } |
| | | uint64_t Meta(uint64_t lk, pid_t pid) { return (lk << 63) | pid; } |
| | | locker_t Locker(uint64_t meta) { return meta & MaskBits(sizeof(locker_t) * 8); } |
| | | uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; } |
| | | bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); } |
| | | static_assert(sizeof(pid_t) < sizeof(uint64_t)); |
| | | }; |
| | | |
| | | typedef CasMutex<RobustPidLocker> Mutex; |
| | | |
| | | template <class Lock> |
| | | class Guard |
| | |
| | | state_(0), capacity_(cap), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(cap)) |
| | | { |
| | | if (!buf) { |
| | | throw("error allocate buffer: out of mem!"); |
| | | throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!"); |
| | | } |
| | | } |
| | | ~CircularBuffer() |
| | | { |
| | | al_.deallocate(buf, capacity_); |
| | | } |
| | | ~CircularBuffer() { al_.deallocate(buf, capacity_); } |
| | | |
| | | size_type size() const { return (capacity_ + tail() - head()) % capacity_; } |
| | | bool full() const { return (capacity_ + tail() + 1 - head()) % capacity_ == 0; } |
| | | bool empty() const { return head() == tail(); } |
| | | bool push_back(Data d) |
| | | { |
| | | Guard<MutexT> guard(mutex_); |
| | | Guard<Mutex> guard(mutex_); |
| | | if (!full()) { |
| | | auto old = mtail(); |
| | | buf[Pos(old)] = d; |
| | |
| | | } |
| | | bool pop_front(Data &d) |
| | | { |
| | | Guard<MutexT> guard(mutex_); |
| | | Guard<Mutex> guard(mutex_); |
| | | if (!empty()) { |
| | | auto old = mhead(); |
| | | d = buf[Pos(old)]; |
| | |
| | | CircularBuffer(CircularBuffer &&); |
| | | CircularBuffer &operator=(const CircularBuffer &) = delete; |
| | | CircularBuffer &operator=(CircularBuffer &&) = delete; |
| | | typedef CasMutex<true> MutexT; |
| | | // static_assert(sizeof(MutexT) == 16); |
| | | |
| | | meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); } |
| | | size_type head() const { return Pos(mhead()); } |
| | | size_type tail() const { return Pos(mtail()); } |
| | |
| | | enum { eStateReady = 0x19833891 }; |
| | | std::atomic<uint32_t> state_; |
| | | const size_type capacity_; |
| | | MutexT mutex_; |
| | | Mutex mutex_; |
| | | std::atomic<meta_type> mhead_; |
| | | std::atomic<meta_type> mtail_; |
| | | Alloc al_; |