lichao
2021-04-28 a6f67b4249525089fb97eb9418c7014f66c2a000
use new robust mutex, circurar; rm timeout mutex.
3个文件已添加
8个文件已修改
894 ■■■■■ 已修改文件
src/robust.cpp 111 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/robust.h 255 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm.cpp 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm.h 88 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 113 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 163 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/robust_test.cpp 115 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/simple_tests.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/util.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/robust.cpp
New file
@@ -0,0 +1,111 @@
/*
 * =====================================================================================
 *
 *       Filename:  robust.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月27日 10时04分19秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#include "robust.h"
#include <chrono>
#include <thread>
namespace robust
{
namespace
{
static_assert(sizeof(steady_clock::duration) == sizeof(int64_t));
static_assert(sizeof(RobustReqRep) == 24);
static_assert(sizeof(CasMutex<false>) == 8);
static_assert(sizeof(CircularBuffer<int>) == 48);
auto Now() { return steady_clock::now().time_since_epoch(); }
const steady_clock::duration kIoTimeout = 10ms;
const steady_clock::duration kIoExpire = 100ms;
void Yield() { std::this_thread::sleep_for(10us); }
} // namespace
void QuickSleep()
{
    Yield();
}
bool RobustReqRep::StateCas(State exp, State val)
{
    bool r = state_.compare_exchange_strong(exp, val);
    return r ? (timestamp_.store(Now()), true) : false;
}
int RobustReqRep::ClientReadReply(Msg &reply)
{
    auto end_time = Now() + kIoTimeout;
    int done = false;
    do {
        if (StateCas(eServerWriteEnd, eClientReadBegin)) {
            Read(reply);
            done = StateCas(eClientReadBegin, eClientReadEnd);
            if (done) { break; }
        }
        Yield();
    } while (Now() < end_time);
    return done ? eSuccess : eTimeout;
}
int RobustReqRep::ClientWriteRequest(const Msg &request)
{
    if (request.size() > capacity_) {
        return eSizeError;
    }
    auto end_time = Now() + kIoTimeout;
    bool done = false;
    do {
        if (StateCas(eStateReady, eClientWriteBegin)) {
            Write(request);
            done = StateCas(eClientWriteBegin, eClientWriteEnd);
            if (done) { break; }
        }
        Yield();
    } while (Now() < end_time);
    return done ? eSuccess : eTimeout;
}
int RobustReqRep::ServerReadRequest(Msg &request)
{
    bool done = false;
    if (StateCas(eClientWriteEnd, eServerReadBegin)) {
        Read(request);
        done = StateCas(eServerReadBegin, eServerReadEnd);
    } else {
        auto old = state_.load();
        if (old != eStateReady && timestamp_.load() + kIoExpire < Now()) {
            StateCas(old, eStateReady);
        }
    }
    return done ? eSuccess : eTimeout;
}
int RobustReqRep::ServerWriteReply(const Msg &reply)
{
    if (reply.size() > capacity_) {
        return eSizeError;
    }
    // no need to loop write, either success or timeout.
    bool done = false;
    if (StateCas(eServerReadEnd, eServerWriteBegin)) {
        Write(reply);
        done = StateCas(eServerWriteBegin, eServerWriteEnd);
    }
    return done ? eSuccess : eTimeout;
}
} // namespace robust
src/robust.h
New file
@@ -0,0 +1,255 @@
/*
 * =====================================================================================
 *
 *       Filename:  robust.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月27日 10时04分29秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef ROBUST_Q31RCWYU
#define ROBUST_Q31RCWYU
#include <atomic>
#include <chrono>
#include <memory>
#include <string.h>
#include <string>
#include <sys/types.h>
#include <unistd.h>
namespace robust
{
using namespace std::chrono;
using namespace std::chrono_literals;
void QuickSleep();
class RobustReqRep
{
    typedef uint32_t State;
    typedef std::string Msg;
    typedef std::chrono::steady_clock::duration Duration;
public:
    enum ErrorCode {
        eSuccess = 0,
        eTimeout = EAGAIN,
        eSizeError = EINVAL,
    };
    explicit RobustReqRep(const uint32_t max_len) :
        capacity_(max_len), state_(eStateInit), timestamp_(Duration(0)), size_(0) {}
    void PutReady() { state_.store(eStateReady); }
    bool Ready() const { return state_.load() == eStateReady; }
    uint32_t capacity() const { return capacity_; }
    int ClientRequest(const Msg &request, Msg &reply)
    {
        int r = ClientWriteRequest(request);
        if (r == eSuccess) {
            r = ClientReadReply(reply);
        }
        return r;
    }
    int ClientReadReply(Msg &reply);
    int ClientWriteRequest(const Msg &request);
    int ServerReadRequest(Msg &request);
    int ServerWriteReply(const Msg &reply);
private:
    RobustReqRep(const RobustReqRep &);
    RobustReqRep(RobustReqRep &&);
    RobustReqRep &operator=(const RobustReqRep &) = delete;
    RobustReqRep &operator=(RobustReqRep &&) = delete;
    enum {
        eStateInit = 0,
        eStateReady = 0x19833891,
        eClientWriteBegin,
        eClientWriteEnd,
        eServerReadBegin,
        eServerReadEnd,
        eServerWriteBegin,
        eServerWriteEnd,
        eClientReadBegin,
        eClientReadEnd = eStateReady,
    };
    bool StateCas(State exp, State val);
    void Write(const Msg &msg)
    {
        size_.store(msg.size());
        memcpy(buf, msg.data(), msg.size());
    }
    void Read(Msg &msg) { msg.assign(buf, size_.load()); }
    const uint32_t capacity_;
    std::atomic<State> state_;
    static_assert(sizeof(State) == sizeof(state_), "atomic should has no extra data.");
    std::atomic<Duration> timestamp_;
    std::atomic<int32_t> size_;
    char buf[4];
};
template <bool isRobust = false>
class CasMutex
{
    static pid_t pid()
    {
        static pid_t val = getpid();
        return val;
    }
    static bool Killed(pid_t pid)
    {
        char buf[64] = {0};
        snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", pid);
        return access(buf, F_OK) != 0;
    }
public:
    CasMutex() :
        meta_(0) {}
    int try_lock()
    {
        const auto t = steady_clock::now().time_since_epoch().count();
        auto old = meta_.load();
        int r = 0;
        if (!Locked(old)) {
            r = MetaCas(old, Meta(1, pid()));
        } else if (isRobust && Killed(Pid(old))) {
            r = static_cast<int>(MetaCas(old, Meta(1, pid()))) << 1;
            if (r) {
                printf("captured pid %d -> %d, r = %d\n", Pid(old), pid(), r);
            }
        }
        return r;
    }
    int lock()
    {
        int r = 0;
        do {
            r = try_lock();
        } while (r == 0);
        return r;
    }
    void unlock()
    {
        auto old = meta_.load();
        if (Locked(old) && Pid(old) == pid()) {
            MetaCas(old, Meta(0, pid()));
        }
    }
private:
    std::atomic<uint64_t> meta_;
    bool Locked(uint64_t meta) { return (meta >> 63) != 0; }
    pid_t Pid(uint64_t meta) { return meta & ~(uint64_t(1) << 63); }
    uint64_t Meta(uint64_t lk, pid_t pid) { return (lk << 63) | pid; }
    bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
    static_assert(sizeof(pid_t) < sizeof(uint64_t));
};
template <class Lock>
class Guard
{
public:
    Guard(Lock &l) :
        l_(l) { l_.lock(); }
    ~Guard() { l_.unlock(); }
private:
    Guard(const Guard &);
    Guard(Guard &&);
    Lock &l_;
};
template <class D, class Alloc = std::allocator<D>>
class CircularBuffer
{
    typedef uint32_t size_type;
    typedef uint32_t count_type;
    typedef uint64_t meta_type;
    static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; }
    static count_type Count(meta_type meta) { return meta >> 32; }
    static size_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }
public:
    typedef D Data;
    CircularBuffer(const size_type cap) :
        CircularBuffer(cap, Alloc()) {}
    CircularBuffer(const size_type cap, Alloc const &al) :
        state_(0), capacity_(cap), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(cap))
    {
        if (!buf) {
            throw("error allocate buffer: out of mem!");
        }
    }
    ~CircularBuffer()
    {
        al_.deallocate(buf, capacity_);
    }
    size_type size() const { return (capacity_ + tail() - head()) % capacity_; }
    bool full() const { return (capacity_ + tail() + 1 - head()) % capacity_ == 0; }
    bool empty() const { return head() == tail(); }
    bool push_back(Data d)
    {
        Guard<MutexT> guard(mutex_);
        if (!full()) {
            auto old = mtail();
            buf[Pos(old)] = d;
            return mtail_.compare_exchange_strong(old, next(old));
        } else {
            return false;
        }
    }
    bool pop_front(Data &d)
    {
        Guard<MutexT> guard(mutex_);
        if (!empty()) {
            auto old = mhead();
            d = buf[Pos(old)];
            return mhead_.compare_exchange_strong(old, next(old));
        } else {
            return false;
        }
    }
    bool Ready() const { return state_.load() == eStateReady; }
    void PutReady() { state_.store(eStateReady); }
private:
    CircularBuffer(const CircularBuffer &);
    CircularBuffer(CircularBuffer &&);
    CircularBuffer &operator=(const CircularBuffer &) = delete;
    CircularBuffer &operator=(CircularBuffer &&) = delete;
    typedef CasMutex<true> MutexT;
    // static_assert(sizeof(MutexT) == 16);
    meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); }
    size_type head() const { return Pos(mhead()); }
    size_type tail() const { return Pos(mtail()); }
    meta_type mhead() const { return mhead_.load(); }
    meta_type mtail() const { return mtail_.load(); }
    // data
    enum { eStateReady = 0x19833891 };
    std::atomic<uint32_t> state_;
    const size_type capacity_;
    MutexT mutex_;
    std::atomic<meta_type> mhead_;
    std::atomic<meta_type> mtail_;
    Alloc al_;
    typename Alloc::pointer buf = nullptr;
};
} // namespace robust
#endif // end of include guard: ROBUST_Q31RCWYU
src/shm.cpp
@@ -21,42 +21,6 @@
namespace bhome_shm
{
bool MutexWithTimeLimit::try_lock()
{
    if (mutex_.try_lock()) {
        auto old_time = last_lock_time_.load();
        if (Now() - old_time > limit_) {
            return last_lock_time_.compare_exchange_strong(old_time, Now());
        } else {
            last_lock_time_.store(Now());
            return true;
        }
    } else {
        auto old_time = last_lock_time_.load();
        if (Now() - old_time > limit_) {
            return last_lock_time_.compare_exchange_strong(old_time, Now());
        } else {
            return false;
        }
    }
}
void MutexWithTimeLimit::lock()
{
    while (!try_lock()) {
        std::this_thread::yield();
    }
}
void MutexWithTimeLimit::unlock()
{
    auto old_time = last_lock_time_.load();
    if (Now() - old_time > limit_) {
    } else {
        if (last_lock_time_.compare_exchange_strong(old_time, Now())) {
            mutex_.unlock();
        }
    }
}
SharedMemory::SharedMemory(const std::string &name, const uint64_t size) :
    mshm_t(open_or_create, name.c_str(), size, 0, AllowAll()),
    name_(name)
src/shm.h
@@ -19,13 +19,11 @@
#ifndef SHM_6CHO6D6C
#define SHM_6CHO6D6C
#include "robust.h"
#include <atomic>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/noncopyable.hpp>
#include <chrono>
#include <thread>
namespace bhome_shm
@@ -35,53 +33,65 @@
typedef managed_shared_memory mshm_t;
class CasMutex
{
    std::atomic<bool> flag_;
    bool cas(bool expected, bool new_val) { return flag_.compare_exchange_strong(expected, new_val); }
public:
    CasMutex() :
        flag_(false) {}
    bool try_lock() { return cas(false, true); }
    void lock()
    {
        while (!try_lock()) { std::this_thread::yield(); }
    }
    void unlock() { cas(true, false); }
};
class MutexWithTimeLimit
class MutexWithPidCheck
{
    typedef boost::interprocess::interprocess_mutex MutexT;
    // typedef CasMutex MutexT;
    typedef std::chrono::steady_clock Clock;
    typedef Clock::duration Duration;
    static Duration Now() { return Clock::now().time_since_epoch(); }
    const Duration limit_;
    std::atomic<Duration> last_lock_time_;
    static pid_t pid()
    {
        static pid_t val = getpid();
        return val;
    }
    static bool Killed(pid_t pid)
    {
        char buf[64] = {0};
        snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", pid);
        return access(buf, F_OK) != 0;
    }
    bool PidCas(pid_t exp, pid_t val) { return pid_.compare_exchange_strong(exp, val); }
    MutexT mutex_;
    std::atomic<pid_t> pid_;
public:
    typedef MutexT::internal_mutex_type internal_mutex_type;
    const internal_mutex_type &internal_mutex() const { return mutex_.internal_mutex(); }
    internal_mutex_type &internal_mutex() { return mutex_.internal_mutex(); }
    MutexWithPidCheck() :
        pid_(0) {}
    bool try_lock()
    {
        bool r = false;
        if (mutex_.try_lock()) {
            auto old = pid_.load();
            r = PidCas(old, pid());
        } else {
            auto old = pid_.load();
            if (Killed(old)) {
                r = PidCas(old, pid());
                if (r) {
                    printf("PidCheck captured pid %d -> %d\n", old, pid());
                }
            }
        }
        return r;
    }
    explicit MutexWithTimeLimit(Duration limit) :
        limit_(limit) {}
    MutexWithTimeLimit() :
        MutexWithTimeLimit(std::chrono::seconds(1)) {}
    ~MutexWithTimeLimit() { static_assert(std::is_pod<Duration>::value); }
    bool try_lock();
    void lock();
    void unlock();
    void lock()
    {
        while (!try_lock()) {
            std::this_thread::yield();
        }
    }
    void unlock()
    {
        auto old = pid_.load();
        if (old == pid()) {
            mutex_.unlock();
        }
    }
};
// typedef boost::interprocess::interprocess_mutex Mutex;
typedef MutexWithTimeLimit Mutex;
typedef scoped_lock<Mutex> Guard;
typedef interprocess_condition Cond;
typedef robust::CasMutex<true> Mutex;
typedef robust::Guard<Mutex> Guard;
class SharedMemory : public mshm_t
{
src/shm_queue.h
@@ -21,105 +21,70 @@
#include "shm.h"
#include <atomic>
#include <boost/circular_buffer.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <chrono>
namespace bhome_shm
{
template <class D>
using Circular = boost::circular_buffer<D, Allocator<D>>;
using Circular = robust::CircularBuffer<D, Allocator<D>>;
template <class D>
class SharedQueue : private Circular<D>
class SharedQueue
{
    typedef Circular<D> Super;
    Mutex mutex_;
    Cond cond_read_;
    Cond cond_write_;
    Mutex &mutex() { return mutex_; }
public:
    SharedQueue(const uint32_t len, Allocator<D> const &alloc) :
        queue_(len, alloc) {}
    static boost::posix_time::ptime MSFromNow(const int ms)
    template <class OnWrite>
    bool TryWrite(const D &d, const OnWrite &onWrite)
    {
        using namespace boost::posix_time;
        ptime cur = boost::posix_time::microsec_clock::universal_time();
        return cur + millisec(ms);
    }
    auto TimedReadPred(const int timeout_ms)
    {
        auto endtime = MSFromNow(timeout_ms);
        return [this, endtime](Guard &lock) {
            return (cond_read_.timed_wait(lock, endtime, [&]() { return !this->empty(); }));
        };
    }
    auto TryReadPred()
    {
        return [this](Guard &lock) { return !this->empty(); };
    }
    template <class Pred>
    bool ReadOnCond(D &buf, Pred const &pred)
    {
        auto Read = [&]() {
            Guard lock(this->mutex());
            if (pred(lock)) {
                using std::swap;
                swap(buf, Super::front());
                Super::pop_front();
        Guard lock(mutex());
        if (!queue_.full()) {
            onWrite(d);
            queue_.push_back(d);
                return true;
            } else {
                return false;
            }
        };
        return Read() ? (this->cond_write_.notify_one(), true) : false;
    }
    template <class Iter, class Pred, class OnWrite>
    int WriteAllOnCond(Iter begin, Iter end, Pred const &pred, OnWrite const &onWrite)
    bool TryWrite(const D &d)
    {
        if (begin == end) { return 0; }
        int n = 0;
        Guard lock(mutex());
        while (pred(lock)) {
            onWrite(*begin);
            Super::push_back(*begin);
            ++n;
            cond_read_.notify_one();
            if (++begin == end) {
                break;
            }
        }
        return n;
        return !queue_.full() ? (queue_.push_back(d), true) : false;
    }
public:
    SharedQueue(const uint32_t len, Allocator<D> const &alloc) :
        Super(len, alloc) {}
    template <class Iter, class OnWrite>
    int TryWrite(Iter begin, Iter end, const OnWrite &onWrite)
    bool Read(D &d, const int timeout_ms)
    {
        auto tryWritePred = [this](Guard &lock) { return !this->full(); };
        return WriteAllOnCond(begin, end, tryWritePred, onWrite);
        using namespace std::chrono;
        auto end_time = steady_clock::now() + milliseconds(timeout_ms);
        do {
            if (TryRead(d)) {
                return true;
            } else {
                robust::QuickSleep();
    }
    template <class OnWrite>
    bool TryWrite(const D &buf, const OnWrite &onWrite) { return TryWrite(&buf, (&buf) + 1, onWrite); }
    bool TryWrite(const D &buf)
        } while (steady_clock::now() < end_time);
        return false;
    }
    bool TryRead(D &d)
    {
        return TryWrite(buf, [](const D &buf) {});
        Guard lock(mutex());
        if (!queue_.empty()) {
            queue_.pop_front(d);
            return true;
        } else {
            return false;
        }
    }
    template <class OnData>
    int ReadAll(const int timeout_ms, OnData const &onData) { return ReadAllOnCond(TimedReadPred(timeout_ms), onData); }
    template <class OnData>
    int TryReadAll(OnData const &onData) { return ReadAllOnCond(TryReadPred(), onData); }
    bool Read(D &buf, const int timeout_ms) { return ReadOnCond(buf, TimedReadPred(timeout_ms)); }
    bool TryRead(D &buf) { return ReadOnCond(buf, TryReadPred()); }
private:
    typedef Circular<D> Queue;
    Queue queue_;
    Mutex mutex_;
    Mutex &mutex() { return mutex_; }
};
} // namespace bhome_shm
utest/api_test.cpp
@@ -16,6 +16,7 @@
 * =====================================================================================
 */
