| | |
| | | |
#include "bh_util.h"
#include "log.h"

#include <fcntl.h>
#include <pthread.h>
#include <string.h>
#include <sys/file.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <type_traits>
| | | |
| | | namespace robust |
| | | { |
| | | |
| | | using namespace std::chrono; |
| | | using namespace std::chrono_literals; |
| | | void QuickSleep(); |
| | | |
// Spin lock stored in a single 64-bit atomic word.
//
// Word layout: bit 63 is the "locked" flag, the low 63 bits hold the id of the
// thread that last took the lock. The lock is not recursive: try_lock() fails
// whenever the flag is set, even if the caller is the current owner.
//
// NOTE(review): the owner id comes from pthread_self(), which is only
// guaranteed unique within one process - confirm this is sufficient if the
// word is ever placed in memory shared between processes.
class CasMutex
{
    typedef uint64_t locker_t;
    // The flag lives in bit 63 (see Locked()/Meta()), so the id mask is exactly
    // the remaining low 63 bits.
    static const uint64_t kLockedBit = uint64_t(1) << 63;
    static const uint64_t kLockerMask = kLockedBit - 1;
    // Caller's id, truncated to 63 bits so that it can never collide with the
    // flag bit and always compares equal to what Locker() extracts.
    static inline locker_t this_locker()
    {
        const locker_t id = pthread_self();
        return id & kLockerMask;
    }

public:
    CasMutex() :
        meta_(0) {}

    // Single attempt to take the lock. Returns 1 on success, 0 if the lock is
    // held or the compare-exchange lost a race.
    int try_lock()
    {
        const uint64_t seen = meta_.load();
        if (Locked(seen)) { return 0; }
        return MetaCas(seen, Meta(1, this_locker())) ? 1 : 0;
    }

    // Spins until the lock is taken; always returns 1. Yields between attempts
    // so that the current owner can make progress on a busy machine.
    int lock()
    {
        while (try_lock() == 0) {
            std::this_thread::yield();
        }
        return 1;
    }

    // Releases the lock if, and only if, the calling thread is the recorded
    // owner; otherwise does nothing.
    void unlock()
    {
        const uint64_t seen = meta_.load();
        if (Locked(seen) && Locker(seen) == this_locker()) {
            // Only the owner modifies a locked word, so this exchange is not
            // expected to fail; the result is intentionally not checked.
            MetaCas(seen, Meta(0, this_locker()));
        }
    }

private:
    std::atomic<uint64_t> meta_;
    static bool Locked(uint64_t meta) { return (meta >> 63) == 1; }
    static locker_t Locker(uint64_t meta) { return meta & kLockerMask; }
    // Packs flag and owner id; the id is masked so it cannot set the flag bit.
    static uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | (lid & kLockerMask); }
    bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
};
| | | |
// Mutex stand-in whose operations do nothing: try_lock() always reports
// success and lock()/unlock() are empty.
class NullMutex
{
public:
    // Accepts and ignores any constructor arguments ("easy test" in the
    // original note), so it can replace a mutex type that needs arguments.
    template <class... T>
    explicit NullMutex(T &&...) {}
    bool try_lock() { return true; }
    void lock() {}
    void unlock() {}
};
| | | |
// flock + mutex (original note). The locking methods are only declared here;
// this part of the class manages the backing file descriptor.
class FMutex
{
public:
    typedef uint64_t id_t;

    // Opens (creating it if necessary) the file that backs mutex `id`.
    // Throws a string literal (const char *) when the file cannot be opened.
    FMutex(id_t id) :
        id_(id), fd_(Open(id_)), count_(0)
    {
        const bool opened = (fd_ != -1);
        if (!opened) { throw "error create mutex!"; }
    }
    // Closes the descriptor obtained in the constructor.
    ~FMutex() { Close(fd_); }
    bool try_lock();
    void lock();
    void unlock();

private:
    // Builds "/tmp/.bhome_mtx/fm_<id>". The directory is created on every
    // call; the mkdir result is ignored.
    static std::string GetPath(id_t id)
    {
        const std::string lock_dir("/tmp/.bhome_mtx");
        mkdir(lock_dir.c_str(), 0777);
        std::string path(lock_dir);
        path += "/fm_";
        path += std::to_string(id);
        return path;
    }
    // Returns the descriptor from open(), or -1 on failure.
    static int Open(id_t id)
    {
        const std::string path = GetPath(id);
        return open(path.c_str(), O_CREAT | O_RDONLY, 0666);
    }
    static int Close(int fd) { return close(fd); }

