From 77a6c3512a44dfe6540dde71946e6484fe4f173f Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期一, 10 五月 2021 16:05:28 +0800 Subject: [PATCH] test lock code. --- src/robust.h | 224 +++++++++++++++++++++++--------------------------------- 1 files changed, 92 insertions(+), 132 deletions(-) diff --git a/src/robust.h b/src/robust.h index aa54c2f..8657122 100644 --- a/src/robust.h +++ b/src/robust.h @@ -19,11 +19,16 @@ #ifndef ROBUST_Q31RCWYU #define ROBUST_Q31RCWYU +#include "log.h" #include <atomic> #include <chrono> #include <memory> -#include <string.h> +#include <mutex> #include <string> +#include <sys/file.h> +#include <sys/ipc.h> +#include <sys/sem.h> +#include <sys/stat.h> #include <sys/types.h> #include <unistd.h> @@ -36,143 +41,21 @@ void QuickSleep(); -class RobustReqRep -{ - typedef uint32_t State; - typedef std::string Msg; - typedef std::chrono::steady_clock::duration Duration; - -public: - enum ErrorCode { - eSuccess = 0, - eTimeout = EAGAIN, - eSizeError = EINVAL, - }; - - explicit RobustReqRep(const uint32_t max_len) : - capacity_(max_len), state_(eStateInit), timestamp_(Duration(0)), size_(0) {} - - void PutReady() { state_.store(eStateReady); } - bool Ready() const { return state_.load() == eStateReady; } - uint32_t capacity() const { return capacity_; } - - int ClientRequest(const Msg &request, Msg &reply) - { - int r = ClientWriteRequest(request); - if (r == eSuccess) { - r = ClientReadReply(reply); - } - return r; - } - int ClientReadReply(Msg &reply); - int ClientWriteRequest(const Msg &request); - int ServerReadRequest(Msg &request); - int ServerWriteReply(const Msg &reply); - -private: - RobustReqRep(const RobustReqRep &); - RobustReqRep(RobustReqRep &&); - RobustReqRep &operator=(const RobustReqRep &) = delete; - RobustReqRep &operator=(RobustReqRep &&) = delete; - - enum { - eStateInit = 0, - eStateReady = 0x19833891, - eClientWriteBegin, - eClientWriteEnd, - eServerReadBegin, - eServerReadEnd, - eServerWriteBegin, - eServerWriteEnd, - eClientReadBegin, - eClientReadEnd = eStateReady, - }; - bool StateCas(State exp, State val); - void Write(const Msg &msg) - { - size_.store(msg.size()); - memcpy(buf, msg.data(), msg.size()); - } - void Read(Msg &msg) { msg.assign(buf, size_.load()); } - - const uint32_t capacity_; - std::atomic<State> state_; - static_assert(sizeof(State) == sizeof(state_), "atomic should has no extra data."); - std::atomic<Duration> timestamp_; - std::atomic<int32_t> size_; - char buf[4]; -}; - -class PidLocker -{ -public: - typedef int locker_t; - enum { eLockerBits = sizeof(locker_t) * 8 }; - static locker_t this_locker() - { - static locker_t val = getpid(); - return val; - } - static bool is_alive(locker_t locker) { return true; } -}; - -class RobustPidLocker -{ -public: - typedef int locker_t; - enum { eLockerBits = sizeof(locker_t) * 8 }; - static locker_t this_locker() - { - static locker_t val = getpid(); - return val; - } - static bool is_alive(locker_t locker) - { - char buf[64] = {0}; - snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker); - return access(buf, F_OK) == 0; - } -}; - -class ExpiredLocker -{ -public: - typedef int64_t locker_t; - enum { eLockerBits = 63 }; - static locker_t this_locker() { return Now(); } - static bool is_alive(locker_t locker) - { - return Now() < locker + steady_clock::duration(10s).count(); - } - -private: - static locker_t Now() { return steady_clock::now().time_since_epoch().count(); } -}; - -template <class LockerT> class CasMutex { - typedef typename LockerT::locker_t locker_t; - static inline locker_t this_locker() { return LockerT::this_locker(); } - static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); } - static const uint64_t kLockerMask = MaskBits(LockerT::eLockerBits); - static_assert(LockerT::eLockerBits < 64, "locker size must be smaller than 64 bit!"); + typedef uint64_t locker_t; + static inline locker_t this_locker() { return pthread_self(); } + static const uint64_t kLockerMask = MaskBits(63); public: CasMutex() : meta_(0) {} int try_lock() { - const auto t = steady_clock::now().time_since_epoch().count(); auto old = meta_.load(); int r = 0; if (!Locked(old)) { r = MetaCas(old, Meta(1, this_locker())); - } else if (!is_alive(Locker(old))) { - r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1; - if (r) { - printf("captured locker %ld -> %ld, locker = %d\n", int64_t(Locker(old)), int64_t(this_locker()), r); - } } return r; } @@ -200,7 +83,87 @@ bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); } }; -typedef CasMutex<RobustPidLocker> Mutex; +class NullMutex +{ +public: + template <class... T> + explicit NullMutex(T &&...t) {} // easy test. + bool try_lock() { return true; } + void lock() {} + void unlock() {} +}; + +// flock + mutex +class FMutex +{ +public: + typedef uint64_t id_t; + FMutex(id_t id) : + id_(id), fd_(Open(id_)) + { + if (fd_ == -1) { throw "error create mutex!"; } + } + ~FMutex() { Close(fd_); } + bool try_lock(); + void lock(); + void unlock(); + +private: + static std::string GetPath(id_t id) + { + const std::string dir("/tmp/.bhome_mtx"); + mkdir(dir.c_str(), 0777); + return dir + "/fm_" + std::to_string(id); + } + static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDONLY, 0666); } + static int Close(int fd) { return close(fd); } + void FLock(); + void FUnlock(); + id_t id_; + int fd_; + std::mutex mtx_; +}; + +union semun { + int val; /* Value for SETVAL */ + struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */ + unsigned short *array; /* Array for GETALL, SETALL */ + struct seminfo *__buf; /* Buffer for IPC_INFO + (Linux-specific) */ +}; + +class SemMutex +{ +public: + SemMutex(key_t key) : + key_(key), sem_id_(semget(key, 1, 0666)) + { + if (sem_id_ == -1) { throw "error create semaphore."; } + } + ~SemMutex() {} + + bool try_lock() + { + sembuf op = {0, -1, SEM_UNDO | IPC_NOWAIT}; + return semop(sem_id_, &op, 1) == 0; + } + + void lock() + { + sembuf op = {0, -1, SEM_UNDO}; + semop(sem_id_, &op, 1); + } + + void unlock() + { + sembuf op = {0, 1, SEM_UNDO}; + semop(sem_id_, &op, 1); + } + +private: + key_t key_; + int sem_id_; +}; template <class Lock> class Guard @@ -244,7 +207,6 @@ bool push_back(const Data d) { - Guard<Mutex> guard(mutex_); auto old = mtail(); auto pos = Pos(old); auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0); @@ -256,7 +218,6 @@ } bool pop_front(Data &d) { - Guard<Mutex> guard(mutex_); auto old = mhead(); auto pos = Pos(old); if (!(pos == tail())) { @@ -280,7 +241,6 @@ meta_type mtail() const { return mtail_.load(); } // data const size_type capacity_; - Mutex mutex_; std::atomic<meta_type> mhead_; std::atomic<meta_type> mtail_; Alloc al_; @@ -306,7 +266,7 @@ size_type tail() const { return tail_.load(); } bool like_empty() const { return head() == tail() && Empty(buf[head()]); } bool like_full() const { return head() == tail() && !Empty(buf[head()]); } - bool push_back(const Data d, bool try_more = false) + bool push(const Data d, bool try_more = false) { bool r = false; size_type i = 0; @@ -319,7 +279,7 @@ } while (try_more && !r && ++i < capacity); return r; } - bool pop_front(Data &d, bool try_more = false) + bool pop(Data &d, bool try_more = false) { bool r = false; Data cur; -- Gitblit v1.8.0