/* =====================================================================================
 *
 *       Filename:  robust.h
 *
 *    Description:  Process-robust IPC building blocks intended for shared
 *                  memory: a one-slot request/reply channel, a CAS-based
 *                  mutex that can be recovered when its owner process dies,
 *                  and a mutex-protected circular buffer.
 *
 *        Version:  1.0
 *        Created:  2021-04-27 10:04:29
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * ===================================================================================== */
#ifndef ROBUST_Q31RCWYU
#define ROBUST_Q31RCWYU

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#include <atomic>
#include <chrono>
#include <memory>
#include <string>

namespace robust
{

using namespace std::chrono;
using namespace std::chrono_literals;

// Mask with the low `nbits` bits set; nbits must be in [0, 64).
constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; }

void QuickSleep();

// One-slot request/reply channel. The object itself only embeds a 4-byte
// payload placeholder (`buf`); it presumably lives at the head of a larger
// shared-memory region of `capacity_` bytes -- TODO confirm against the
// allocation site. State transitions and message I/O are implemented
// out-of-line (Client*/Server*/StateCas).
class RobustReqRep
{
    typedef uint32_t State;
    typedef std::string Msg;
    typedef std::chrono::steady_clock::duration Duration;

public:
    enum ErrorCode {
        eSuccess = 0,
        eTimeout = EAGAIN,
        eSizeError = EINVAL,
    };

    explicit RobustReqRep(const uint32_t max_len) :
        capacity_(max_len), state_(eStateInit), timestamp_(Duration(0)), size_(0) {}

    void PutReady() { state_.store(eStateReady); }
    bool Ready() const { return state_.load() == eStateReady; }
    uint32_t capacity() const { return capacity_; }

    // Full client round trip: write the request, then wait for the reply.
    // Returns eSuccess or the error from the failing step.
    int ClientRequest(const Msg &request, Msg &reply)
    {
        int r = ClientWriteRequest(request);
        if (r == eSuccess) {
            r = ClientReadReply(reply);
        }
        return r;
    }
    int ClientReadReply(Msg &reply);
    int ClientWriteRequest(const Msg &request);
    int ServerReadRequest(Msg &request);
    int ServerWriteReply(const Msg &reply);

private:
    RobustReqRep(const RobustReqRep &) = delete;
    RobustReqRep(RobustReqRep &&) = delete;
    RobustReqRep &operator=(const RobustReqRep &) = delete;
    RobustReqRep &operator=(RobustReqRep &&) = delete;

    // Handshake states. A full cycle is
    //   Ready -> ClientWriteBegin/End -> ServerReadBegin/End
    //         -> ServerWriteBegin/End -> ClientReadBegin -> ClientReadEnd,
    // and eClientReadEnd aliases eStateReady so finishing a read re-arms the slot.
    enum {
        eStateInit = 0,
        eStateReady = 0x19833891,
        eClientWriteBegin,
        eClientWriteEnd,
        eServerReadBegin,
        eServerReadEnd,
        eServerWriteBegin,
        eServerWriteEnd,
        eClientReadBegin,
        eClientReadEnd = eStateReady,
    };
    bool StateCas(State exp, State val);

    // NOTE(review): no bounds check here; callers are expected to enforce
    // msg.size() <= capacity_ (eSizeError exists for that) -- verify in the .cpp.
    void Write(const Msg &msg)
    {
        size_.store(msg.size());
        memcpy(buf, msg.data(), msg.size());
    }
    void Read(Msg &msg) { msg.assign(buf, size_.load()); }

    const uint32_t capacity_;
    std::atomic<State> state_;
    static_assert(sizeof(State) == sizeof(state_), "atomic should have no extra data.");
    std::atomic<Duration> timestamp_;
    std::atomic<uint32_t> size_;
    char buf[4]; // placeholder: payload storage extends past the object in shared memory -- TODO confirm
};

// Locker policy that identifies the holder by pid but never declares it dead.
class PidLocker
{
public:
    typedef int locker_t;
    static locker_t this_locker()
    {
        static locker_t val = getpid();
        return val;
    }
    static bool is_alive(locker_t locker) { return true; }
};

// Locker policy that checks liveness via the existence of /proc/<pid>/stat,
// so a lock held by a crashed process can be captured (Linux-specific).
class RobustPidLocker
{
public:
    typedef int locker_t;
    static locker_t this_locker()
    {
        static locker_t val = getpid();
        return val;
    }
    static bool is_alive(locker_t locker)
    {
        char buf[64] = {0};
        snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker);
        return access(buf, F_OK) == 0;
    }
};

// CAS-based mutex whose 64-bit meta word packs a lock flag (bit 63) with the
// owner id (low bits), so ownership and state change atomically.
// try_lock()/lock() return 1 when locked normally and 2 when the lock was
// captured from a dead owner; 0 means try_lock failed.
template <class LockerT>
class CasMutex
{
    typedef typename LockerT::locker_t locker_t;
    static inline locker_t this_locker() { return LockerT::this_locker(); }
    static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); }

