lichao
2021-04-29 72bffb0807925a156b076b71f78c848a08d27b87
refactor mutex.
5个文件已修改
266 ■■■■ 已修改文件
src/robust.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/robust.h 73 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 85 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/robust_test.cpp 104 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/robust.cpp
@@ -26,7 +26,7 @@
{
static_assert(sizeof(steady_clock::duration) == sizeof(int64_t));
static_assert(sizeof(RobustReqRep) == 24);
static_assert(sizeof(CasMutex<false>) == 8);
static_assert(sizeof(Mutex) == 8);
static_assert(sizeof(CircularBuffer<int>) == 48);
auto Now() { return steady_clock::now().time_since_epoch(); }
src/robust.h
@@ -32,6 +32,7 @@
using namespace std::chrono;
using namespace std::chrono_literals;
constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; }
void QuickSleep();
@@ -102,20 +103,41 @@
    char buf[4];
};
template <bool isRobust = false>
class CasMutex
class PidLocker
{
    static pid_t pid()
public:
    typedef int locker_t;
    static locker_t this_locker()
    {
        static pid_t val = getpid();
        static locker_t val = getpid();
        return val;
    }
    static bool Killed(pid_t pid)
    static bool is_alive(locker_t locker) { return true; }
};
class RobustPidLocker
{
public:
    typedef int locker_t;
    static locker_t this_locker()
    {
        static locker_t val = getpid();
        return val;
    }
    static bool is_alive(locker_t locker)
    {
        char buf[64] = {0};
        snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", pid);
        return access(buf, F_OK) != 0;
        snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker);
        return access(buf, F_OK) == 0;
    }
};
template <class LockerT>
class CasMutex
{
    typedef typename LockerT::locker_t locker_t;
    static inline locker_t this_locker() { return LockerT::this_locker(); }
    static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); }
public:
    CasMutex() :
@@ -126,11 +148,11 @@
        auto old = meta_.load();
        int r = 0;
        if (!Locked(old)) {
            r = MetaCas(old, Meta(1, pid()));
        } else if (isRobust && Killed(Pid(old))) {
            r = static_cast<int>(MetaCas(old, Meta(1, pid()))) << 1;
            r = MetaCas(old, Meta(1, this_locker()));
        } else if (!is_alive(Locker(old))) {
            r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1;
            if (r) {
                printf("captured pid %d -> %d, r = %d\n", Pid(old), pid(), r);
                printf("captured pid %d -> %d, r = %d\n", Locker(old), this_locker(), r);
            }
        }
        return r;
