Use new robust mutex and circular buffer; remove timeout mutex.
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: robust.cpp |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021-04-27 10:04:19 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #include "robust.h" |
| | | #include <chrono> |
| | | #include <thread> |
| | | |
| | | namespace robust |
| | | { |
| | | |
| | | namespace |
| | | { |
| | | static_assert(sizeof(steady_clock::duration) == sizeof(int64_t)); |
| | | static_assert(sizeof(RobustReqRep) == 24); |
| | | static_assert(sizeof(CasMutex<false>) == 8); |
| | | static_assert(sizeof(CircularBuffer<int>) == 48); |
| | | |
| | | auto Now() { return steady_clock::now().time_since_epoch(); } |
| | | const steady_clock::duration kIoTimeout = 10ms; |
| | | const steady_clock::duration kIoExpire = 100ms; |
| | | |
| | | void Yield() { std::this_thread::sleep_for(10us); } |
| | | } // namespace |
| | | |
| | | void QuickSleep() |
| | | { |
| | | Yield(); |
| | | } |
| | | bool RobustReqRep::StateCas(State exp, State val) |
| | | { |
| | | bool r = state_.compare_exchange_strong(exp, val); |
| | | return r ? (timestamp_.store(Now()), true) : false; |
| | | } |
| | | |
| | | int RobustReqRep::ClientReadReply(Msg &reply) |
| | | { |
| | | auto end_time = Now() + kIoTimeout; |
| | | bool done = false; |
| | | do { |
| | | if (StateCas(eServerWriteEnd, eClientReadBegin)) { |
| | | Read(reply); |
| | | done = StateCas(eClientReadBegin, eClientReadEnd); |
| | | if (done) { break; } |
| | | } |
| | | Yield(); |
| | | } while (Now() < end_time); |
| | | return done ? eSuccess : eTimeout; |
| | | } |
| | | |
| | | int RobustReqRep::ClientWriteRequest(const Msg &request) |
| | | { |
| | | if (request.size() > capacity_) { |
| | | return eSizeError; |
| | | } |
| | | auto end_time = Now() + kIoTimeout; |
| | | bool done = false; |
| | | do { |
| | | if (StateCas(eStateReady, eClientWriteBegin)) { |
| | | Write(request); |
| | | done = StateCas(eClientWriteBegin, eClientWriteEnd); |
| | | if (done) { break; } |
| | | } |
| | | Yield(); |
| | | } while (Now() < end_time); |
| | | return done ? eSuccess : eTimeout; |
| | | } |
| | | |
| | | int RobustReqRep::ServerReadRequest(Msg &request) |
| | | { |
| | | bool done = false; |
| | | if (StateCas(eClientWriteEnd, eServerReadBegin)) { |
| | | Read(request); |
| | | done = StateCas(eServerReadBegin, eServerReadEnd); |
| | | } else { |
| | | auto old = state_.load(); |
| | | if (old != eStateReady && timestamp_.load() + kIoExpire < Now()) { |
| | | StateCas(old, eStateReady); |
| | | } |
| | | } |
| | | return done ? eSuccess : eTimeout; |
| | | } |
| | | |
| | | int RobustReqRep::ServerWriteReply(const Msg &reply) |
| | | { |
| | | if (reply.size() > capacity_) { |
| | | return eSizeError; |
| | | } |
| | | // No server-side retry loop: a single attempt either succeeds or reports timeout. |
| | | bool done = false; |
| | | if (StateCas(eServerReadEnd, eServerWriteBegin)) { |
| | | Write(reply); |
| | | done = StateCas(eServerWriteBegin, eServerWriteEnd); |
| | | } |
| | | return done ? eSuccess : eTimeout; |
| | | } |
| | | |
| | | } // namespace robust |
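| | | |
| | | // Usage sketch (illustration only): one client pairs with one server over a |
| | | // RobustReqRep placed in memory both sides can see. How the object is mapped |
| | | // is up to the caller; the placement-new and `shm_ptr` below are assumptions. |
| | | // |
| | | // auto *rr = new (shm_ptr) robust::RobustReqRep(max_len); |
| | | // rr->PutReady(); |
| | | // // client side: |
| | | // std::string reply; |
| | | // if (rr->ClientRequest("hello", reply) == robust::RobustReqRep::eSuccess) { /* use reply */ } |
| | | // // server loop: |
| | | // std::string req; |
| | | // if (rr->ServerReadRequest(req) == robust::RobustReqRep::eSuccess) { rr->ServerWriteReply(req); } |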
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: robust.h |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021-04-27 10:04:29 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | |
| | | #ifndef ROBUST_Q31RCWYU |
| | | #define ROBUST_Q31RCWYU |
| | | |
| | | #include <atomic> |
| | | #include <chrono> |
| | | #include <memory> |
| | | #include <stdexcept> |
| | | #include <string.h> |
| | | #include <string> |
| | | #include <sys/types.h> |
| | | #include <unistd.h> |
| | | |
| | | namespace robust |
| | | { |
| | | |
| | | using namespace std::chrono; |
| | | using namespace std::chrono_literals; |
| | | |
| | | void QuickSleep(); |
| | | |
| | | class RobustReqRep |
| | | { |
| | | typedef uint32_t State; |
| | | typedef std::string Msg; |
| | | typedef std::chrono::steady_clock::duration Duration; |
| | | |
| | | public: |
| | | enum ErrorCode { |
| | | eSuccess = 0, |
| | | eTimeout = EAGAIN, |
| | | eSizeError = EINVAL, |
| | | }; |
| | | |
| | | explicit RobustReqRep(const uint32_t max_len) : |
| | | capacity_(max_len), state_(eStateInit), timestamp_(Duration(0)), size_(0) {} |
| | | |
| | | void PutReady() { state_.store(eStateReady); } |
| | | bool Ready() const { return state_.load() == eStateReady; } |
| | | uint32_t capacity() const { return capacity_; } |
| | | |
| | | int ClientRequest(const Msg &request, Msg &reply) |
| | | { |
| | | int r = ClientWriteRequest(request); |
| | | if (r == eSuccess) { |
| | | r = ClientReadReply(reply); |
| | | } |
| | | return r; |
| | | } |
| | | int ClientReadReply(Msg &reply); |
| | | int ClientWriteRequest(const Msg &request); |
| | | int ServerReadRequest(Msg &request); |
| | | int ServerWriteReply(const Msg &reply); |
| | | |
| | | private: |
| | | RobustReqRep(const RobustReqRep &); |
| | | RobustReqRep(RobustReqRep &&); |
| | | RobustReqRep &operator=(const RobustReqRep &) = delete; |
| | | RobustReqRep &operator=(RobustReqRep &&) = delete; |
| | | |
| | | enum { |
| | | eStateInit = 0, |
| | | eStateReady = 0x19833891, |
| | | eClientWriteBegin, |
| | | eClientWriteEnd, |
| | | eServerReadBegin, |
| | | eServerReadEnd, |
| | | eServerWriteBegin, |
| | | eServerWriteEnd, |
| | | eClientReadBegin, |
| | | eClientReadEnd = eStateReady, |
| | | }; |
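| | | // One request/reply cycle walks the states in order: |
| | | // eStateReady -> eClientWriteBegin -> eClientWriteEnd |
| | | // -> eServerReadBegin -> eServerReadEnd |
| | | // -> eServerWriteBegin -> eServerWriteEnd |
| | | // -> eClientReadBegin -> eClientReadEnd (== eStateReady). |
| | | // Aliasing eClientReadEnd to eStateReady closes the loop; if a peer dies |
| | | // mid-cycle, ServerReadRequest resets any stale state after kIoExpire. |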
| | | bool StateCas(State exp, State val); |
| | | void Write(const Msg &msg) |
| | | { |
| | | size_.store(msg.size()); |
| | | memcpy(buf, msg.data(), msg.size()); |
| | | } |
| | | void Read(Msg &msg) { msg.assign(buf, size_.load()); } |
| | | |
| | | const uint32_t capacity_; |
| | | std::atomic<State> state_; |
| | | static_assert(sizeof(State) == sizeof(state_), "atomic must not add extra data."); |
| | | std::atomic<Duration> timestamp_; |
| | | std::atomic<int32_t> size_; |
| | | char buf[4]; |
| | | }; |
| | | |
| | | template <bool isRobust = false> |
| | | class CasMutex |
| | | { |
| | | static pid_t pid() |
| | | { |
| | | static pid_t val = getpid(); |
| | | return val; |
| | | } |
| | | static bool Killed(pid_t pid) |
| | | { |
| | | char buf[64] = {0}; |
| | | snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", pid); |
| | | return access(buf, F_OK) != 0; |
| | | } |
| | | |
| | | public: |
| | | CasMutex() : |
| | | meta_(0) {} |
| | | int try_lock() |
| | | { |
| | | auto old = meta_.load(); |
| | | int r = 0; |
| | | if (!Locked(old)) { |
| | | r = MetaCas(old, Meta(1, pid())); |
| | | } else if (isRobust && Killed(Pid(old))) { |
| | | r = static_cast<int>(MetaCas(old, Meta(1, pid()))) << 1; |
| | | if (r) { |
| | | printf("captured pid %d -> %d, r = %d\n", Pid(old), pid(), r); |
| | | } |
| | | } |
| | | return r; |
| | | } |
| | | int lock() |
| | | { |
| | | int r = 0; |
| | | do { |
| | | r = try_lock(); |
| | | } while (r == 0); |
| | | return r; |
| | | } |
| | | void unlock() |
| | | { |
| | | auto old = meta_.load(); |
| | | if (Locked(old) && Pid(old) == pid()) { |
| | | MetaCas(old, Meta(0, pid())); |
| | | } |
| | | } |
| | | |
| | | private: |
| | | std::atomic<uint64_t> meta_; |
| | | bool Locked(uint64_t meta) { return (meta >> 63) != 0; } |
| | | pid_t Pid(uint64_t meta) { return meta & ~(uint64_t(1) << 63); } |
| | | uint64_t Meta(uint64_t lk, pid_t pid) { return (lk << 63) | pid; } |
| | | bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); } |
| | | static_assert(sizeof(pid_t) < sizeof(uint64_t)); |
| | | }; |
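| | | // Usage sketch (illustration only): try_lock()/lock() return 1 on a normal |
| | | // acquire and 2 when the recorded owner pid is gone from /proc and the lock |
| | | // was captured; in real use the mutex must live in shared memory. |
| | | // |
| | | // robust::CasMutex<true> mtx; |
| | | // if (int r = mtx.try_lock()) { |
| | | // /* r == 2 means a dead owner's lock was recovered */ |
| | | // mtx.unlock(); |
| | | // } |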
| | | |
| | | template <class Lock> |
| | | class Guard |
| | | { |
| | | public: |
| | | Guard(Lock &l) : |
| | | l_(l) { l_.lock(); } |
| | | ~Guard() { l_.unlock(); } |
| | | |
| | | private: |
| | | Guard(const Guard &); |
| | | Guard(Guard &&); |
| | | Lock &l_; |
| | | }; |
| | | |
| | | template <class D, class Alloc = std::allocator<D>> |
| | | class CircularBuffer |
| | | { |
| | | typedef uint32_t size_type; |
| | | typedef uint32_t count_type; |
| | | typedef uint64_t meta_type; |
| | | // meta layout: high 32 bits = ever-increasing count (ABA tag), low 32 bits = position. |
| | | static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; } |
| | | static count_type Count(meta_type meta) { return meta >> 32; } |
| | | static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; } |
| | | |
| | | public: |
| | | typedef D Data; |
| | | |
| | | CircularBuffer(const size_type cap) : |
| | | CircularBuffer(cap, Alloc()) {} |
| | | CircularBuffer(const size_type cap, Alloc const &al) : |
| | | state_(0), capacity_(cap), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(cap)) |
| | | { |
| | | if (!buf) { |
| | | throw("error allocate buffer: out of mem!"); |
| | | } |
| | | } |
| | | ~CircularBuffer() |
| | | { |
| | | al_.deallocate(buf, capacity_); |
| | | } |
| | | size_type size() const { return (capacity_ + tail() - head()) % capacity_; } |
| | | bool full() const { return (capacity_ + tail() + 1 - head()) % capacity_ == 0; } |
| | | bool empty() const { return head() == tail(); } |
| | | bool push_back(Data d) |
| | | { |
| | | Guard<MutexT> guard(mutex_); |
| | | if (!full()) { |
| | | auto old = mtail(); |
| | | buf[Pos(old)] = d; |
| | | return mtail_.compare_exchange_strong(old, next(old)); |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | bool pop_front(Data &d) |
| | | { |
| | | Guard<MutexT> guard(mutex_); |
| | | if (!empty()) { |
| | | auto old = mhead(); |
| | | d = buf[Pos(old)]; |
| | | return mhead_.compare_exchange_strong(old, next(old)); |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | bool Ready() const { return state_.load() == eStateReady; } |
| | | void PutReady() { state_.store(eStateReady); } |
| | | |
| | | private: |
| | | CircularBuffer(const CircularBuffer &); |
| | | CircularBuffer(CircularBuffer &&); |
| | | CircularBuffer &operator=(const CircularBuffer &) = delete; |
| | | CircularBuffer &operator=(CircularBuffer &&) = delete; |
| | | typedef CasMutex<true> MutexT; |
| | | // static_assert(sizeof(MutexT) == 16); |
| | | meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); } |
| | | size_type head() const { return Pos(mhead()); } |
| | | size_type tail() const { return Pos(mtail()); } |
| | | meta_type mhead() const { return mhead_.load(); } |
| | | meta_type mtail() const { return mtail_.load(); } |
| | | // data |
| | | enum { eStateReady = 0x19833891 }; |
| | | std::atomic<uint32_t> state_; |
| | | const size_type capacity_; |
| | | MutexT mutex_; |
| | | std::atomic<meta_type> mhead_; |
| | | std::atomic<meta_type> mtail_; |
| | | Alloc al_; |
| | | typename Alloc::pointer buf = nullptr; |
| | | }; |
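| | | // Usage sketch (illustration only), single process with the default heap |
| | | // allocator; in this repo the buffer is normally placed in shared memory: |
| | | // |
| | | // robust::CircularBuffer<int> q(8); // one slot stays empty to tell full from empty |
| | | // q.push_back(42); // returns false when full |
| | | // int v; |
| | | // if (q.pop_front(v)) { /* v == 42 */ } |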
| | | |
| | | } // namespace robust |
| | | #endif // end of include guard: ROBUST_Q31RCWYU |
| | |
| | | namespace bhome_shm |
| | | { |
| | | |
| | | bool MutexWithTimeLimit::try_lock() |
| | | { |
| | | if (mutex_.try_lock()) { |
| | | auto old_time = last_lock_time_.load(); |
| | | if (Now() - old_time > limit_) { |
| | | return last_lock_time_.compare_exchange_strong(old_time, Now()); |
| | | } else { |
| | | last_lock_time_.store(Now()); |
| | | return true; |
| | | } |
| | | } else { |
| | | auto old_time = last_lock_time_.load(); |
| | | if (Now() - old_time > limit_) { |
| | | return last_lock_time_.compare_exchange_strong(old_time, Now()); |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | } |
| | | void MutexWithTimeLimit::lock() |
| | | { |
| | | while (!try_lock()) { |
| | | std::this_thread::yield(); |
| | | } |
| | | } |
| | | void MutexWithTimeLimit::unlock() |
| | | { |
| | | auto old_time = last_lock_time_.load(); |
| | | if (Now() - old_time > limit_) { |
| | | } else { |
| | | if (last_lock_time_.compare_exchange_strong(old_time, Now())) { |
| | | mutex_.unlock(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | SharedMemory::SharedMemory(const std::string &name, const uint64_t size) : |
| | | mshm_t(open_or_create, name.c_str(), size, 0, AllowAll()), |
| | | name_(name) |
| | |
| | | #ifndef SHM_6CHO6D6C |
| | | #define SHM_6CHO6D6C |
| | | |
| | | #include "robust.h" |
| | | #include <atomic> |
| | | #include <boost/interprocess/managed_shared_memory.hpp> |
| | | #include <boost/interprocess/sync/interprocess_condition.hpp> |
| | | #include <boost/interprocess/sync/interprocess_mutex.hpp> |
| | | #include <boost/interprocess/sync/scoped_lock.hpp> |
| | | #include <boost/noncopyable.hpp> |
| | | #include <chrono> |
| | | #include <thread> |
| | | |
| | | namespace bhome_shm |
| | |
| | | |
| | | typedef managed_shared_memory mshm_t; |
| | | |
| | | class CasMutex |
| | | { |
| | | std::atomic<bool> flag_; |
| | | bool cas(bool expected, bool new_val) { return flag_.compare_exchange_strong(expected, new_val); } |
| | | |
| | | public: |
| | | CasMutex() : |
| | | flag_(false) {} |
| | | bool try_lock() { return cas(false, true); } |
| | | void lock() |
| | | { |
| | | while (!try_lock()) { std::this_thread::yield(); } |
| | | } |
| | | void unlock() { cas(true, false); } |
| | | }; |
| | | |
| | | class MutexWithPidCheck |
| | | { |
| | | typedef boost::interprocess::interprocess_mutex MutexT; |
| | | // typedef CasMutex MutexT; |
| | | |
| | | static pid_t pid() |
| | | { |
| | | static pid_t val = getpid(); |
| | | return val; |
| | | } |
| | | static bool Killed(pid_t pid) |
| | | { |
| | | char buf[64] = {0}; |
| | | snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", pid); |
| | | return access(buf, F_OK) != 0; |
| | | } |
| | | bool PidCas(pid_t exp, pid_t val) { return pid_.compare_exchange_strong(exp, val); } |
| | | MutexT mutex_; |
| | | std::atomic<pid_t> pid_; |
| | | |
| | | public: |
| | | typedef MutexT::internal_mutex_type internal_mutex_type; |
| | | const internal_mutex_type &internal_mutex() const { return mutex_.internal_mutex(); } |
| | | internal_mutex_type &internal_mutex() { return mutex_.internal_mutex(); } |
| | | MutexWithPidCheck() : |
| | | pid_(0) {} |
| | | bool try_lock() |
| | | { |
| | | bool r = false; |
| | | if (mutex_.try_lock()) { |
| | | auto old = pid_.load(); |
| | | r = PidCas(old, pid()); |
| | | } else { |
| | | auto old = pid_.load(); |
| | | if (Killed(old)) { |
| | | r = PidCas(old, pid()); |
| | | if (r) { |
| | | printf("PidCheck captured pid %d -> %d\n", old, pid()); |
| | | } |
| | | } |
| | | } |
| | | return r; |
| | | } |
| | | |
| | | void lock() |
| | | { |
| | | while (!try_lock()) { |
| | | std::this_thread::yield(); |
| | | } |
| | | } |
| | | void unlock() |
| | | { |
| | | auto old = pid_.load(); |
| | | if (old == pid()) { |
| | | mutex_.unlock(); |
| | | } |
| | | } |
| | | }; |
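| | | // Recovery works like robust::CasMutex: the owner publishes its pid after |
| | | // locking, and a contender that finds /proc/<pid>/stat missing steals the |
| | | // pid slot; its later unlock() then releases the internal mutex the dead |
| | | // owner left locked. |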
| | | |
| | | // typedef boost::interprocess::interprocess_mutex Mutex; |
| | | typedef robust::CasMutex<true> Mutex; |
| | | typedef robust::Guard<Mutex> Guard; |
| | | typedef interprocess_condition Cond; |
| | | |
| | | class SharedMemory : public mshm_t |
| | | { |
| | |
| | | |
| | | #include "shm.h" |
| | | #include <atomic> |
| | | #include <boost/circular_buffer.hpp> |
| | | #include <boost/date_time/posix_time/posix_time.hpp> |
| | | #include <chrono> |
| | | |
| | | namespace bhome_shm |
| | | { |
| | | |
| | | template <class D> |
| | | using Circular = robust::CircularBuffer<D, Allocator<D>>; |
| | | |
| | | |
| | | template <class D> |
| | | class SharedQueue |
| | | { |
| | | public: |
| | | SharedQueue(const uint32_t len, Allocator<D> const &alloc) : |
| | | queue_(len, alloc) {} |
| | | |
| | | template <class OnWrite> |
| | | bool TryWrite(const D &d, const OnWrite &onWrite) |
| | | { |
| | | Guard lock(mutex()); |
| | | if (!queue_.full()) { |
| | | onWrite(d); |
| | | queue_.push_back(d); |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | bool TryWrite(const D &d) |
| | | { |
| | | return !queue_.full() ? (queue_.push_back(d), true) : false; |
| | | } |
| | | |
| | | bool Read(D &d, const int timeout_ms) |
| | | { |
| | | using namespace std::chrono; |
| | | auto end_time = steady_clock::now() + milliseconds(timeout_ms); |
| | | do { |
| | | if (TryRead(d)) { |
| | | return true; |
| | | } else { |
| | | robust::QuickSleep(); |
| | | } |
| | | } while (steady_clock::now() < end_time); |
| | | return false; |
| | | } |
| | | |
| | | bool TryRead(D &d) |
| | | { |
| | | Guard lock(mutex()); |
| | | if (!queue_.empty()) { |
| | | queue_.pop_front(d); |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | private: |
| | | typedef Circular<D> Queue; |
| | | Queue queue_; |
| | | Mutex mutex_; |
| | | Mutex &mutex() { return mutex_; } |
| | | }; |
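| | | // Usage sketch (illustration only): D and Allocator<D> must be placeable in |
| | | // shared memory; the segment-manager construction below is an assumption. |
| | | // |
| | | // SharedQueue<int64_t> q(64, Allocator<int64_t>(shm.get_segment_manager())); |
| | | // q.TryWrite(1); |
| | | // int64_t v; |
| | | // q.Read(v, 100); // wait up to 100 ms |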
| | | |
| | | } // namespace bhome_shm |
| | |
| | | * ===================================================================================== |
| | | */ |
| | | #include "bh_api.h" |
| | | #include "robust.h" |
| | | #include "util.h" |
| | | #include <atomic> |
| | | #include <boost/lockfree/queue.hpp> |
| | |
| | | // printf("client Recv reply : %s\n", reply.data().c_str()); |
| | | } |
| | | |
| | | class TLMutex |
| | | { |
| | | typedef boost::interprocess::interprocess_mutex MutexT; |
| | | // typedef CasMutex MutexT; |
| | | // typedef std::mutex MutexT; |
| | | typedef std::chrono::steady_clock Clock; |
| | | typedef Clock::duration Duration; |
| | | static Duration Now() { return Clock::now().time_since_epoch(); } |
| | | |
| | | const Duration limit_; |
| | | std::atomic<Duration> last_lock_time_; |
| | | MutexT mutex_; |
| | | bool Expired(const Duration diff) { return diff > limit_; } |
| | | |
| | | public: |
| | | struct Status { |
| | | int64_t nlock_ = 0; |
| | | int64_t nupdate_time_fail = 0; |
| | | int64_t nfail = 0; |
| | | int64_t nexcept = 0; |
| | | }; |
| | | Status st_; |
| | | |
| | | explicit TLMutex(Duration limit) : |
| | | limit_(limit) {} |
| | | TLMutex() : |
| | | TLMutex(std::chrono::seconds(1)) {} |
| | | ~TLMutex() { static_assert(std::is_pod<Duration>::value); } |
| | | bool try_lock() |
| | | { |
| | | if (mutex_.try_lock()) { |
| | | auto old_time = last_lock_time_.load(); |
| | | auto cur = Now(); |
| | | if (Expired(cur - old_time)) { |
| | | return last_lock_time_.compare_exchange_strong(old_time, cur); |
| | | } else { |
| | | last_lock_time_.store(Now()); |
| | | return true; |
| | | } |
| | | } else { |
| | | auto old_time = last_lock_time_.load(); |
| | | auto cur = Now(); |
| | | if (Expired(cur - old_time)) { |
| | | return last_lock_time_.compare_exchange_strong(old_time, cur); |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | } |
| | | void lock() |
| | | { |
| | | int n = 0; |
| | | while (!try_lock()) { |
| | | n++; |
| | | std::this_thread::yield(); |
| | | } |
| | | st_.nlock_ += n; |
| | | } |
| | | void unlock() |
| | | { |
| | | auto old_time = last_lock_time_.load(); |
| | | auto cur = Now(); |
| | | if (!Expired(cur - old_time)) { |
| | | if (last_lock_time_.compare_exchange_strong(old_time, cur)) { |
| | | mutex_.unlock(); |
| | | } |
| | | } |
| | | } |
| | | }; |
| | | |
| | | // The pthread robust attribute does NOT take effect; maybe the OS does not support it. |
| | | class RobustMutex |
| | | { |
| | | public: |
| | | RobustMutex() |
| | | { |
| | | pthread_mutexattr_t mutex_attr; |
| | | auto attr = [&]() { return &mutex_attr; }; |
| | | int r = pthread_mutexattr_init(attr()); |
| | | r |= pthread_mutexattr_setpshared(attr(), PTHREAD_PROCESS_SHARED); |
| | | r |= pthread_mutexattr_setrobust_np(attr(), PTHREAD_MUTEX_ROBUST_NP); |
| | | r |= pthread_mutex_init(mtx(), attr()); |
| | | int rob = 0; |
| | | pthread_mutexattr_getrobust_np(attr(), &rob); |
| | | int shared = 0; |
| | | pthread_mutexattr_getpshared(attr(), &shared); |
| | | printf("robust : %d, shared : %d\n", rob, shared); |
| | | r |= pthread_mutexattr_destroy(attr()); |
| | | if (r) { |
| | | throw("init mutex error."); |
| | | } |
| | | } |
| | | ~RobustMutex() |
| | | { |
| | | pthread_mutex_destroy(mtx()); |
| | | } |
| | | |
| | | public: |
| | | void lock() { Lock(); } |
| | | bool try_lock() |
| | | { |
| | | int r = TryLock(); |
| | | printf("TryLock ret: %d\n", r); |
| | | return r == 0; |
| | | } |
| | | |
| | | void unlock() { Unlock(); } |
| | | |
| | | // private: |
| | | int TryLock() { return pthread_mutex_trylock(mtx()); } |
| | | int Lock() { return pthread_mutex_lock(mtx()); } |
| | | int Unlock() { return pthread_mutex_unlock(mtx()); } |
| | | |
| | | private: |
| | | pthread_mutex_t *mtx() { return &mutex_; } |
| | | pthread_mutex_t mutex_; |
| | | }; |
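| | | // For reference: when the robust attribute does take effect, a waiter that |
| | | // inherits a dead owner's mutex gets EOWNERDEAD and must mark the mutex |
| | | // consistent before use. A minimal sketch of that handling: |
| | | // |
| | | // int r = pthread_mutex_lock(mtx()); |
| | | // if (r == EOWNERDEAD) { |
| | | // pthread_mutex_consistent_np(mtx()); // POSIX name: pthread_mutex_consistent |
| | | // } |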
| | | |
| | | class LockFreeQueue |
| | | { |
| | | typedef int64_t Data; |
| | | typedef boost::lockfree::queue<Data, boost::lockfree::capacity<1024>> LFQueue; |
| | | void push_back(Data d) { queue_.push(d); } |
| | | |
| | | private: |
| | | LFQueue queue_; |
| | | }; |
| | | |
| | | } // namespace |
| | | #include <chrono> |
| | | using namespace std::chrono; |
| | | // using namespace std::chrono_literals; |
| | | |
| | | BOOST_AUTO_TEST_CASE(MutexTest) |
| | | { |
| | | // typedef robust::CasMutex<true> RobustMutex; |
| | | typedef MutexWithPidCheck RobustMutex; |
| | | |
| | | for (int i = 0; i < 20; ++i) { |
| | | int size = i; |
| | | int left = size & 7; |
| | | int rsize = size + ((8 - left) & 7); |
| | | printf("size: %3d, rsize: %3d\n", size, rsize); |
| | | } |
| | | SharedMemory &shm = TestShm(); |
| | | // shm.Remove(); |
| | | // return; |
| | |
| | | |
| | | const std::string mtx_name("test_mutex"); |
| | | const std::string int_name("test_int"); |
| | | auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name); |
| | | auto pi = shm.FindOrCreate<int>(int_name, 100); |
| | | |
| | | std::mutex m; |
| | |
| | | |
| | | { |
| | | boost::timer::auto_cpu_timer timer; |
| | | printf("test time: "); |
| | | TLMutex mutex; |
| | | // CasMutex mutex; |
| | | const int ntimes = 1000 * 1000; |
| | | printf("test lock/unlock %d times: ", ntimes); |
| | | RobustMutex mutex; |
| | | auto Lock = [&]() { |
| | | for (int i = 0; i < 10; ++i) { |
| | | for (int i = 0; i < ntimes; ++i) { |
| | | mutex.lock(); |
| | | mutex.unlock(); |
| | | } |
| | |
| | | std::thread t1(Lock), t2(Lock); |
| | | t1.join(); |
| | | t2.join(); |
| | | printf("mutex nlock: %ld, update time error: %ld, normal fail: %ld, error wait: %ld\n", |
| | | mutex.st_.nlock_, |
| | | mutex.st_.nupdate_time_fail, |
| | | mutex.st_.nfail, |
| | | mutex.st_.nexcept); |
| | | } |
| | | |
| | | auto MSFromNow = [](const int ms) { |
| | |
| | | threads.Launch(hb, &run); |
| | | threads.Launch(showStatus, &run); |
| | | int ncli = 10; |
| | | const uint64_t nreq = 1000 * 100; |
| | | for (int i = 0; i < ncli; ++i) { |
| | | threads.Launch(asyncRequest, nreq); |
| | | } |
| | | // for (int i = 0; i < 100; ++i) { |
| | | // SyncRequest(0); |
| | | // } |
| | | |
| | | int same = 0; |
| | | int64_t last = 0; |
| | |
| | | threads.WaitAll(); |
| | | auto &st = Status(); |
| | | printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld\n", st.nrequest_.load(), st.nserved_.load(), st.nreply_.load()); |
| | | BHCleanup(); |
| | | printf("after cleanup\n"); |
| | | } |
New file |
| | |
| | | #include "robust.h" |
| | | #include "util.h" |
| | | |
| | | using namespace robust; |
| | | |
| | | typedef CircularBuffer<int64_t, Allocator<int64_t>> Rcb; |
| | | Rcb *GetRCBImpl(SharedMemory &shm, const int nelem) |
| | | { |
| | | int cap = nelem + 1; |
| | | typedef uint64_t Data; |
| | | auto size = sizeof(Rcb) + sizeof(Data) * cap; |
| | | void *p = shm.Alloc(size); |
| | | if (p) { |
| | | return new (p) Rcb(cap, shm.get_segment_manager()); |
| | | } |
| | | return nullptr; |
| | | } |
| | | Rcb *GetRCB(SharedMemory &shm, const int nelem) |
| | | { |
| | | void **pStore = shm.FindOrCreate<void *>("test_rcb_pointer", nullptr); |
| | | if (pStore) { |
| | | if (!*pStore) { |
| | | *pStore = GetRCBImpl(shm, nelem); |
| | | } |
| | | return (Rcb *) *pStore; |
| | | } |
| | | return nullptr; |
| | | } |
| | | |
| | | void MySleep() |
| | | { |
| | | std::this_thread::sleep_for(2us); |
| | | } |
| | | |
| | | BOOST_AUTO_TEST_CASE(RobustTest) |
| | | { |
| | | SharedMemory &shm = TestShm(); |
| | | shm.Remove(); |
| | | pid_t pid = getpid(); |
| | | printf("pid : %d\n", pid); |
| | | auto Access = [](pid_t pid) { |
| | | char buf[100] = {0}; |
| | | sprintf(buf, "/proc/%d/stat", pid); |
| | | int r = access(buf, F_OK); |
| | | printf("access %d\n", r); |
| | | }; |
| | | Access(pid); |
| | | Access(pid + 1); |
| | | // Sleep(10s); |
| | | // return; |
| | | |
| | | int nelement = 640; |
| | | auto rcb = GetRCB(shm, nelement); |
| | | BOOST_CHECK(rcb != nullptr); |
| | | BOOST_CHECK(rcb->empty()); |
| | | BOOST_CHECK(rcb->push_back(1)); |
| | | BOOST_CHECK(rcb->size() == 1); |
| | | int64_t d; |
| | | BOOST_CHECK(rcb->pop_front(d)); |
| | | BOOST_CHECK(rcb->empty()); |
| | | |
| | | const uint64_t nmsg = 1000 * 1000 * 1; |
| | | uint64_t correct_total = nmsg * (nmsg - 1) / 2; |
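| | | // = sum of 0..nmsg-1: writers produce each value once; readers must consume each exactly once |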
| | | std::atomic<uint64_t> total(0); |
| | | std::atomic<uint64_t> nwrite(0); |
| | | std::atomic<uint64_t> writedone(0); |
| | | auto Writer = [&]() { |
| | | uint64_t n = 0; |
| | | while ((n = nwrite++) < nmsg) { |
| | | while (!rcb->push_back(n)) { |
| | | // MySleep(); |
| | | } |
| | | ++writedone; |
| | | } |
| | | }; |
| | | std::atomic<uint64_t> nread(0); |
| | | auto Reader = [&]() { |
| | | while (nread.load() < nmsg) { |
| | | int64_t d; |
| | | if (rcb->pop_front(d)) { |
| | | ++nread; |
| | | total += d; |
| | | } else { |
| | | MySleep(); |
| | | } |
| | | } |
| | | }; |
| | | |
| | | auto status = [&]() { |
| | | auto next = steady_clock::now(); |
| | | uint32_t lw = 0; |
| | | uint32_t lr = 0; |
| | | do { |
| | | std::this_thread::sleep_until(next); |
| | | next += 1s; |
| | | auto w = writedone.load(); |
| | | auto r = nread.load(); |
| | | printf("write: %6ld, spd: %6ld, read: %6ld, spd: %6ld , queue size: %d\n", w, w - lw, r, r - lr, rcb->size()); |
| | | lw = w; |
| | | lr = r; |
| | | } while (nread.load() < nmsg); |
| | | }; |
| | | |
| | | ThreadManager threads; |
| | | boost::timer::auto_cpu_timer timer; |
| | | printf("Testing Robust Buffer, msgs %ld, queue size: %d \n", nmsg, nelement); |
| | | threads.Launch(status); |
| | | for (int i = 0; i < 10; ++i) { |
| | | threads.Launch(Reader); |
| | | threads.Launch(Writer); |
| | | } |
| | | threads.WaitAll(); |
| | | printf("total: %ld, expected: %ld\n", total.load(), correct_total); |
| | | BOOST_CHECK_EQUAL(total.load(), correct_total); |
| | | } |
| | |
| | | BOOST_AUTO_TEST_CASE(TimedWaitTest) |
| | | { |
| | | SharedMemory &shm = TestShm(); |
| | | MsgI::BindShm(shm); |
| | | GlobalInit(shm); |
| | | ShmMsgQueue q(shm, 64); |
| | | for (int i = 0; i < 2; ++i) { |
| | | int ms = i * 100; |
| | |
| | | { |
| | | SharedMemory &shm = TestShm(); |
| | | typedef MsgI Msg; |
| | | Msg::BindShm(shm); |
| | | GlobalInit(shm); |
| | | |
| | | Msg m0(1000); |
| | | BOOST_CHECK(m0.valid()); |
| | |
| | | { |
| | | const int mem_size = 1024 * 1024 * 50; |
| | | SharedMemory &shm = TestShm(); |
| | | MsgI::BindShm(shm); |
| | | GlobalInit(shm); |
| | | |
| | | MQId id = ShmMsgQueue::NewId(); |
| | | const int timeout = 1000; |
| | |
| | | const std::string server_proc_id = "server_proc"; |
| | | |
| | | SharedMemory &shm = TestShm(); |
| | | MsgI::BindShm(shm); |
| | | GlobalInit(shm); |
| | | |
| | | auto Avail = [&]() { return shm.get_free_memory(); }; |
| | | auto init_avail = Avail(); |
| | |
| | | BOOST_AUTO_TEST_CASE(PubSubTest) |
| | | { |
| | | SharedMemory &shm = TestShm(); |
| | | MsgI::BindShm(shm); |
| | | GlobalInit(shm); |
| | | |
| | | auto Avail = [&]() { return shm.get_free_memory(); }; |
| | | auto init_avail = Avail(); |
| | |
| | | BOOST_AUTO_TEST_CASE(ReqRepTest) |
| | | { |
| | | SharedMemory &shm = TestShm(); |
| | | MsgI::BindShm(shm); |
| | | GlobalInit(shm); |
| | | |
| | | auto Avail = [&]() { return shm.get_free_memory(); }; |
| | | auto init_avail = Avail(); |
| | |
| | | |
| | | using namespace boost::posix_time; |
| | | using namespace std::chrono_literals; |
| | | using namespace std::chrono; |
| | | |
| | | template <class D> |
| | | inline void Sleep(D d, bool print = true) |