From 43d4e95770b0519341153202c9a535aaa8e164c5 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 01 六月 2021 14:22:20 +0800 Subject: [PATCH] refactor, remove useless code. --- src/robust.h | 264 +++++++--------------------------------------------- 1 files changed, 35 insertions(+), 229 deletions(-) diff --git a/src/robust.h b/src/robust.h index c70e2fe..62eb0b4 100644 --- a/src/robust.h +++ b/src/robust.h @@ -23,228 +23,11 @@ #include "log.h" #include <atomic> #include <chrono> -#include <memory> -#include <mutex> -#include <string> -#include <sys/file.h> -#include <sys/ipc.h> -#include <sys/sem.h> -#include <sys/stat.h> -#include <sys/types.h> #include <unistd.h> namespace robust { - -using namespace std::chrono; -using namespace std::chrono_literals; -void QuickSleep(); - -class CasMutex -{ - typedef uint64_t locker_t; - static inline locker_t this_locker() { return pthread_self(); } - static const uint64_t kLockerMask = MaskBits(63); - -public: - CasMutex() : - meta_(0) {} - int try_lock() - { - auto old = meta_.load(); - int r = 0; - if (!Locked(old)) { - r = MetaCas(old, Meta(1, this_locker())); - } - return r; - } - int lock() - { - int r = 0; - do { - r = try_lock(); - } while (r == 0); - return r; - } - void unlock() - { - auto old = meta_.load(); - if (Locked(old) && Locker(old) == this_locker()) { - MetaCas(old, Meta(0, this_locker())); - } - } - -private: - std::atomic<uint64_t> meta_; - bool Locked(uint64_t meta) { return (meta >> 63) == 1; } - locker_t Locker(uint64_t meta) { return meta & kLockerMask; } - uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; } - bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); } -}; - -class NullMutex -{ -public: - template <class... T> - explicit NullMutex(T &&...t) {} // easy test. - bool try_lock() { return true; } - void lock() {} - void unlock() {} -}; - -// flock + mutex -class FMutex -{ -public: - typedef uint64_t id_t; - FMutex(id_t id) : - id_(id), fd_(Open(id_)), count_(0) - { - if (fd_ == -1) { throw "error create mutex!"; } - } - ~FMutex() { Close(fd_); } - bool try_lock(); - void lock(); - void unlock(); - -private: - static std::string GetPath(id_t id) - { - const std::string dir("/tmp/.bhome_mtx"); - mkdir(dir.c_str(), 0777); - return dir + "/fm_" + std::to_string(id); - } - static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDONLY, 0666); } - static int Close(int fd) { return close(fd); } - id_t id_; - int fd_; - std::mutex mtx_; - std::atomic<int32_t> count_; -}; - -union semun { - int val; /* Value for SETVAL */ - struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */ - unsigned short *array; /* Array for GETALL, SETALL */ - struct seminfo *__buf; /* Buffer for IPC_INFO - (Linux-specific) */ -}; - -class SemMutex -{ -public: - SemMutex(key_t key) : - key_(key), sem_id_(semget(key, 1, 0666)) - { - if (sem_id_ == -1) { throw "error create semaphore."; } - } - ~SemMutex() {} - - bool try_lock() - { - sembuf op = {0, -1, SEM_UNDO | IPC_NOWAIT}; - return semop(sem_id_, &op, 1) == 0; - } - - void lock() - { - sembuf op = {0, -1, SEM_UNDO}; - semop(sem_id_, &op, 1); - } - - void unlock() - { - sembuf op = {0, 1, SEM_UNDO}; - semop(sem_id_, &op, 1); - } - -private: - key_t key_; - int sem_id_; -}; - -template <class Lock> -class Guard -{ -public: - Guard(Lock &l) : - l_(l) { l_.lock(); } - ~Guard() { l_.unlock(); } - -private: - Guard(const Guard &); - Guard(Guard &&); - Lock &l_; -}; - -template <class D, class Alloc = std::allocator<D>> -class CircularBuffer -{ - typedef uint32_t size_type; - typedef uint32_t count_type; - typedef uint64_t meta_type; - static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; } - static count_type Count(meta_type meta) { return meta >> 32; } - static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; } - -public: - typedef D Data; - - CircularBuffer(const size_type cap) : - CircularBuffer(cap, Alloc()) {} - CircularBuffer(const size_type cap, Alloc const &al) : - capacity_(cap + 1), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(capacity_)) - { - if (!buf) { - throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!"); - } else { - memset(&buf[0], 0, sizeof(D) * capacity_); - } - } - ~CircularBuffer() { al_.deallocate(buf, capacity_); } - - bool push_back(const Data d) - { - auto old = mtail(); - auto pos = Pos(old); - auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0); - if (!full) { - buf[pos] = d; - return mtail_.compare_exchange_strong(old, next(old)); - } - return false; - } - bool pop_front(Data &d) - { - auto old = mhead(); - auto pos = Pos(old); - if (!(pos == tail())) { - d = buf[pos]; - return mhead_.compare_exchange_strong(old, next(old)); - } else { - return false; - } - } - -private: - CircularBuffer(const CircularBuffer &); - CircularBuffer(CircularBuffer &&); - CircularBuffer &operator=(const CircularBuffer &) = delete; - CircularBuffer &operator=(CircularBuffer &&) = delete; - - meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); } - size_type head() const { return Pos(mhead()); } - size_type tail() const { return Pos(mtail()); } - meta_type mhead() const { return mhead_.load(); } - meta_type mtail() const { return mtail_.load(); } - // data - const size_type capacity_; - std::atomic<meta_type> mhead_; - std::atomic<meta_type> mtail_; - Alloc al_; - typename Alloc::pointer buf = nullptr; -}; - +/* template <unsigned PowerSize = 4, class Int = int64_t> class AtomicQueue { @@ -262,8 +45,6 @@ AtomicQueue() { memset(this, 0, sizeof(*this)); } size_type head() const { return head_.load(); } size_type tail() const { return tail_.load(); } - bool like_empty() const { return head() == tail() && Empty(buf[head()]); } - bool like_full() const { return head() == tail() && !Empty(buf[head()]); } bool push(const Data d, bool try_more = false) { bool r = false; @@ -307,16 +88,13 @@ std::atomic<size_type> tail_; AData buf[capacity]; }; +//*/ -template <class Int> -class AtomicQueue<0, Int> +class AtomicQ63 { - typedef Int Data; - typedef std::atomic<Data> AData; - static_assert(sizeof(Data) == sizeof(AData)); - public: - AtomicQueue() { memset(this, 0, sizeof(*this)); } + typedef int64_t Data; + AtomicQ63() { memset(this, 0, sizeof(*this)); } bool push(const Data d, bool try_more = false) { auto cur = buf.load(); @@ -329,15 +107,43 @@ if (r) { d = Dec(cur); } return r; } - uint32_t head() const { return 0; } - uint32_t tail() const { return 0; } private: static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok. static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok. static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok. + + typedef std::atomic<Data> AData; + static_assert(sizeof(Data) == sizeof(AData)); + AData buf; }; +class AtomicReqRep +{ +public: + typedef int64_t Data; + typedef std::function<Data(const Data)> Handler; + bool ClientRequest(const Data request, Data &reply); + bool ServerProcess(Handler onReq); + +private: + enum State { + eStateFree, + eStateRequest, + eStateReply + }; + static int GetState(Data d) { return d & MaskBits(3); } + static Data Encode(Data d, State st) { return (d << 3) | st; } + static Data Decode(Data d) { return d >> 3; } + typedef std::chrono::steady_clock steady_clock; + typedef steady_clock::duration Duration; + Duration now() { return steady_clock::now().time_since_epoch(); } + + bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); } + std::atomic<Data> data_; + std::atomic<Duration> timestamp_; +}; + } // namespace robust #endif // end of include guard: ROBUST_Q31RCWYU -- Gitblit v1.8.0