From 43d4e95770b0519341153202c9a535aaa8e164c5 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 01 六月 2021 14:22:20 +0800 Subject: [PATCH] refactor, remove useless code. --- src/robust.h | 322 ++++++++++++++++++----------------------------------- 1 files changed, 108 insertions(+), 214 deletions(-) diff --git a/src/robust.h b/src/robust.h index 19e9bda..62eb0b4 100644 --- a/src/robust.h +++ b/src/robust.h @@ -19,236 +19,130 @@ #ifndef ROBUST_Q31RCWYU #define ROBUST_Q31RCWYU +#include "bh_util.h" +#include "log.h" #include <atomic> #include <chrono> -#include <memory> -#include <string.h> -#include <string> -#include <sys/types.h> #include <unistd.h> namespace robust { - -using namespace std::chrono; -using namespace std::chrono_literals; - -void QuickSleep(); - -class RobustReqRep -{ - typedef uint32_t State; - typedef std::string Msg; - typedef std::chrono::steady_clock::duration Duration; - -public: - enum ErrorCode { - eSuccess = 0, - eTimeout = EAGAIN, - eSizeError = EINVAL, - }; - - explicit RobustReqRep(const uint32_t max_len) : - capacity_(max_len), state_(eStateInit), timestamp_(Duration(0)), size_(0) {} - - void PutReady() { state_.store(eStateReady); } - bool Ready() const { return state_.load() == eStateReady; } - uint32_t capacity() const { return capacity_; } - - int ClientRequest(const Msg &request, Msg &reply) - { - int r = ClientWriteRequest(request); - if (r == eSuccess) { - r = ClientReadReply(reply); - } - return r; - } - int ClientReadReply(Msg &reply); - int ClientWriteRequest(const Msg &request); - int ServerReadRequest(Msg &request); - int ServerWriteReply(const Msg &reply); - -private: - RobustReqRep(const RobustReqRep &); - RobustReqRep(RobustReqRep &&); - RobustReqRep &operator=(const RobustReqRep &) = delete; - RobustReqRep &operator=(RobustReqRep &&) = delete; - - enum { - eStateInit = 0, - eStateReady = 0x19833891, - eClientWriteBegin, - eClientWriteEnd, - eServerReadBegin, - eServerReadEnd, - eServerWriteBegin, - eServerWriteEnd, - eClientReadBegin, - eClientReadEnd = eStateReady, - }; - bool StateCas(State exp, State val); - void Write(const Msg &msg) - { - size_.store(msg.size()); - memcpy(buf, msg.data(), msg.size()); - } - void Read(Msg &msg) { msg.assign(buf, size_.load()); } - - const uint32_t capacity_; - std::atomic<State> state_; - static_assert(sizeof(State) == sizeof(state_), "atomic should has no extra data."); - std::atomic<Duration> timestamp_; - std::atomic<int32_t> size_; - char buf[4]; -}; - -template <bool isRobust = false> -class CasMutex -{ - static pid_t pid() - { - static pid_t val = getpid(); - return val; - } - static bool Killed(pid_t pid) - { - char buf[64] = {0}; - snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", pid); - return access(buf, F_OK) != 0; - } - -public: - CasMutex() : - meta_(0) {} - int try_lock() - { - const auto t = steady_clock::now().time_since_epoch().count(); - auto old = meta_.load(); - int r = 0; - if (!Locked(old)) { - r = MetaCas(old, Meta(1, pid())); - } else if (isRobust && Killed(Pid(old))) { - r = static_cast<int>(MetaCas(old, Meta(1, pid()))) << 1; - if (r) { - printf("captured pid %d -> %d, r = %d\n", Pid(old), pid(), r); - } - } - return r; - } - int lock() - { - int r = 0; - do { - r = try_lock(); - } while (r == 0); - return r; - } - void unlock() - { - auto old = meta_.load(); - if (Locked(old) && Pid(old) == pid()) { - MetaCas(old, Meta(0, pid())); - } - } - -private: - std::atomic<uint64_t> meta_; - bool Locked(uint64_t meta) { return (meta >> 63) != 0; } - pid_t Pid(uint64_t meta) { return meta & ~(uint64_t(1) << 63); } - uint64_t Meta(uint64_t lk, pid_t pid) { return (lk << 63) | pid; } - bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); } - static_assert(sizeof(pid_t) < sizeof(uint64_t)); -}; - -template <class Lock> -class Guard +/* +template <unsigned PowerSize = 4, class Int = int64_t> +class AtomicQueue { public: - Guard(Lock &l) : - l_(l) { l_.lock(); } - ~Guard() { l_.unlock(); } - -private: - Guard(const Guard &); - Guard(Guard &&); - Lock &l_; -}; - -template <class D, class Alloc = std::allocator<D>> -class CircularBuffer -{ typedef uint32_t size_type; - typedef uint32_t count_type; - typedef uint64_t meta_type; - static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; } - static count_type Count(meta_type meta) { return meta >> 32; } - static size_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; } + typedef Int Data; + typedef std::atomic<Data> AData; + static_assert(sizeof(Data) == sizeof(AData)); + enum { + power = PowerSize, + capacity = (1 << power), + mask = capacity - 1, + }; -public: - typedef D Data; - - CircularBuffer(const size_type cap) : - CircularBuffer(cap, Alloc()) {} - CircularBuffer(const size_type cap, Alloc const &al) : - state_(0), capacity_(cap), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(cap)) + AtomicQueue() { memset(this, 0, sizeof(*this)); } + size_type head() const { return head_.load(); } + size_type tail() const { return tail_.load(); } + bool push(const Data d, bool try_more = false) { - if (!buf) { - throw("error allocate buffer: out of mem!"); - } + bool r = false; + size_type i = 0; + do { + auto pos = tail(); + if (tail_.compare_exchange_strong(pos, Next(pos))) { + auto cur = buf[pos].load(); + r = Empty(cur) && buf[pos].compare_exchange_strong(cur, Enc(d)); + } + } while (try_more && !r && ++i < capacity); + return r; } - ~CircularBuffer() + bool pop(Data &d, bool try_more = false) { - al_.deallocate(buf, capacity_); + bool r = false; + Data cur; + size_type i = 0; + do { + auto pos = head(); + if (head_.compare_exchange_strong(pos, Next(pos))) { + cur = buf[pos].load(); + r = !Empty(cur) && buf[pos].compare_exchange_strong(cur, 0); + } + } while (try_more && !r && ++i < capacity); + if (r) { d = Dec(cur); } + return r; } - size_type size() const { return (capacity_ + tail() - head()) % capacity_; } - bool full() const { return (capacity_ + tail() + 1 - head()) % capacity_ == 0; } - bool empty() const { return head() == tail(); } - bool push_back(Data d) - { - Guard<MutexT> guard(mutex_); - if (!full()) { - auto old = mtail(); - buf[Pos(old)] = d; - return mtail_.compare_exchange_strong(old, next(old)); - } else { - return false; - } - } - bool pop_front(Data &d) - { - Guard<MutexT> guard(mutex_); - if (!empty()) { - auto old = mhead(); - d = buf[Pos(old)]; - return mhead_.compare_exchange_strong(old, next(old)); - } else { - return false; - } - } - bool Ready() const { return state_.load() == eStateReady; } - void PutReady() { state_.store(eStateReady); } private: - CircularBuffer(const CircularBuffer &); - CircularBuffer(CircularBuffer &&); - CircularBuffer &operator=(const CircularBuffer &) = delete; - CircularBuffer &operator=(CircularBuffer &&) = delete; - typedef CasMutex<true> MutexT; - // static_assert(sizeof(MutexT) == 16); - meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); } - size_type head() const { return Pos(mhead()); } - size_type tail() const { return Pos(mtail()); } - meta_type mhead() const { return mhead_.load(); } - meta_type mtail() const { return mtail_.load(); } - // data - enum { eStateReady = 0x19833891 }; - std::atomic<uint32_t> state_; - const size_type capacity_; - MutexT mutex_; - std::atomic<meta_type> mhead_; - std::atomic<meta_type> mtail_; - Alloc al_; - typename Alloc::pointer buf = nullptr; + static_assert(std::is_integral<Data>::value, "Data must be integral type!"); + static_assert(std::is_signed<Data>::value, "Data must be signed type!"); + static_assert(PowerSize < 10, "RobustQ63 max size is 2^10!"); + + static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok. + static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok. + static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok. + static size_type Next(const size_type index) { return (index + 1) & mask; } + + std::atomic<size_type> head_; + std::atomic<size_type> tail_; + AData buf[capacity]; +}; +//*/ + +class AtomicQ63 +{ +public: + typedef int64_t Data; + AtomicQ63() { memset(this, 0, sizeof(*this)); } + bool push(const Data d, bool try_more = false) + { + auto cur = buf.load(); + return Empty(cur) && buf.compare_exchange_strong(cur, Enc(d)); + } + bool pop(Data &d, bool try_more = false) + { + Data cur = buf.load(); + bool r = !Empty(cur) && buf.compare_exchange_strong(cur, 0); + if (r) { d = Dec(cur); } + return r; + } + +private: + static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok. + static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok. + static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok. + + typedef std::atomic<Data> AData; + static_assert(sizeof(Data) == sizeof(AData)); + + AData buf; +}; + +class AtomicReqRep +{ +public: + typedef int64_t Data; + typedef std::function<Data(const Data)> Handler; + bool ClientRequest(const Data request, Data &reply); + bool ServerProcess(Handler onReq); + +private: + enum State { + eStateFree, + eStateRequest, + eStateReply + }; + static int GetState(Data d) { return d & MaskBits(3); } + static Data Encode(Data d, State st) { return (d << 3) | st; } + static Data Decode(Data d) { return d >> 3; } + typedef std::chrono::steady_clock steady_clock; + typedef steady_clock::duration Duration; + Duration now() { return steady_clock::now().time_since_epoch(); } + + bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); } + std::atomic<Data> data_; + std::atomic<Duration> timestamp_; }; } // namespace robust -- Gitblit v1.8.0