    id_t id_;
    int fd_;
    std::mutex mtx_;
    std::atomic<int32_t> count_;
};
| | | |
// Argument union in the shape described by the semctl(2) manual page, which
// requires the calling program to define it.
union semun {
    // Value for SETVAL.
    int val;
    // Buffer for IPC_STAT, IPC_SET.
    struct semid_ds *buf;
    // Array for GETALL, SETALL.
    unsigned short *array;
    // Buffer for IPC_INFO (Linux-specific).
    struct seminfo *__buf;
};
| | | |
// Mutex interface over semaphore 0 of an existing System V semaphore set.
// Every operation uses SEM_UNDO.
class SemMutex
{
public:
    // Looks up the one-semaphore set for `key`. semget is called without
    // IPC_CREAT, so the set has to exist already.
    // Throws a string literal (const char *) when the lookup fails.
    SemMutex(key_t key) :
        key_(key), sem_id_(semget(key, 1, 0666))
    {
        if (sem_id_ == -1) { throw "error create semaphore."; }
    }
    ~SemMutex() {}

    // Non-blocking decrement; returns true when the semaphore was acquired.
    bool try_lock() { return Apply(-1, SEM_UNDO | IPC_NOWAIT); }

    // Blocking decrement. An interrupted wait is restarted so that the call
    // does not return without holding the semaphore just because a signal
    // arrived. Other failures are ignored, as in the original code.
    void lock() { Apply(-1, SEM_UNDO); }

    // Increment (release).
    void unlock() { Apply(1, SEM_UNDO); }

private:
    // Runs one semop on semaphore 0, retrying while it fails with EINTR.
    // Returns true when semop finally reports success.
    bool Apply(short delta, short flags)
    {
        sembuf op = {0, delta, flags};
        int rc = 0;
        do {
            rc = semop(sem_id_, &op, 1);
        } while (rc == -1 && errno == EINTR);
        return rc == 0;
    }

    key_t key_;
    int sem_id_;
};
| | | |
// Scoped lock holder: calls lock() on construction and unlock() on
// destruction of the guard. Works with any type providing those two methods.
template <class Lock>
class Guard
{
public:
    Guard(Lock &l) :
        l_(l) { l_.lock(); }
    ~Guard() { l_.unlock(); }

    // A guard stands for exactly one lock()/unlock() pair, so it can be
    // neither copied nor moved.
    Guard(const Guard &) = delete;
    Guard(Guard &&) = delete;
    Guard &operator=(const Guard &) = delete;
    Guard &operator=(Guard &&) = delete;

private:
    Lock &l_;
};
| | | |
// Fixed-capacity ring of atomic integer slots (capacity = 2^PowerSize).
//
// Each slot stores the payload shifted left by one bit; the lowest bit is 1
// when the slot holds data and 0 when it is empty, so one bit of the payload
// range is given up. head_/tail_ are advanced with compare-exchange before the
// slot itself is touched, therefore a failed push/pop still moves the index;
// `try_more` makes the call retry on following slots, at most `capacity`
// attempts in total.
template <unsigned PowerSize = 4, class Int = int64_t>
class AtomicQueue
{
public:
    typedef uint32_t size_type;
    typedef Int Data;
    typedef std::atomic<Data> AData;
    static_assert(sizeof(Data) == sizeof(AData), "std::atomic<Data> must not add storage");
    enum {
        power = PowerSize,
        capacity = (1 << power),
        mask = capacity - 1,
    };

    // All indices and slots start at zero, i.e. every slot is empty.
    // Members are initialised individually instead of memset(this, ...),
    // which is not valid for std::atomic members.
    AtomicQueue() :
        head_(0), tail_(0), buf{} {}

    size_type head() const { return head_.load(); }
    size_type tail() const { return tail_.load(); }

    // Heuristic snapshots ("like"): other threads may change the queue
    // between the individual loads. head_ is read once so that the compared
    // index and the inspected slot are the same.
    bool like_empty() const
    {
        const size_type h = head();
        return h == tail() && Empty(buf[h].load());
    }
    bool like_full() const
    {
        const size_type h = head();
        return h == tail() && !Empty(buf[h].load());
    }

    // Stores `d` in the slot at the current tail. Returns true when the value
    // was stored. Returns false when the tail exchange lost a race or the
    // claimed slot was still occupied.
    bool push(const Data d, bool try_more = false)
    {
        bool stored = false;
        size_type attempts = 0;
        do {
            size_type pos = tail();
            if (tail_.compare_exchange_strong(pos, Next(pos))) {
                Data cur = buf[pos].load();
                stored = Empty(cur) && buf[pos].compare_exchange_strong(cur, Enc(d));
            }
        } while (try_more && !stored && ++attempts < static_cast<size_type>(capacity));
        return stored;
    }

