#ifndef ROBUST_Q31RCWYU
#define ROBUST_Q31RCWYU

#include "bh_util.h"
#include "log.h"
#include <atomic>
#include <chrono>
#include <errno.h>
#include <functional>
#include <memory>
#include <stdio.h>
#include <string.h>
#include <string>
#include <sys/types.h>
#include <type_traits>
#include <unistd.h>

namespace robust
{

using namespace std::chrono;
using namespace std::chrono_literals;
constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; }
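// Illustrative examples (editorial, not in the original): MaskBits(3) == 0x7,
// and MaskBits(63) selects all but the top bit of a uint64_t.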

void QuickSleep();

class RobustReqRep
{
    typedef uint32_t State;
    typedef std::string Msg;
    typedef std::chrono::steady_clock::duration Duration;

public:
    enum ErrorCode {
        eSuccess = 0,
        eTimeout = EAGAIN,
        eSizeError = EINVAL,
    };

    explicit RobustReqRep(const uint32_t max_len) :
        capacity_(max_len), state_(eStateInit), timestamp_(Duration(0)), size_(0) {}

    void PutReady() { state_.store(eStateReady); }
    bool Ready() const { return state_.load() == eStateReady; }
    uint32_t capacity() const { return capacity_; }

    int ClientRequest(const Msg &request, Msg &reply)
    {
        int r = ClientWriteRequest(request);
        if (r == eSuccess) {
            r = ClientReadReply(reply);
        }
        return r;
    }
    int ClientReadReply(Msg &reply);
    int ClientWriteRequest(const Msg &request);
    int ServerReadRequest(Msg &request);
    int ServerWriteReply(const Msg &reply);

private:
    RobustReqRep(const RobustReqRep &);
    RobustReqRep(RobustReqRep &&);
    RobustReqRep &operator=(const RobustReqRep &) = delete;
    RobustReqRep &operator=(RobustReqRep &&) = delete;

    enum {
        eStateInit = 0,
        eStateReady = 0x19833891,
        eClientWriteBegin,
        eClientWriteEnd,
        eServerReadBegin,
        eServerReadEnd,
        eServerWriteBegin,
        eServerWriteEnd,
        eClientReadBegin,
        eClientReadEnd = eStateReady,
    };
    bool StateCas(State exp, State val);
    void Write(const Msg &msg)
    {
        size_.store(msg.size());
        memcpy(buf, msg.data(), msg.size());
    }
    void Read(Msg &msg) { msg.assign(buf, size_.load()); }

    const uint32_t capacity_;
    std::atomic<State> state_;
    static_assert(sizeof(State) == sizeof(state_), "atomic should have no extra data.");
    std::atomic<Duration> timestamp_;
    std::atomic<int32_t> size_;
    char buf[4];
};
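
// Usage sketch (editorial, illustrative only). RobustReqRep is a shared-memory
// request/reply slot; "shm_ptr" below is a hypothetical pointer to a shared-memory
// block assumed to be large enough for the object plus the message payload:
//
//   // server side
//   auto *rr = new (shm_ptr) robust::RobustReqRep(64);
//   rr->PutReady();
//   std::string req;
//   if (rr->ServerReadRequest(req) == robust::RobustReqRep::eSuccess) {
//       rr->ServerWriteReply("pong");
//   }
//
//   // client side, attached to the same object
//   std::string reply;
//   int rc = rr->ClientRequest("ping", reply);  // eSuccess, eTimeout or eSizeError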

// atomic queue, length is 1.
// lowest bit is used for data flag, 63 bits for data.
class AtomicQ63
{
public:
    typedef int64_t Data;
    typedef std::atomic<Data> AData;
    // static_assert(sizeof(Data) == sizeof(AData));

    AtomicQ63() { memset(this, 0, sizeof(*this)); }
    bool push(const Data d, bool try_more = false)
    {
        auto cur = buf.load();
        return Empty(cur) && buf.compare_exchange_strong(cur, Enc(d));
    }
    bool pop(Data &d, bool try_more = false)
    {
        Data cur = buf.load();
        bool r = !Empty(cur) && buf.compare_exchange_strong(cur, 0);
        if (r) { d = Dec(cur); }
        return r;
    }

private:
    static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
    static inline Data Enc(const Data d) { return (d << 1) | 1; }
    static inline Data Dec(const Data d) { return d >> 1; }
    AData buf;
};

class PidLocker
{
public:
    typedef int locker_t;
    enum { eLockerBits = sizeof(locker_t) * 8 };
    static locker_t this_locker()
    {
        static locker_t val = getpid();
        return val;
    }
    static bool is_alive(locker_t locker) { return true; }
};

class RobustPidLocker
{
public:
    typedef int locker_t;
    enum { eLockerBits = sizeof(locker_t) * 8 };
    static locker_t this_locker()
    {
        static locker_t val = getpid();
        return val;
    }
    static bool is_alive(locker_t locker)
    {
        char buf[64] = {0};
        snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker);
        return access(buf, F_OK) == 0;
    }
};
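
// Usage sketch (editorial, illustrative only): AtomicQ63 holds at most one value;
// push fails while a value is pending and pop fails while the slot is empty.
//
//   robust::AtomicQ63 q;
//   q.push(42);        // true: slot was empty
//   q.push(43);        // false: slot already holds 42
//   int64_t v = 0;
//   q.pop(v);          // true, v == 42
//   q.pop(v);          // false: slot is empty again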

class ExpiredLocker
{
public:
    typedef int64_t locker_t;
    enum { eLockerBits = 63 };
    static locker_t this_locker() { return Now(); }
    static bool is_alive(locker_t locker)
    {
        return Now() < locker + steady_clock::duration(10s).count();
    }

private:
    static locker_t Now() { return steady_clock::now().time_since_epoch().count(); }
};
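
// The three classes above are locker policies for CasMutex<LockerT> below. A policy
// provides locker_t, eLockerBits, this_locker() and is_alive(). A minimal custom policy
// might look like this (editorial sketch; "gettid" and the 32-bit id are assumptions):
//
//   struct TidLocker {
//       typedef int locker_t;
//       enum { eLockerBits = sizeof(locker_t) * 8 };
//       static locker_t this_locker() { return static_cast<locker_t>(gettid()); }
//       static bool is_alive(locker_t) { return true; }  // cannot detect dead threads
//   };
//   // typedef robust::CasMutex<TidLocker> ThreadMutex;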

template <class LockerT>
class CasMutex
{
    typedef typename LockerT::locker_t locker_t;
    static inline locker_t this_locker() { return LockerT::this_locker(); }
    static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); }
    static const uint64_t kLockerMask = MaskBits(LockerT::eLockerBits);
    static_assert(LockerT::eLockerBits < 64, "locker size must be smaller than 64 bit!");

public:
    CasMutex() :
        meta_(0) {}
    int try_lock()
    {
        const auto t = steady_clock::now().time_since_epoch().count();
        auto old = meta_.load();
        int r = 0;
        if (!Locked(old)) {
            r = MetaCas(old, Meta(1, this_locker()));
        } else if (!is_alive(Locker(old))) {
            r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1;
            if (r) {
                printf("captured locker %lld -> %lld, r = %d\n", (long long)Locker(old), (long long)this_locker(), r);
            }
        }
        return r;
    }
    int lock()
    {
        int r = 0;
        do {
            r = try_lock();
        } while (r == 0);
        return r;
    }
    void unlock()
    {
        auto old = meta_.load();
        if (Locked(old) && Locker(old) == this_locker()) {
            MetaCas(old, Meta(0, this_locker()));
        }
    }

private:
    std::atomic<uint64_t> meta_;
    bool Locked(uint64_t meta) { return (meta >> 63) == 1; }
    locker_t Locker(uint64_t meta) { return meta & kLockerMask; }
    uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; }
    bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
};
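
// Usage sketch (editorial, illustrative only): try_lock() returns 0 when the lock could
// not be taken (held by a live owner, or lost a CAS race), 1 when it was acquired
// normally, and 2 when it was captured from an owner that is_alive() reported dead.
//
//   robust::CasMutex<robust::RobustPidLocker> mtx;  // placed in shared memory in practice
//   int r = mtx.try_lock();
//   if (r != 0) {
//       // ... critical section ...
//       mtx.unlock();
//   }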

typedef CasMutex<RobustPidLocker> Mutex;

template <class Lock>
class Guard
{
public:
    Guard(Lock &l) :
        l_(l) { l_.lock(); }
    ~Guard() { l_.unlock(); }

private:
    Guard(const Guard &);
    Guard(Guard &&);
    Lock &l_;
};
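
// Usage sketch (editorial, illustrative only):
//
//   robust::Mutex mtx;
//   {
//       robust::Guard<robust::Mutex> g(mtx);  // locks here
//       // ... critical section ...
//   }                                         // unlocks when g goes out of scope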

template <class D, class Alloc = std::allocator<D>>
class CircularBuffer
{
    typedef uint32_t size_type;
    typedef uint32_t count_type;
    typedef uint64_t meta_type;
    static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; }
    static count_type Count(meta_type meta) { return meta >> 32; }
    static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }

public:
    typedef D Data;

    CircularBuffer(const size_type cap) :
        CircularBuffer(cap, Alloc()) {}
    CircularBuffer(const size_type cap, Alloc const &al) :
        capacity_(cap + 1), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(capacity_))
    {
        if (!buf) {
            throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!");
        } else {
            memset(&buf[0], 0, sizeof(D) * capacity_);
        }
    }
    ~CircularBuffer() { al_.deallocate(buf, capacity_); }

    bool push_back(const Data d)
    {
        Guard<Mutex> guard(mutex_);
        auto old = mtail();
        auto pos = Pos(old);
        auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0);
        if (!full) {
            buf[pos] = d;
            return mtail_.compare_exchange_strong(old, next(old));
        }
        return false;
    }
    bool pop_front(Data &d)
    {
        Guard<Mutex> guard(mutex_);
        auto old = mhead();
        auto pos = Pos(old);
        if (!(pos == tail())) {
            d = buf[pos];
            return mhead_.compare_exchange_strong(old, next(old));
        } else {
            return false;
        }
    }

private:
    CircularBuffer(const CircularBuffer &);
    CircularBuffer(CircularBuffer &&);
    CircularBuffer &operator=(const CircularBuffer &) = delete;
    CircularBuffer &operator=(CircularBuffer &&) = delete;

    meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); }
    size_type head() const { return Pos(mhead()); }
    size_type tail() const { return Pos(mtail()); }
    meta_type mhead() const { return mhead_.load(); }
    meta_type mtail() const { return mtail_.load(); }
    // data
    const size_type capacity_;
    Mutex mutex_;
    std::atomic<meta_type> mhead_;
    std::atomic<meta_type> mtail_;
    Alloc al_;
    typename Alloc::pointer buf = nullptr;
};
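
// Usage sketch (editorial, illustrative only). One slot is always kept free, so a
// CircularBuffer constructed with cap == 3 stores at most 3 elements:
//
//   robust::CircularBuffer<int> cb(3);
//   cb.push_back(1);   // true
//   cb.push_back(2);   // true
//   cb.push_back(3);   // true
//   cb.push_back(4);   // false: buffer is full
//   int v = 0;
//   cb.pop_front(v);   // true, v == 1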

template <unsigned PowerSize = 4, class Int = int64_t>
class AtomicQueue
{
public:
    typedef uint32_t size_type;
    typedef Int Data;
    typedef std::atomic<Data> AData;
    static_assert(sizeof(Data) == sizeof(AData));
    enum {
        power = PowerSize,
        capacity = (1 << power),
        mask = capacity - 1,
    };

    AtomicQueue() { memset(this, 0, sizeof(*this)); }
    size_type head() const { return head_.load(); }
    size_type tail() const { return tail_.load(); }
    bool like_empty() const { return head() == tail() && Empty(buf[head()]); }
    bool like_full() const { return head() == tail() && !Empty(buf[head()]); }
    bool push_back(const Data d, bool try_more = false)
    {
        bool r = false;
        size_type i = 0;
        do {
            auto pos = tail();
            auto cur = buf[pos].load();
            r = Empty(cur) && buf[pos].compare_exchange_strong(cur, Enc(d));
            tail_.compare_exchange_strong(pos, Next(pos));
        } while (try_more && !r && ++i < capacity);
        return r;
    }
    bool pop_front(Data &d, bool try_more = false)
    {
        bool r = false;
        Data cur;
        size_type i = 0;
        do {
            auto pos = head();
            cur = buf[pos].load();
            r = !Empty(cur) && buf[pos].compare_exchange_strong(cur, 0);
            head_.compare_exchange_strong(pos, Next(pos));
        } while (try_more && !r && ++i < capacity);
        if (r) { d = Dec(cur); }
        return r;
    }

private:
    static_assert(std::is_integral<Data>::value, "Data must be integral type!");
    static_assert(std::is_signed<Data>::value, "Data must be signed type!");
    static_assert(PowerSize < 10, "AtomicQueue max size is 2^10!");

    static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
    static inline Data Enc(const Data d) { return (d << 1) | 1; }   // lowest bit 1 means data ok.
    static inline Data Dec(const Data d) { return d >> 1; }         // lowest bit 1 means data ok.
    static size_type Next(const size_type index) { return (index + 1) & mask; }

    std::atomic<size_type> head_;
    std::atomic<size_type> tail_;
    AData buf[capacity];
};
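
// Usage sketch (editorial, illustrative only). Each value is shifted left by one and
// tagged in its lowest bit, so pushed values must fit in 63 bits:
//
//   robust::AtomicQueue<4> q;          // capacity 16
//   q.push_back(7);                    // true while the current tail slot is empty
//   int64_t v = 0;
//   if (q.pop_front(v)) {              // true, v == 7
//       // ...
//   }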

// atomic request-reply process, one cycle a time.
class AtomicReqRep
{
public:
    typedef int64_t Data;
    typedef std::function<Data(const Data)> Handler;
    bool ClientRequest(const Data request, Data &reply);
    bool ServerProcess(Handler onReq);
    AtomicReqRep() :
        data_(0), timestamp_(now()) {}

private:
    enum State {
        eStateFree,
        eStateRequest,
        eStateReply
    };
    static int GetState(Data d) { return d & MaskBits(3); }
    static Data Encode(Data d, State st) { return (d << 3) | st; }
    static Data Decode(Data d) { return d >> 3; }
    typedef std::chrono::steady_clock steady_clock;
    typedef steady_clock::duration Duration;
    static Duration now() { return steady_clock::now().time_since_epoch(); }

    bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); }
    std::atomic<Data> data_;
    std::atomic<Duration> timestamp_;
};
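
// Usage sketch (editorial, illustrative only): the server polls ServerProcess() with a
// handler, and the client issues one request per call to ClientRequest():
//
//   robust::AtomicReqRep rr;           // shared between client and server in practice
//
//   // server loop
//   rr.ServerProcess([](int64_t req) { return req + 1; });
//
//   // client
//   int64_t reply = 0;
//   if (rr.ClientRequest(100, reply)) {
//       // reply == 101 once the server has processed the request
//   }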

} // namespace robust

#endif // end of include guard: ROBUST_Q31RCWYU