public:
    CasMutex() :
        meta_(0) {}

    int try_lock()
    {
        auto old = meta_.load();
        int r = 0;
        if (!Locked(old)) {
            r = MetaCas(old, Meta(1, this_locker()));
        } else if (!is_alive(Locker(old))) {
            // Owner died while holding the lock: steal it. Shift makes the
            // "captured" result 2, distinguishable from a normal lock (1).
            r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1;
            if (r) {
                printf("captured pid %d -> %d, r = %d\n", Locker(old), this_locker(), r);
            }
        }
        return r;
    }
    int lock()
    {
        int r = 0;
        do {
            r = try_lock();
        } while (r == 0);
        return r;
    }
    void unlock()
    {
        auto old = meta_.load();
        // Only the current owner may release; the CAS guards against races.
        if (Locked(old) && Locker(old) == this_locker()) {
            MetaCas(old, Meta(0, this_locker()));
        }
    }

private:
    static_assert(sizeof(locker_t) < sizeof(uint64_t), "locker size must be smaller than 64 bit!");
    std::atomic<uint64_t> meta_;
    bool Locked(uint64_t meta) { return (meta >> 63) != 0; }
    locker_t Locker(uint64_t meta) { return meta & MaskBits(sizeof(locker_t) * 8); }
    // Mask the id so a sign-extended locker_t cannot clobber the lock bit.
    uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | (uint64_t(lid) & MaskBits(sizeof(locker_t) * 8)); }
    bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
};

typedef CasMutex<RobustPidLocker> Mutex;

// RAII lock holder: locks on construction, unlocks on destruction.
template <class Lock>
class Guard
{
public:
    explicit Guard(Lock &l) :
        l_(l) { l_.lock(); }
    ~Guard() { l_.unlock(); }

private:
    Guard(const Guard &) = delete;
    Guard(Guard &&) = delete;
    Lock &l_;
};

// Fixed-capacity ring buffer protected by a robust Mutex. Head/tail are
// 64-bit metas packing a monotonically increasing op count (high 32 bits)
// with the position (low 32 bits) so the commit CAS is ABA-safe.
// Usable capacity is cap - 1 (one slot is sacrificed to tell full from empty).
template <class D, class Alloc = std::allocator<D>>
class CircularBuffer
{
    typedef uint32_t size_type;
    typedef uint32_t count_type;
    typedef uint64_t meta_type;
    static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; }
    static count_type Count(meta_type meta) { return meta >> 32; }
    // FIX: previously returned size_type, truncating the op count to zero
    // and defeating the ABA protection.
    static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }

public:
    typedef D Data;

    explicit CircularBuffer(const size_type cap) :
        CircularBuffer(cap, Alloc()) {}
    CircularBuffer(const size_type cap, Alloc const &al) :
        state_(0), capacity_(cap), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(cap))
    {
        // NOTE(review): throws a const char* for compatibility with existing
        // catch sites; std::allocator itself reports failure via std::bad_alloc.
        if (!buf) {
            throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!");
        }
    }
    ~CircularBuffer() { al_.deallocate(buf, capacity_); }

    size_type size() const { return (capacity_ + tail() - head()) % capacity_; }
    bool full() const { return (capacity_ + tail() + 1 - head()) % capacity_ == 0; }
    bool empty() const { return head() == tail(); }

    // Returns false when full; the element is stored before the tail commit.
    bool push_back(Data d)
    {
        Guard<Mutex> guard(mutex_);
        if (!full()) {
            auto old = mtail();
            buf[Pos(old)] = d;
            return mtail_.compare_exchange_strong(old, next(old));
        } else {
            return false;
        }
    }
    // Returns false when empty; the element is copied out before the head commit.
    bool pop_front(Data &d)
    {
        Guard<Mutex> guard(mutex_);
        if (!empty()) {
            auto old = mhead();
            d = buf[Pos(old)];
            return mhead_.compare_exchange_strong(old, next(old));
        } else {
            return false;
        }
    }

    bool Ready() const { return state_.load() == eStateReady; }
    void PutReady() { state_.store(eStateReady); }

private:
    CircularBuffer(const CircularBuffer &) = delete;
    CircularBuffer(CircularBuffer &&) = delete;
    CircularBuffer &operator=(const CircularBuffer &) = delete;
    CircularBuffer &operator=(CircularBuffer &&) = delete;

    // Advance one slot, bumping the op count so the resulting meta is unique.
    meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); }
    size_type head() const { return Pos(mhead()); }
    size_type tail() const { return Pos(mtail()); }
    meta_type mhead() const { return mhead_.load(); }
    meta_type mtail() const { return mtail_.load(); }

    // data
    enum { eStateReady = 0x19833891 };
    std::atomic<uint32_t> state_;
    const size_type capacity_;
    Mutex mutex_;
    std::atomic<meta_type> mhead_;
    std::atomic<meta_type> mtail_;
    Alloc al_;
    typename std::allocator_traits<Alloc>::pointer buf = nullptr;
};

} // namespace robust

#endif // end of include guard: ROBUST_Q31RCWYU