From 34cd75f77d0ca94dbdba4e6cc9451fe4d33e78b3 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 19 五月 2021 19:14:13 +0800 Subject: [PATCH] add api BHQueryProcs. --- src/robust.h | 386 ++++++++++++++++++++++++++---------------------------- 1 files changed, 186 insertions(+), 200 deletions(-) diff --git a/src/robust.h b/src/robust.h index 3334bc0..b3b99c4 100644 --- a/src/robust.h +++ b/src/robust.h @@ -19,12 +19,17 @@ #ifndef ROBUST_Q31RCWYU #define ROBUST_Q31RCWYU +#include "bh_util.h" #include "log.h" #include <atomic> #include <chrono> #include <memory> -#include <string.h> +#include <mutex> #include <string> +#include <sys/file.h> +#include <sys/ipc.h> +#include <sys/sem.h> +#include <sys/stat.h> #include <sys/types.h> #include <unistd.h> @@ -33,147 +38,23 @@ using namespace std::chrono; using namespace std::chrono_literals; -constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; } - void QuickSleep(); -class RobustReqRep -{ - typedef uint32_t State; - typedef std::string Msg; - typedef std::chrono::steady_clock::duration Duration; - -public: - enum ErrorCode { - eSuccess = 0, - eTimeout = EAGAIN, - eSizeError = EINVAL, - }; - - explicit RobustReqRep(const uint32_t max_len) : - capacity_(max_len), state_(eStateInit), timestamp_(Duration(0)), size_(0) {} - - void PutReady() { state_.store(eStateReady); } - bool Ready() const { return state_.load() == eStateReady; } - uint32_t capacity() const { return capacity_; } - - int ClientRequest(const Msg &request, Msg &reply) - { - int r = ClientWriteRequest(request); - if (r == eSuccess) { - r = ClientReadReply(reply); - } - return r; - } - int ClientReadReply(Msg &reply); - int ClientWriteRequest(const Msg &request); - int ServerReadRequest(Msg &request); - int ServerWriteReply(const Msg &reply); - -private: - RobustReqRep(const RobustReqRep &); - RobustReqRep(RobustReqRep &&); - RobustReqRep &operator=(const RobustReqRep &) = delete; - RobustReqRep &operator=(RobustReqRep &&) = delete; - - enum { - eStateInit = 0, - eStateReady = 0x19833891, - eClientWriteBegin, - eClientWriteEnd, - eServerReadBegin, - eServerReadEnd, - eServerWriteBegin, - eServerWriteEnd, - eClientReadBegin, - eClientReadEnd = eStateReady, - }; - bool StateCas(State exp, State val); - void Write(const Msg &msg) - { - size_.store(msg.size()); - memcpy(buf, msg.data(), msg.size()); - } - void Read(Msg &msg) { msg.assign(buf, size_.load()); } - - const uint32_t capacity_; - std::atomic<State> state_; - static_assert(sizeof(State) == sizeof(state_), "atomic should has no extra data."); - std::atomic<Duration> timestamp_; - std::atomic<int32_t> size_; - char buf[4]; -}; - -class PidLocker -{ -public: - typedef int locker_t; - enum { eLockerBits = sizeof(locker_t) * 8 }; - static locker_t this_locker() - { - static locker_t val = getpid(); - return val; - } - static bool is_alive(locker_t locker) { return true; } -}; - -class RobustPidLocker -{ -public: - typedef int locker_t; - enum { eLockerBits = sizeof(locker_t) * 8 }; - static locker_t this_locker() - { - static locker_t val = getpid(); - return val; - } - static bool is_alive(locker_t locker) - { - char buf[64] = {0}; - snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker); - return access(buf, F_OK) == 0; - } -}; - -class ExpiredLocker -{ -public: - typedef int64_t locker_t; - enum { eLockerBits = 63 }; - static locker_t this_locker() { return Now(); } - static bool is_alive(locker_t locker) - { - return Now() < locker + steady_clock::duration(10s).count(); - } - -private: - static locker_t Now() { return steady_clock::now().time_since_epoch().count(); } -}; - -template <class LockerT> class CasMutex { - typedef typename LockerT::locker_t locker_t; - static inline locker_t this_locker() { return LockerT::this_locker(); } - static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); } - static const uint64_t kLockerMask = MaskBits(LockerT::eLockerBits); - static_assert(LockerT::eLockerBits < 64, "locker size must be smaller than 64 bit!"); + typedef uint64_t locker_t; + static inline locker_t this_locker() { return pthread_self(); } + static const uint64_t kLockerMask = MaskBits(63); public: CasMutex() : meta_(0) {} int try_lock() { - const auto t = steady_clock::now().time_since_epoch().count(); auto old = meta_.load(); int r = 0; if (!Locked(old)) { r = MetaCas(old, Meta(1, this_locker())); - } else if (!is_alive(Locker(old))) { - r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1; - if (r) { - LOG_DEBUG() << "captured locker " << int64_t(Locker(old)) << " -> " << int64_t(this_locker()) << ", locker = " << r; - } } return r; } @@ -201,7 +82,86 @@ bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); } }; -typedef CasMutex<RobustPidLocker> Mutex; +class NullMutex +{ +public: + template <class... T> + explicit NullMutex(T &&...t) {} // easy test. + bool try_lock() { return true; } + void lock() {} + void unlock() {} +}; + +// flock + mutex +class FMutex +{ +public: + typedef uint64_t id_t; + FMutex(id_t id) : + id_(id), fd_(Open(id_)), count_(0) + { + if (fd_ == -1) { throw "error create mutex!"; } + } + ~FMutex() { Close(fd_); } + bool try_lock(); + void lock(); + void unlock(); + +private: + static std::string GetPath(id_t id) + { + const std::string dir("/tmp/.bhome_mtx"); + mkdir(dir.c_str(), 0777); + return dir + "/fm_" + std::to_string(id); + } + static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDONLY, 0666); } + static int Close(int fd) { return close(fd); } + id_t id_; + int fd_; + std::mutex mtx_; + std::atomic<int32_t> count_; +}; + +union semun { + int val; /* Value for SETVAL */ + struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */ + unsigned short *array; /* Array for GETALL, SETALL */ + struct seminfo *__buf; /* Buffer for IPC_INFO + (Linux-specific) */ +}; + +class SemMutex +{ +public: + SemMutex(key_t key) : + key_(key), sem_id_(semget(key, 1, 0666)) + { + if (sem_id_ == -1) { throw "error create semaphore."; } + } + ~SemMutex() {} + + bool try_lock() + { + sembuf op = {0, -1, SEM_UNDO | IPC_NOWAIT}; + return semop(sem_id_, &op, 1) == 0; + } + + void lock() + { + sembuf op = {0, -1, SEM_UNDO}; + semop(sem_id_, &op, 1); + } + + void unlock() + { + sembuf op = {0, 1, SEM_UNDO}; + semop(sem_id_, &op, 1); + } + +private: + key_t key_; + int sem_id_; +}; template <class Lock> class Guard @@ -215,77 +175,6 @@ Guard(const Guard &); Guard(Guard &&); Lock &l_; -}; - -template <class D, class Alloc = std::allocator<D>> -class CircularBuffer -{ - typedef uint32_t size_type; - typedef uint32_t count_type; - typedef uint64_t meta_type; - static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; } - static count_type Count(meta_type meta) { return meta >> 32; } - static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; } - -public: - typedef D Data; - - CircularBuffer(const size_type cap) : - CircularBuffer(cap, Alloc()) {} - CircularBuffer(const size_type cap, Alloc const &al) : - capacity_(cap + 1), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(capacity_)) - { - if (!buf) { - throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!"); - } else { - memset(&buf[0], 0, sizeof(D) * capacity_); - } - } - ~CircularBuffer() { al_.deallocate(buf, capacity_); } - - bool push_back(const Data d) - { - Guard<Mutex> guard(mutex_); - auto old = mtail(); - auto pos = Pos(old); - auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0); - if (!full) { - buf[pos] = d; - return mtail_.compare_exchange_strong(old, next(old)); - } - return false; - } - bool pop_front(Data &d) - { - Guard<Mutex> guard(mutex_); - auto old = mhead(); - auto pos = Pos(old); - if (!(pos == tail())) { - d = buf[pos]; - return mhead_.compare_exchange_strong(old, next(old)); - } else { - return false; - } - } - -private: - CircularBuffer(const CircularBuffer &); - CircularBuffer(CircularBuffer &&); - CircularBuffer &operator=(const CircularBuffer &) = delete; - CircularBuffer &operator=(CircularBuffer &&) = delete; - - meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); } - size_type head() const { return Pos(mhead()); } - size_type tail() const { return Pos(mtail()); } - meta_type mhead() const { return mhead_.load(); } - meta_type mtail() const { return mtail_.load(); } - // data - const size_type capacity_; - Mutex mutex_; - std::atomic<meta_type> mhead_; - std::atomic<meta_type> mtail_; - Alloc al_; - typename Alloc::pointer buf = nullptr; }; template <unsigned PowerSize = 4, class Int = int64_t> @@ -351,5 +240,102 @@ AData buf[capacity]; }; +template <class Int> +class AtomicQueue<0, Int> +{ + typedef Int Data; + typedef std::atomic<Data> AData; + static_assert(sizeof(Data) == sizeof(AData)); + +public: + AtomicQueue() { memset(this, 0, sizeof(*this)); } + bool push(const Data d, bool try_more = false) + { + auto cur = buf.load(); + return Empty(cur) && buf.compare_exchange_strong(cur, Enc(d)); + } + bool pop(Data &d, bool try_more = false) + { + Data cur = buf.load(); + bool r = !Empty(cur) && buf.compare_exchange_strong(cur, 0); + if (r) { d = Dec(cur); } + return r; + } + uint32_t head() const { return 0; } + uint32_t tail() const { return 0; } + +private: + static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok. + static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok. + static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok. + AData buf; +}; + +class AtomicReqRep +{ +public: + typedef int64_t Data; + typedef std::function<Data(const Data)> Handler; + bool ClientRequest(const Data request, Data &reply) + { + auto end_time = now() + 3s; + do { + Data cur = data_.load(); + if (GetState(cur) == eStateFree && + DataCas(cur, Encode(request, eStateRequest))) { + do { + yield(); + cur = data_.load(); + if (GetState(cur) == eStateReply) { + DataCas(cur, Encode(0, eStateFree)); + reply = Decode(cur); + return true; + } + } while (now() < end_time); + } + yield(); + } while (now() < end_time); + return false; + } + + bool ServerProcess(Handler onReq) + { + Data cur = data_.load(); + switch (GetState(cur)) { + case eStateRequest: + if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) { + timestamp_ = now(); + return true; + } + break; + case eStateReply: + if (timestamp_.load() + 3s < now()) { + DataCas(cur, Encode(0, eStateFree)); + } + break; + case eStateFree: + default: break; + } + return false; + } + +private: + enum State { + eStateFree, + eStateRequest, + eStateReply + }; + static int GetState(Data d) { return d & MaskBits(3); } + static Data Encode(Data d, State st) { return (d << 3) | st; } + static Data Decode(Data d) { return d >> 3; } + static void yield() { QuickSleep(); } + typedef steady_clock::duration Duration; + Duration now() { return steady_clock::now().time_since_epoch(); } + + bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); } + std::atomic<Data> data_; + std::atomic<Duration> timestamp_; +}; + } // namespace robust #endif // end of include guard: ROBUST_Q31RCWYU -- Gitblit v1.8.0