From 28f06bc49a4d8d69f1ea2f767863b7921d12f155 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期六, 08 五月 2021 18:30:48 +0800 Subject: [PATCH] add robust FMutex, works fine; use boost circular. --- src/robust.h | 221 ++++++++++++++++++++++-------------------------------- 1 files changed, 91 insertions(+), 130 deletions(-) diff --git a/src/robust.h b/src/robust.h index 3334bc0..d2d94e9 100644 --- a/src/robust.h +++ b/src/robust.h @@ -23,8 +23,12 @@ #include <atomic> #include <chrono> #include <memory> -#include <string.h> +#include <mutex> #include <string> +#include <sys/file.h> +#include <sys/ipc.h> +#include <sys/sem.h> +#include <sys/stat.h> #include <sys/types.h> #include <unistd.h> @@ -37,143 +41,21 @@ void QuickSleep(); -class RobustReqRep -{ - typedef uint32_t State; - typedef std::string Msg; - typedef std::chrono::steady_clock::duration Duration; - -public: - enum ErrorCode { - eSuccess = 0, - eTimeout = EAGAIN, - eSizeError = EINVAL, - }; - - explicit RobustReqRep(const uint32_t max_len) : - capacity_(max_len), state_(eStateInit), timestamp_(Duration(0)), size_(0) {} - - void PutReady() { state_.store(eStateReady); } - bool Ready() const { return state_.load() == eStateReady; } - uint32_t capacity() const { return capacity_; } - - int ClientRequest(const Msg &request, Msg &reply) - { - int r = ClientWriteRequest(request); - if (r == eSuccess) { - r = ClientReadReply(reply); - } - return r; - } - int ClientReadReply(Msg &reply); - int ClientWriteRequest(const Msg &request); - int ServerReadRequest(Msg &request); - int ServerWriteReply(const Msg &reply); - -private: - RobustReqRep(const RobustReqRep &); - RobustReqRep(RobustReqRep &&); - RobustReqRep &operator=(const RobustReqRep &) = delete; - RobustReqRep &operator=(RobustReqRep &&) = delete; - - enum { - eStateInit = 0, - eStateReady = 0x19833891, - eClientWriteBegin, - eClientWriteEnd, - eServerReadBegin, - eServerReadEnd, - eServerWriteBegin, - eServerWriteEnd, - eClientReadBegin, - eClientReadEnd = eStateReady, - }; - bool StateCas(State exp, State val); - void Write(const Msg &msg) - { - size_.store(msg.size()); - memcpy(buf, msg.data(), msg.size()); - } - void Read(Msg &msg) { msg.assign(buf, size_.load()); } - - const uint32_t capacity_; - std::atomic<State> state_; - static_assert(sizeof(State) == sizeof(state_), "atomic should has no extra data."); - std::atomic<Duration> timestamp_; - std::atomic<int32_t> size_; - char buf[4]; -}; - -class PidLocker -{ -public: - typedef int locker_t; - enum { eLockerBits = sizeof(locker_t) * 8 }; - static locker_t this_locker() - { - static locker_t val = getpid(); - return val; - } - static bool is_alive(locker_t locker) { return true; } -}; - -class RobustPidLocker -{ -public: - typedef int locker_t; - enum { eLockerBits = sizeof(locker_t) * 8 }; - static locker_t this_locker() - { - static locker_t val = getpid(); - return val; - } - static bool is_alive(locker_t locker) - { - char buf[64] = {0}; - snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker); - return access(buf, F_OK) == 0; - } -}; - -class ExpiredLocker -{ -public: - typedef int64_t locker_t; - enum { eLockerBits = 63 }; - static locker_t this_locker() { return Now(); } - static bool is_alive(locker_t locker) - { - return Now() < locker + steady_clock::duration(10s).count(); - } - -private: - static locker_t Now() { return steady_clock::now().time_since_epoch().count(); } -}; - -template <class LockerT> class CasMutex { - typedef typename LockerT::locker_t locker_t; - static inline locker_t this_locker() { return LockerT::this_locker(); } - static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); } - static const uint64_t kLockerMask = MaskBits(LockerT::eLockerBits); - static_assert(LockerT::eLockerBits < 64, "locker size must be smaller than 64 bit!"); + typedef uint64_t locker_t; + static inline locker_t this_locker() { return pthread_self(); } + static const uint64_t kLockerMask = MaskBits(63); public: CasMutex() : meta_(0) {} int try_lock() { - const auto t = steady_clock::now().time_since_epoch().count(); auto old = meta_.load(); int r = 0; if (!Locked(old)) { r = MetaCas(old, Meta(1, this_locker())); - } else if (!is_alive(Locker(old))) { - r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1; - if (r) { - LOG_DEBUG() << "captured locker " << int64_t(Locker(old)) << " -> " << int64_t(this_locker()) << ", locker = " << r; - } } return r; } @@ -201,7 +83,89 @@ bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); } }; -typedef CasMutex<RobustPidLocker> Mutex; +class NullMutex +{ +public: + bool try_lock() { return true; } + void lock() {} + void unlock() {} +}; + +// flock + mutex +class FMutex +{ +public: + typedef uint64_t id_t; + FMutex(id_t id) : + id_(id), fd_(Open(id_)) + { + if (fd_ == -1) { throw "error create mutex!"; } + } + ~FMutex() { Close(fd_); } + bool try_lock(); + void lock(); + void unlock(); + +private: + static std::string GetPath(id_t id) + { + const std::string dir("/tmp/.bhome_mtx"); + mkdir(dir.c_str(), 0777); + return dir + "/fm_" + std::to_string(id); + } + static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDWR, 0666); } + static int Close(int fd) { return close(fd); } + id_t id_; + int fd_; + std::mutex mtx_; +}; + +union semun { + int val; /* Value for SETVAL */ + struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */ + unsigned short *array; /* Array for GETALL, SETALL */ + struct seminfo *__buf; /* Buffer for IPC_INFO + (Linux-specific) */ +}; + +class SemMutex +{ +public: + SemMutex(key_t key) : + key_(key), sem_id_(semget(key, 1, 0666 | IPC_CREAT)) + { + if (sem_id_ == -1) { throw "error create semaphore."; } + union semun init_val; + init_val.val = 1; + semctl(sem_id_, 0, SETVAL, init_val); + } + ~SemMutex() + { + // semctl(sem_id_, 0, IPC_RMID, semun{}); + } + + bool try_lock() + { + sembuf op = {0, -1, SEM_UNDO | IPC_NOWAIT}; + return semop(sem_id_, &op, 1) == 0; + } + + void lock() + { + sembuf op = {0, -1, SEM_UNDO}; + semop(sem_id_, &op, 1); + } + + void unlock() + { + sembuf op = {0, 1, SEM_UNDO}; + semop(sem_id_, &op, 1); + } + +private: + key_t key_; + int sem_id_; +}; template <class Lock> class Guard @@ -245,7 +209,6 @@ bool push_back(const Data d) { - Guard<Mutex> guard(mutex_); auto old = mtail(); auto pos = Pos(old); auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0); @@ -257,7 +220,6 @@ } bool pop_front(Data &d) { - Guard<Mutex> guard(mutex_); auto old = mhead(); auto pos = Pos(old); if (!(pos == tail())) { @@ -281,7 +243,6 @@ meta_type mtail() const { return mtail_.load(); } // data const size_type capacity_; - Mutex mutex_; std::atomic<meta_type> mhead_; std::atomic<meta_type> mtail_; Alloc al_; -- Gitblit v1.8.0