/* * ===================================================================================== * * Filename: robust.h * * Description: * * Version: 1.0 * Created: 2021年04月27日 10时04分29秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), lichao@aiotlink.com * Organization: * * ===================================================================================== */ #ifndef ROBUST_Q31RCWYU #define ROBUST_Q31RCWYU #include "bh_util.h" #include "log.h" #include #include #include #include #include #include #include #include #include #include #include namespace robust { using namespace std::chrono; using namespace std::chrono_literals; void QuickSleep(); class CasMutex { typedef uint64_t locker_t; static inline locker_t this_locker() { return pthread_self(); } static const uint64_t kLockerMask = MaskBits(63); public: CasMutex() : meta_(0) {} int try_lock() { auto old = meta_.load(); int r = 0; if (!Locked(old)) { r = MetaCas(old, Meta(1, this_locker())); } return r; } int lock() { int r = 0; do { r = try_lock(); } while (r == 0); return r; } void unlock() { auto old = meta_.load(); if (Locked(old) && Locker(old) == this_locker()) { MetaCas(old, Meta(0, this_locker())); } } private: std::atomic meta_; bool Locked(uint64_t meta) { return (meta >> 63) == 1; } locker_t Locker(uint64_t meta) { return meta & kLockerMask; } uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; } bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); } }; class NullMutex { public: template explicit NullMutex(T &&...t) {} // easy test. bool try_lock() { return true; } void lock() {} void unlock() {} }; // flock + mutex class FMutex { public: typedef uint64_t id_t; FMutex(id_t id) : id_(id), fd_(Open(id_)), count_(0) { if (fd_ == -1) { throw "error create mutex!"; } } ~FMutex() { Close(fd_); } bool try_lock(); void lock(); void unlock(); private: static std::string GetPath(id_t id) { const std::string dir("/tmp/.bhome_mtx"); mkdir(dir.c_str(), 0777); return dir + "/fm_" + std::to_string(id); } static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDONLY, 0666); } static int Close(int fd) { return close(fd); } id_t id_; int fd_; std::mutex mtx_; std::atomic count_; }; union semun { int val; /* Value for SETVAL */ struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */ unsigned short *array; /* Array for GETALL, SETALL */ struct seminfo *__buf; /* Buffer for IPC_INFO (Linux-specific) */ }; class SemMutex { public: SemMutex(key_t key) : key_(key), sem_id_(semget(key, 1, 0666)) { if (sem_id_ == -1) { throw "error create semaphore."; } } ~SemMutex() {} bool try_lock() { sembuf op = {0, -1, SEM_UNDO | IPC_NOWAIT}; return semop(sem_id_, &op, 1) == 0; } void lock() { sembuf op = {0, -1, SEM_UNDO}; semop(sem_id_, &op, 1); } void unlock() { sembuf op = {0, 1, SEM_UNDO}; semop(sem_id_, &op, 1); } private: key_t key_; int sem_id_; }; template class Guard { public: Guard(Lock &l) : l_(l) { l_.lock(); } ~Guard() { l_.unlock(); } private: Guard(const Guard &); Guard(Guard &&); Lock &l_; }; template class AtomicQueue { public: typedef uint32_t size_type; typedef Int Data; typedef std::atomic AData; static_assert(sizeof(Data) == sizeof(AData)); enum { power = PowerSize, capacity = (1 << power), mask = capacity - 1, }; AtomicQueue() { memset(this, 0, sizeof(*this)); } size_type head() const { return head_.load(); } size_type tail() const { return tail_.load(); } bool like_empty() const { return head() == tail() && Empty(buf[head()]); } bool like_full() const { return head() == tail() && !Empty(buf[head()]); } bool push(const Data d, bool try_more = false) { bool r = false; size_type i = 0; do { auto pos = tail(); if (tail_.compare_exchange_strong(pos, Next(pos))) { auto cur = buf[pos].load(); r = Empty(cur) && buf[pos].compare_exchange_strong(cur, Enc(d)); } } while (try_more && !r && ++i < capacity); return r; } bool pop(Data &d, bool try_more = false) { bool r = false; Data cur; size_type i = 0; do { auto pos = head(); if (head_.compare_exchange_strong(pos, Next(pos))) { cur = buf[pos].load(); r = !Empty(cur) && buf[pos].compare_exchange_strong(cur, 0); } } while (try_more && !r && ++i < capacity); if (r) { d = Dec(cur); } return r; } private: static_assert(std::is_integral::value, "Data must be integral type!"); static_assert(std::is_signed::value, "Data must be signed type!"); static_assert(PowerSize < 10, "RobustQ63 max size is 2^10!"); static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok. static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok. static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok. static size_type Next(const size_type index) { return (index + 1) & mask; } std::atomic head_; std::atomic tail_; AData buf[capacity]; }; template class AtomicQueue<0, Int> { typedef Int Data; typedef std::atomic AData; static_assert(sizeof(Data) == sizeof(AData)); public: AtomicQueue() { memset(this, 0, sizeof(*this)); } bool push(const Data d, bool try_more = false) { auto cur = buf.load(); return Empty(cur) && buf.compare_exchange_strong(cur, Enc(d)); } bool pop(Data &d, bool try_more = false) { Data cur = buf.load(); bool r = !Empty(cur) && buf.compare_exchange_strong(cur, 0); if (r) { d = Dec(cur); } return r; } uint32_t head() const { return 0; } uint32_t tail() const { return 0; } private: static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok. static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok. static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok. AData buf; }; class AtomicReqRep { public: typedef int64_t Data; typedef std::function Handler; bool ClientRequest(const Data request, Data &reply) { auto end_time = now() + 3s; do { Data cur = data_.load(); if (GetState(cur) == eStateFree && DataCas(cur, Encode(request, eStateRequest))) { do { yield(); cur = data_.load(); if (GetState(cur) == eStateReply) { DataCas(cur, Encode(0, eStateFree)); reply = Decode(cur); return true; } } while (now() < end_time); } yield(); } while (now() < end_time); return false; } bool ServerProcess(Handler onReq) { Data cur = data_.load(); switch (GetState(cur)) { case eStateRequest: if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) { timestamp_ = now(); return true; } break; case eStateReply: if (timestamp_.load() + 3s < now()) { DataCas(cur, Encode(0, eStateFree)); } break; case eStateFree: default: break; } return false; } private: enum State { eStateFree, eStateRequest, eStateReply }; static int GetState(Data d) { return d & MaskBits(3); } static Data Encode(Data d, State st) { return (d << 3) | st; } static Data Decode(Data d) { return d >> 3; } static void yield() { QuickSleep(); } typedef steady_clock::duration Duration; Duration now() { return steady_clock::now().time_since_epoch(); } bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); } std::atomic data_; std::atomic timestamp_; }; } // namespace robust #endif // end of include guard: ROBUST_Q31RCWYU