    // Takes the value from the slot at the current head and writes it to `d`.
    // Returns true on success; `d` is left untouched on failure.
    bool pop(Data &d, bool try_more = false)
    {
        bool taken = false;
        Data cur = 0;
        size_type attempts = 0;
        do {
            size_type pos = head();
            if (head_.compare_exchange_strong(pos, Next(pos))) {
                cur = buf[pos].load();
                taken = !Empty(cur) && buf[pos].compare_exchange_strong(cur, 0);
            }
        } while (try_more && !taken && ++attempts < static_cast<size_type>(capacity));
        if (taken) { d = Dec(cur); }
        return taken;
    }

private:
    static_assert(std::is_integral<Data>::value, "Data must be integral type!");
    static_assert(std::is_signed<Data>::value, "Data must be signed type!");
    static_assert(PowerSize < 10, "AtomicQueue PowerSize must be below 10 (at most 2^9 slots)!");

    // Lowest bit 1 means the slot holds data.
    static inline bool Empty(const Data d) { return (d & 1) == 0; }
    // The shift is done on the unsigned type: left-shifting a negative signed
    // value is not well defined before C++20.
    static inline Data Enc(const Data d)
    {
        typedef typename std::make_unsigned<Data>::type UData;
        return static_cast<Data>((static_cast<UData>(d) << 1) | 1u);
    }
    // Signed right shift drops the flag bit and restores the sign.
    static inline Data Dec(const Data d) { return d >> 1; }
    static size_type Next(const size_type index) { return (index + 1) & mask; }

    std::atomic<size_type> head_;
    std::atomic<size_type> tail_;
    AData buf[capacity];
};
| | | |
| | | template <class Int> |
| | | class AtomicQueue<0, Int> |
| | | { |
| | | typedef Int Data; |
| | | typedef std::atomic<Data> AData; |
| | | static_assert(sizeof(Data) == sizeof(AData)); |
| | | |
| | | public: |
| | | AtomicQueue() { memset(this, 0, sizeof(*this)); } |
| | | typedef int64_t Data; |
| | | AtomicQ63() { memset(this, 0, sizeof(*this)); } |
| | | bool push(const Data d, bool try_more = false) |
| | | { |
| | | auto cur = buf.load(); |
| | |
| | | if (r) { d = Dec(cur); } |
| | | return r; |
| | | } |
| | | uint32_t head() const { return 0; } |
| | | uint32_t tail() const { return 0; } |
| | | |
| | | private: |
| | | static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok. |
| | | static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok. |
| | | static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok. |
| | | |
| | | typedef std::atomic<Data> AData; |
| | | // static_assert(sizeof(Data) == sizeof(AData)); |
| | | |
| | | AData buf; |
| | | }; |
| | | |
| | | // atomic request-reply process, one cycle at a time. |
| | | class AtomicReqRep |
| | | { |
| | | public: |
| | | typedef int64_t Data; |
| | | typedef std::function<Data(const Data)> Handler; |
| | | bool ClientRequest(const Data request, Data &reply) |
| | | { |
| | | auto end_time = now() + 3s; |
| | | do { |
| | | Data cur = data_.load(); |
| | | if (GetState(cur) == eStateFree && |
| | | DataCas(cur, Encode(request, eStateRequest))) { |
| | | do { |
| | | yield(); |
| | | cur = data_.load(); |
| | | if (GetState(cur) == eStateReply) { |
| | | DataCas(cur, Encode(0, eStateFree)); |
| | | reply = Decode(cur); |
| | | return true; |
| | | } |
| | | } while (now() < end_time); |
| | | } |
| | | yield(); |
| | | } while (now() < end_time); |
| | | return false; |
| | | } |
| | | |
| | | bool ServerProcess(Handler onReq) |
| | | { |
| | | Data cur = data_.load(); |
| | | switch (GetState(cur)) { |
| | | case eStateRequest: |
| | | if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) { |
| | | timestamp_ = now(); |
| | | return true; |
| | | } |
| | | break; |
| | | case eStateReply: |
| | | if (timestamp_.load() + 3s < now()) { |
| | | DataCas(cur, Encode(0, eStateFree)); |
| | | } |
| | | break; |
| | | case eStateFree: |
| | | default: break; |
| | | } |
| | | return false; |
| | | } |
| | | bool ClientRequest(const Data request, Data &reply); |
| | | bool ServerProcess(Handler onReq); |
| | | AtomicReqRep() : |
| | | data_(0), timestamp_(now()) {} |
| | | |
| | | private: |
| | | enum State { |
| | |
| | | static int GetState(Data d) { return d & MaskBits(3); } |
| | | static Data Encode(Data d, State st) { return (d << 3) | st; } |
| | | static Data Decode(Data d) { return d >> 3; } |
| | | static void yield() { QuickSleep(); } |
| | | typedef std::chrono::steady_clock steady_clock; |
| | | typedef steady_clock::duration Duration; |
| | | Duration now() { return steady_clock::now().time_since_epoch(); } |
| | | static Duration now() { return steady_clock::now().time_since_epoch(); } |
| | | |
| | | bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); } |
| | | std::atomic<Data> data_; |