#include "bh_api.h"
#include "robust.h"
#include "util.h"
#include <atomic>
#include <boost/lockfree/queue.hpp>
@@ -96,138 +97,22 @@
    // printf("client Recv reply : %s\n", reply.data().c_str());
}
class TLMutex
{
    typedef boost::interprocess::interprocess_mutex MutexT;
    // typedef CasMutex MutexT;
    // typedef std::mutex MutexT;
    typedef std::chrono::steady_clock Clock;
    typedef Clock::duration Duration;
    static Duration Now() { return Clock::now().time_since_epoch(); }
    const Duration limit_;
    std::atomic<Duration> last_lock_time_;
    MutexT mutex_;
    bool Expired(const Duration diff) { return diff > limit_; }
public:
    struct Status {
        int64_t nlock_ = 0;
        int64_t nupdate_time_fail = 0;
        int64_t nfail = 0;
        int64_t nexcept = 0;
    };
    Status st_;
    explicit TLMutex(Duration limit) :
        limit_(limit) {}
    TLMutex() :
        TLMutex(std::chrono::seconds(1)) {}
    ~TLMutex() { static_assert(std::is_pod<Duration>::value); }
    bool try_lock()
    {
        if (mutex_.try_lock()) {
            auto old_time = last_lock_time_.load();
            auto cur = Now();
            if (Expired(cur - old_time)) {
                return last_lock_time_.compare_exchange_strong(old_time, cur);
            } else {
                last_lock_time_.store(Now());
                return true;
            }
        } else {
            auto old_time = last_lock_time_.load();
            auto cur = Now();
            if (Expired(cur - old_time)) {
                return last_lock_time_.compare_exchange_strong(old_time, cur);
            } else {
                return false;
            }
        }
    }
    void lock()
    {
        int n = 0;
        while (!try_lock()) {
            n++;
            std::this_thread::yield();
        }
        st_.nlock_ += n;
    }
    void unlock()
    {
        auto old_time = last_lock_time_.load();
        auto cur = Now();
        if (!Expired(cur - old_time)) {
            if (last_lock_time_.compare_exchange_strong(old_time, cur)) {
                mutex_.unlock();
            }
        }
    }
};
//robust attr does NOT work, maybe os does not support it.
class RobustMutex
{
public:
    RobustMutex()
    {
        pthread_mutexattr_t mutex_attr;
        auto attr = [&]() { return &mutex_attr; };
        int r = pthread_mutexattr_init(attr());
        r |= pthread_mutexattr_setpshared(attr(), PTHREAD_PROCESS_SHARED);
        r |= pthread_mutexattr_setrobust_np(attr(), PTHREAD_MUTEX_ROBUST_NP);
        r |= pthread_mutex_init(mtx(), attr());
        int rob = 0;
        pthread_mutexattr_getrobust_np(attr(), &rob);
        int shared = 0;
        pthread_mutexattr_getpshared(attr(), &shared);
        printf("robust : %d, shared : %d\n", rob, shared);
        r |= pthread_mutexattr_destroy(attr());
        if (r) {
            throw("init mutex error.");
        }
    }
    ~RobustMutex()
    {
        pthread_mutex_destroy(mtx());
    }
public:
    void lock() { Lock(); }
    bool try_lock()
    {
        int r = TryLock();
        printf("TryLock ret: %d\n", r);
        return r == 0;
    }
    void unlock() { Unlock(); }
    // private:
    int TryLock() { return pthread_mutex_trylock(mtx()); }
    int Lock() { return pthread_mutex_lock(mtx()); }
    int Unlock() { return pthread_mutex_unlock(mtx()); }
private:
    pthread_mutex_t *mtx() { return &mutex_; }
    pthread_mutex_t mutex_;
};
class LockFreeQueue
{
    typedef int64_t Data;
    typedef boost::lockfree::queue<Data, boost::lockfree::capacity<1024>> LFQueue;
    void push_back(Data d) { queue_.push(d); }
private:
    LFQueue queue_;
};
} // namespace
#include <chrono>
using namespace std::chrono;
// using namespace std::chrono_literals;
BOOST_AUTO_TEST_CASE(MutexTest)
{
    // typedef robust::CasMutex<true> RobustMutex;
    typedef MutexWithPidCheck RobustMutex;
    for (int i = 0; i < 20; ++i) {
        int size = i;
        int left = size & 7;
        int rsize = size + ((8 - left) & 7);
        printf("size: %3d, rsize: %3d\n", size, rsize);
    }
    SharedMemory &shm = TestShm();
    // shm.Remove();
    // return;
@@ -235,7 +120,7 @@
    const std::string mtx_name("test_mutex");
    const std::string int_name("test_int");
    auto mtx = shm.FindOrCreate<TLMutex>(mtx_name);
    auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name);
    auto pi = shm.FindOrCreate<int>(int_name, 100);
    std::mutex m;
