From 58d904a328c0d849769b483e901a0be9426b8209 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期二, 20 七月 2021 20:20:44 +0800 Subject: [PATCH] 调整Request C.BHFree的位置 --- src/robust.h | 309 +++++++-------------------------------------------- 1 files changed, 44 insertions(+), 265 deletions(-) diff --git a/src/robust.h b/src/robust.h index 8657122..255aea4 100644 --- a/src/robust.h +++ b/src/robust.h @@ -19,295 +19,74 @@ #ifndef ROBUST_Q31RCWYU #define ROBUST_Q31RCWYU +#include "bh_util.h" #include "log.h" +#include <string.h> #include <atomic> #include <chrono> -#include <memory> -#include <mutex> -#include <string> -#include <sys/file.h> -#include <sys/ipc.h> -#include <sys/sem.h> -#include <sys/stat.h> -#include <sys/types.h> #include <unistd.h> namespace robust { -using namespace std::chrono; -using namespace std::chrono_literals; -constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; } - -void QuickSleep(); - -class CasMutex -{ - typedef uint64_t locker_t; - static inline locker_t this_locker() { return pthread_self(); } - static const uint64_t kLockerMask = MaskBits(63); - -public: - CasMutex() : - meta_(0) {} - int try_lock() - { - auto old = meta_.load(); - int r = 0; - if (!Locked(old)) { - r = MetaCas(old, Meta(1, this_locker())); - } - return r; - } - int lock() - { - int r = 0; - do { - r = try_lock(); - } while (r == 0); - return r; - } - void unlock() - { - auto old = meta_.load(); - if (Locked(old) && Locker(old) == this_locker()) { - MetaCas(old, Meta(0, this_locker())); - } - } - -private: - std::atomic<uint64_t> meta_; - bool Locked(uint64_t meta) { return (meta >> 63) == 1; } - locker_t Locker(uint64_t meta) { return meta & kLockerMask; } - uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; } - bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); } -}; - -class NullMutex +// atomic queue, length is 1. +// lowest bit is used for data flag, 63 bit for data. +class AtomicQ63 { public: - template <class... T> - explicit NullMutex(T &&...t) {} // easy test. - bool try_lock() { return true; } - void lock() {} - void unlock() {} -}; - -// flock + mutex -class FMutex -{ -public: - typedef uint64_t id_t; - FMutex(id_t id) : - id_(id), fd_(Open(id_)) - { - if (fd_ == -1) { throw "error create mutex!"; } - } - ~FMutex() { Close(fd_); } - bool try_lock(); - void lock(); - void unlock(); - -private: - static std::string GetPath(id_t id) - { - const std::string dir("/tmp/.bhome_mtx"); - mkdir(dir.c_str(), 0777); - return dir + "/fm_" + std::to_string(id); - } - static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDONLY, 0666); } - static int Close(int fd) { return close(fd); } - void FLock(); - void FUnlock(); - id_t id_; - int fd_; - std::mutex mtx_; -}; - -union semun { - int val; /* Value for SETVAL */ - struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */ - unsigned short *array; /* Array for GETALL, SETALL */ - struct seminfo *__buf; /* Buffer for IPC_INFO - (Linux-specific) */ -}; - -class SemMutex -{ -public: - SemMutex(key_t key) : - key_(key), sem_id_(semget(key, 1, 0666)) - { - if (sem_id_ == -1) { throw "error create semaphore."; } - } - ~SemMutex() {} - - bool try_lock() - { - sembuf op = {0, -1, SEM_UNDO | IPC_NOWAIT}; - return semop(sem_id_, &op, 1) == 0; - } - - void lock() - { - sembuf op = {0, -1, SEM_UNDO}; - semop(sem_id_, &op, 1); - } - - void unlock() - { - sembuf op = {0, 1, SEM_UNDO}; - semop(sem_id_, &op, 1); - } - -private: - key_t key_; - int sem_id_; -}; - -template <class Lock> -class Guard -{ -public: - Guard(Lock &l) : - l_(l) { l_.lock(); } - ~Guard() { l_.unlock(); } - -private: - Guard(const Guard &); - Guard(Guard &&); - Lock &l_; -}; - -template <class D, class Alloc = std::allocator<D>> -class CircularBuffer -{ - typedef uint32_t size_type; - typedef uint32_t count_type; - typedef uint64_t meta_type; - static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; } - static count_type Count(meta_type meta) { return meta >> 32; } - static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; } - -public: - typedef D Data; - - CircularBuffer(const size_type cap) : - CircularBuffer(cap, Alloc()) {} - CircularBuffer(const size_type cap, Alloc const &al) : - capacity_(cap + 1), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(capacity_)) - { - if (!buf) { - throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!"); - } else { - memset(&buf[0], 0, sizeof(D) * capacity_); - } - } - ~CircularBuffer() { al_.deallocate(buf, capacity_); } - - bool push_back(const Data d) - { - auto old = mtail(); - auto pos = Pos(old); - auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0); - if (!full) { - buf[pos] = d; - return mtail_.compare_exchange_strong(old, next(old)); - } - return false; - } - bool pop_front(Data &d) - { - auto old = mhead(); - auto pos = Pos(old); - if (!(pos == tail())) { - d = buf[pos]; - return mhead_.compare_exchange_strong(old, next(old)); - } else { - return false; - } - } - -private: - CircularBuffer(const CircularBuffer &); - CircularBuffer(CircularBuffer &&); - CircularBuffer &operator=(const CircularBuffer &) = delete; - CircularBuffer &operator=(CircularBuffer &&) = delete; - - meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); } - size_type head() const { return Pos(mhead()); } - size_type tail() const { return Pos(mtail()); } - meta_type mhead() const { return mhead_.load(); } - meta_type mtail() const { return mtail_.load(); } - // data - const size_type capacity_; - std::atomic<meta_type> mhead_; - std::atomic<meta_type> mtail_; - Alloc al_; - typename Alloc::pointer buf = nullptr; -}; - -template <unsigned PowerSize = 4, class Int = int64_t> -class AtomicQueue -{ -public: - typedef uint32_t size_type; - typedef Int Data; - typedef std::atomic<Data> AData; - static_assert(sizeof(Data) == sizeof(AData)); - enum { - power = PowerSize, - capacity = (1 << power), - mask = capacity - 1, - }; - - AtomicQueue() { memset(this, 0, sizeof(*this)); } - size_type head() const { return head_.load(); } - size_type tail() const { return tail_.load(); } - bool like_empty() const { return head() == tail() && Empty(buf[head()]); } - bool like_full() const { return head() == tail() && !Empty(buf[head()]); } + typedef int64_t Data; + AtomicQ63() { memset(this, 0, sizeof(*this)); } bool push(const Data d, bool try_more = false) { - bool r = false; - size_type i = 0; - do { - auto pos = tail(); - if (tail_.compare_exchange_strong(pos, Next(pos))) { - auto cur = buf[pos].load(); - r = Empty(cur) && buf[pos].compare_exchange_strong(cur, Enc(d)); - } - } while (try_more && !r && ++i < capacity); - return r; + auto cur = buf.load(); + return Empty(cur) && buf.compare_exchange_strong(cur, Enc(d)); } bool pop(Data &d, bool try_more = false) { - bool r = false; - Data cur; - size_type i = 0; - do { - auto pos = head(); - if (head_.compare_exchange_strong(pos, Next(pos))) { - cur = buf[pos].load(); - r = !Empty(cur) && buf[pos].compare_exchange_strong(cur, 0); - } - } while (try_more && !r && ++i < capacity); + Data cur = buf.load(); + bool r = !Empty(cur) && buf.compare_exchange_strong(cur, 0); if (r) { d = Dec(cur); } return r; } private: - static_assert(std::is_integral<Data>::value, "Data must be integral type!"); - static_assert(std::is_signed<Data>::value, "Data must be signed type!"); - static_assert(PowerSize < 10, "RobustQ63 max size is 2^10!"); - static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok. static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok. static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok. - static size_type Next(const size_type index) { return (index + 1) & mask; } - std::atomic<size_type> head_; - std::atomic<size_type> tail_; - AData buf[capacity]; + typedef std::atomic<Data> AData; + // static_assert(sizeof(Data) == sizeof(AData)); + + AData buf; +}; + +// atomic request-reply process, one cycle a time. +class AtomicReqRep +{ +public: + typedef int64_t Data; + typedef std::function<Data(const Data)> Handler; + bool ClientRequest(const Data request, Data &reply); + bool ServerProcess(Handler onReq); + AtomicReqRep() : + data_(0), timestamp_(now()) {} + +private: + enum State { + eStateFree, + eStateRequest, + eStateReply + }; + static int GetState(Data d) { return d & MaskBits(3); } + static Data Encode(Data d, State st) { return (d << 3) | st; } + static Data Decode(Data d) { return d >> 3; } + typedef std::chrono::steady_clock steady_clock; + typedef steady_clock::duration Duration; + static Duration now() { return steady_clock::now().time_since_epoch(); } + + bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); } + std::atomic<Data> data_; + std::atomic<Duration> timestamp_; }; } // namespace robust -- Gitblit v1.8.0