@@ -146,19 +168,21 @@
    void unlock()
    {
        auto old = meta_.load();
        if (Locked(old) && Pid(old) == pid()) {
            MetaCas(old, Meta(0, pid()));
        if (Locked(old) && Locker(old) == this_locker()) {
            MetaCas(old, Meta(0, this_locker()));
        }
    }
private:
    static_assert(sizeof(locker_t) < sizeof(uint64_t), "locker size must be smaller than 64 bit!");
    std::atomic<uint64_t> meta_;
    bool Locked(uint64_t meta) { return (meta >> 63) != 0; }
    pid_t Pid(uint64_t meta) { return meta & ~(uint64_t(1) << 63); }
    uint64_t Meta(uint64_t lk, pid_t pid) { return (lk << 63) | pid; }
    locker_t Locker(uint64_t meta) { return meta & MaskBits(sizeof(locker_t) * 8); }
    uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; }
    bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
    static_assert(sizeof(pid_t) < sizeof(uint64_t));
};
typedef CasMutex<RobustPidLocker> Mutex;
template <class Lock>
class Guard
@@ -193,19 +217,17 @@
        state_(0), capacity_(cap), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(cap))
    {
        if (!buf) {
            throw("error allocate buffer: out of mem!");
            throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!");
        }
    }
    ~CircularBuffer()
    {
        al_.deallocate(buf, capacity_);
    }
    ~CircularBuffer() { al_.deallocate(buf, capacity_); }
    size_type size() const { return (capacity_ + tail() - head()) % capacity_; }
    bool full() const { return (capacity_ + tail() + 1 - head()) % capacity_ == 0; }
    bool empty() const { return head() == tail(); }
    bool push_back(Data d)
    {
        Guard<MutexT> guard(mutex_);
        Guard<Mutex> guard(mutex_);
        if (!full()) {
            auto old = mtail();
            buf[Pos(old)] = d;
@@ -216,7 +238,7 @@
    }
    bool pop_front(Data &d)
    {
        Guard<MutexT> guard(mutex_);
        Guard<Mutex> guard(mutex_);
        if (!empty()) {
            auto old = mhead();
            d = buf[Pos(old)];
@@ -233,8 +255,7 @@
    CircularBuffer(CircularBuffer &&);
    CircularBuffer &operator=(const CircularBuffer &) = delete;
    CircularBuffer &operator=(CircularBuffer &&) = delete;
    typedef CasMutex<true> MutexT;
    // static_assert(sizeof(MutexT) == 16);
    meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); }
    size_type head() const { return Pos(mhead()); }
    size_type tail() const { return Pos(mtail()); }
@@ -244,7 +265,7 @@
    enum { eStateReady = 0x19833891 };
    std::atomic<uint32_t> state_;
    const size_type capacity_;
    MutexT mutex_;
    Mutex mutex_;
    std::atomic<meta_type> mhead_;
    std::atomic<meta_type> mtail_;
    Alloc al_;
src/shm.h
@@ -90,7 +90,7 @@
    }
};
typedef robust::CasMutex<true> Mutex;
typedef robust::Mutex Mutex;
typedef robust::Guard<Mutex> Guard;
class SharedMemory : public mshm_t
utest/api_test.cpp
@@ -102,91 +102,6 @@
using namespace std::chrono;
// using namespace std::chrono_literals;
BOOST_AUTO_TEST_CASE(MutexTest)
{
    // typedef robust::CasMutex<true> RobustMutex;
    typedef MutexWithPidCheck RobustMutex;
    for (int i = 0; i < 20; ++i) {
        int size = i;
        int left = size & 7;
        int rsize = size + ((8 - left) & 7);
        printf("size: %3d, rsize: %3d\n", size, rsize);
    }
    SharedMemory &shm = TestShm();
    // shm.Remove();
    // return;
    GlobalInit(shm);
    const std::string mtx_name("test_mutex");
    const std::string int_name("test_int");
    auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name);
    auto pi = shm.FindOrCreate<int>(int_name, 100);
    std::mutex m;
    typedef std::chrono::steady_clock Clock;
    auto Now = []() { return Clock::now().time_since_epoch(); };
    if (pi) {
        auto old = *pi;
        printf("int : %d, add1: %d\n", old, ++*pi);
    }
    {
        boost::timer::auto_cpu_timer timer;
        const int ntimes = 1000 * 1000;
        printf("test lock/unlock %d times: ", ntimes);
        RobustMutex mutex;
        auto Lock = [&]() {
            for (int i = 0; i < ntimes; ++i) {
                mutex.lock();
                mutex.unlock();
            }
        };
        std::thread t1(Lock), t2(Lock);
        t1.join();
        t2.join();
    }
    auto MSFromNow = [](const int ms) {
        using namespace boost::posix_time;
        ptime cur = boost::posix_time::microsec_clock::universal_time();
        return cur + millisec(ms);
    };
    auto TryLock = [&]() {
        if (mtx->try_lock()) {
            printf("try_lock ok\n");
            return true;
        } else {
            printf("try_lock failed\n");
            return false;
        }
    };
    auto Unlock = [&]() {
        mtx->unlock();
        printf("unlocked\n");
    };
    if (mtx) {
        printf("mtx exists\n");
        if (TryLock()) {
            auto op = [&]() {
                if (TryLock()) {
                    Unlock();
                }
            };
            op();
            std::thread t(op);
            t.join();
            // Unlock();
        } else {
            // mtx->unlock();
        }
    } else {
        printf("mtx not exists\n");
    }
}
BOOST_AUTO_TEST_CASE(ApiTest)
{
    auto max_time = std::chrono::steady_clock::time_point::max();
utest/robust_test.cpp
@@ -3,8 +3,13 @@
using namespace robust;
enum {
    eLockerBits = 32,
    eLockerMask = MaskBits(sizeof(int) * 8),
};
typedef CircularBuffer<int64_t, Allocator<int64_t>> Rcb;
Rcb *GetRCBImpl(SharedMemory &shm, const int nelem)
Rcb *GetRCB(SharedMemory &shm, const int nelem)
{
    int cap = nelem + 1;
    typedef uint64_t Data;
@@ -15,24 +20,13 @@
    }
    return nullptr;
}
Rcb *GetRCB(SharedMemory &shm, const int nelem)
{
    void **pStore = shm.FindOrCreate<void *>("test_rcb_pointer", nullptr);
    if (pStore) {
        if (!*pStore) {
            *pStore = GetRCBImpl(shm, nelem);
        }
        return (Rcb *) *pStore;
    }
    return nullptr;
}
void MySleep()
{
    std::this_thread::sleep_for(2us);
}
BOOST_AUTO_TEST_CASE(RobustTest)
BOOST_AUTO_TEST_CASE(QueueTest)
{
    SharedMemory &shm = TestShm();
    shm.Remove();
@@ -112,4 +106,88 @@
    threads.WaitAll();
    printf("total: %ld, expected: %ld\n", total.load(), correct_total);
    BOOST_CHECK_EQUAL(total.load(), correct_total);
}
BOOST_AUTO_TEST_CASE(MutexTest)
{
    typedef robust::Mutex RobustMutex;
    for (int i = 0; i < 20; ++i) {
        int size = i;
        int left = size & 7;
        int rsize = size + ((8 - left) & 7);
        printf("size: %3d, rsize: %3d\n", size, rsize);
    }
    SharedMemory &shm = TestShm();
    // shm.Remove();
    // return;
    GlobalInit(shm);
    const std::string mtx_name("test_mutex");
    const std::string int_name("test_int");
    auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name);
    auto pi = shm.FindOrCreate<int>(int_name, 100);
    std::mutex m;
    typedef std::chrono::steady_clock Clock;
    auto Now = []() { return Clock::now().time_since_epoch(); };
    if (pi) {
        auto old = *pi;
        printf("int : %d, add1: %d\n", old, ++*pi);
    }
    {
        boost::timer::auto_cpu_timer timer;
        const int ntimes = 1000 * 1000;
        printf("test lock/unlock %d times: ", ntimes);
        RobustMutex mutex;
        auto Lock = [&]() {
            for (int i = 0; i < ntimes; ++i) {
                mutex.lock();
                mutex.unlock();
            }
        };
        std::thread t1(Lock), t2(Lock);
        t1.join();
        t2.join();
    }
    auto MSFromNow = [](const int ms) {
        using namespace boost::posix_time;
        ptime cur = boost::posix_time::microsec_clock::universal_time();
        return cur + millisec(ms);
    };
    auto TryLock = [&]() {
        if (mtx->try_lock()) {
            printf("try_lock ok\n");
            return true;
        } else {
            printf("try_lock failed\n");
            return false;
        }
    };
    auto Unlock = [&]() {
        mtx->unlock();
        printf("unlocked\n");
    };
    if (mtx) {
        printf("mtx exists\n");
        if (TryLock()) {
            auto op = [&]() {
                if (TryLock()) {
                    Unlock();
                }
            };
            op();
            std::thread t(op);
            t.join();
            // Unlock();
        } else {
            // mtx->unlock();
        }
    } else {
        printf("mtx not exists\n");
    }
}