@@ -248,11 +133,11 @@
    {
        boost::timer::auto_cpu_timer timer;
        printf("test time: ");
        TLMutex mutex;
        // CasMutex mutex;
        const int ntimes = 1000 * 1000;
        printf("test lock/unlock %d times: ", ntimes);
        RobustMutex mutex;
        auto Lock = [&]() {
            for (int i = 0; i < 10; ++i) {
            for (int i = 0; i < ntimes; ++i) {
                mutex.lock();
                mutex.unlock();
            }
@@ -260,11 +145,6 @@
        std::thread t1(Lock), t2(Lock);
        t1.join();
        t2.join();
        printf("mutex nlock: %ld, update time error: %ld, normal fail: %ld, error wait: %ld\n",
               mutex.st_.nlock_,
               mutex.st_.nupdate_time_fail,
               mutex.st_.nfail,
               mutex.st_.nexcept);
    }
    auto MSFromNow = [](const int ms) {
@@ -487,10 +367,13 @@
    threads.Launch(hb, &run);
    threads.Launch(showStatus, &run);
    int ncli = 10;
    const uint64_t nreq = 1000 * 10;
    const uint64_t nreq = 1000 * 100;
    for (int i = 0; i < ncli; ++i) {
        threads.Launch(asyncRequest, nreq);
    }
    // for (int i = 0; i < 100; ++i) {
    //     SyncRequest(0);
    // }
    int same = 0;
    int64_t last = 0;
@@ -509,4 +392,6 @@
    threads.WaitAll();
    auto &st = Status();
    printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld\n", st.nrequest_.load(), st.nserved_.load(), st.nreply_.load());
    BHCleanup();
    printf("after cleanup\n");
}
utest/robust_test.cpp
New file
@@ -0,0 +1,115 @@
#include "robust.h"
#include "util.h"
using namespace robust;
typedef CircularBuffer<int64_t, Allocator<int64_t>> Rcb;
Rcb *GetRCBImpl(SharedMemory &shm, const int nelem)
{
    int cap = nelem + 1;
    typedef uint64_t Data;
    auto size = sizeof(Rcb) + sizeof(Data) * cap;
    void *p = shm.Alloc(size);
    if (p) {
        return new (p) Rcb(cap, shm.get_segment_manager());
    }
    return nullptr;
}
Rcb *GetRCB(SharedMemory &shm, const int nelem)
{
    void **pStore = shm.FindOrCreate<void *>("test_rcb_pointer", nullptr);
    if (pStore) {
        if (!*pStore) {
            *pStore = GetRCBImpl(shm, nelem);
        }
        return (Rcb *) *pStore;
    }
    return nullptr;
}
void MySleep()
{
    std::this_thread::sleep_for(2us);
}
BOOST_AUTO_TEST_CASE(RobustTest)
{
    SharedMemory &shm = TestShm();
    shm.Remove();
    pid_t pid = getpid();
    printf("pid : %d\n", pid);
    auto Access = [](pid_t pid) {
        char buf[100] = {0};
        sprintf(buf, "/proc/%d/stat", pid);
        int r = access(buf, F_OK);
        printf("access %d\n", r);
    };
    Access(pid);
    Access(pid + 1);
    // Sleep(10s);
    // return;
    int nelement = 640;
    auto rcb = GetRCB(shm, nelement);
    BOOST_CHECK(rcb != nullptr);
    BOOST_CHECK(rcb->empty());
    BOOST_CHECK(rcb->push_back(1));
    BOOST_CHECK(rcb->size() == 1);
    int64_t d;
    BOOST_CHECK(rcb->pop_front(d));
    BOOST_CHECK(rcb->empty());
    const uint64_t nmsg = 1000 * 1000 * 1;
    uint64_t correct_total = nmsg * (nmsg - 1) / 2;
    std::atomic<uint64_t> total(0);
    std::atomic<uint64_t> nwrite(0);
    std::atomic<uint64_t> writedone(0);
    auto Writer = [&]() {
        uint64_t n = 0;
        while ((n = nwrite++) < nmsg) {
            while (!rcb->push_back(n)) {
                // MySleep();
            }
            ++writedone;
        }
    };
    std::atomic<uint64_t> nread(0);
    auto Reader = [&]() {
        while (nread.load() < nmsg) {
            int64_t d;
            if (rcb->pop_front(d)) {
                ++nread;
                total += d;
            } else {
                MySleep();
            }
        }
    };
    auto status = [&]() {
        auto next = steady_clock::now();
        uint32_t lw = 0;
        uint32_t lr = 0;
        do {
            std::this_thread::sleep_until(next);
            next += 1s;
            auto w = writedone.load();
            auto r = nread.load();
            printf("write: %6ld, spd: %6ld,  read: %6ld, spd: %6ld , queue size: %d\n", w, w - lw, r, r - lr, rcb->size());
            lw = w;
            lr = r;
        } while (nread.load() < nmsg);
    };
    ThreadManager threads;
    boost::timer::auto_cpu_timer timer;
    printf("Testing Robust Buffer, msgs %ld, queue size: %d \n", nmsg, nelement);
    threads.Launch(status);
    for (int i = 0; i < 10; ++i) {
        threads.Launch(Reader);
        threads.Launch(Writer);
    }
    threads.WaitAll();
    printf("total: %ld, expected: %ld\n", total.load(), correct_total);
    BOOST_CHECK_EQUAL(total.load(), correct_total);
}
utest/simple_tests.cpp
@@ -107,7 +107,7 @@
BOOST_AUTO_TEST_CASE(TimedWaitTest)
{
    SharedMemory &shm = TestShm();
    MsgI::BindShm(shm);
    GlobalInit(shm);
    ShmMsgQueue q(shm, 64);
    for (int i = 0; i < 2; ++i) {
        int ms = i * 100;
@@ -123,7 +123,7 @@
{
    SharedMemory &shm = TestShm();
    typedef MsgI Msg;
    Msg::BindShm(shm);
    GlobalInit(shm);
    Msg m0(1000);
    BOOST_CHECK(m0.valid());
utest/speed_test.cpp
@@ -24,7 +24,7 @@
{
    const int mem_size = 1024 * 1024 * 50;
    SharedMemory &shm = TestShm();
    MsgI::BindShm(shm);
    GlobalInit(shm);
    MQId id = ShmMsgQueue::NewId();
    const int timeout = 1000;
@@ -122,7 +122,7 @@
    const std::string server_proc_id = "server_proc";
    SharedMemory &shm = TestShm();
    MsgI::BindShm(shm);
    GlobalInit(shm);
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
utest/utest.cpp
@@ -90,7 +90,7 @@
BOOST_AUTO_TEST_CASE(PubSubTest)
{
    SharedMemory &shm = TestShm();
    MsgI::BindShm(shm);
    GlobalInit(shm);
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
@@ -201,7 +201,7 @@
BOOST_AUTO_TEST_CASE(ReqRepTest)
{
    SharedMemory &shm = TestShm();
    MsgI::BindShm(shm);
    GlobalInit(shm);
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
utest/util.h
@@ -36,6 +36,7 @@
using namespace boost::posix_time;
using namespace std::chrono_literals;
using namespace std::chrono;
template <class D>
inline void Sleep(D d, bool print = true)