lichao
2021-04-22 1d6c040dcb9a01648edc66d8c0006c8c9294a705
add mutex timeout limit; use atomic as refcount.
2个文件已删除
5个文件已修改
297 ■■■■■ 已修改文件
src/bh_util.h 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h 27 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm.cpp 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm.h 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 102 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/lock_free_queue.cpp 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/lock_free_queue.h 57 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_util.h
@@ -123,13 +123,13 @@
    D &operator*() const { return *p_; }
};
template <class T, class Mutex = std::mutex, class Lock = std::unique_lock<Mutex>>
template <class T, class TMutex = std::mutex, class Lock = std::unique_lock<TMutex>>
class Synced
{
    typedef T Data;
    Mutex mutex_;
    TMutex mutex_;
    Data data_;
    typedef SyncedPtr<Data, Mutex, Lock> Ptr;
    typedef SyncedPtr<Data, TMutex, Lock> Ptr;
public:
    template <class... P>
src/msg.h
@@ -21,6 +21,7 @@
#include "bh_util.h"
#include "proto.h"
#include "shm.h"
#include <atomic>
#include <boost/interprocess/offset_ptr.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <functional>
@@ -38,26 +39,14 @@
// store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object.
class RefCount : private boost::noncopyable
{
public:
    int Inc()
    {
        Guard lk(mutex_);
        return ++num_;
    }
    int Dec()
    {
        Guard lk(mutex_);
        return --num_;
    }
    int Get()
    {
        Guard lk(mutex_);
        return num_;
    }
    std::atomic<int> num_;
private:
    Mutex mutex_;
    int num_ = 1;
public:
    RefCount() :
        num_(1) { static_assert(std::is_pod<decltype(num_)>::value); }
    int Inc() { return ++num_; }
    int Dec() { return --num_; }
    int Get() { return num_.load(); }
};
// message content layout: header_size + header + data_size + data
src/shm.cpp
@@ -21,6 +21,32 @@
namespace bhome_shm
{
bool MutexWithTimeLimit::try_lock()
{
    if (mutex_.try_lock()) {
        auto old_time = last_lock_time_.load();
        if (Now() - old_time > limit_) {
            return last_lock_time_.compare_exchange_strong(old_time, Now());
        } else {
            last_lock_time_.store(Now());
            return true;
        }
    } else {
        auto old_time = last_lock_time_.load();
        if (Now() - old_time > limit_) {
            return last_lock_time_.compare_exchange_strong(old_time, Now());
        } else {
            return false;
        }
    }
}
void MutexWithTimeLimit::lock()
{
    while (!try_lock()) {
        std::this_thread::yield();
    }
}
SharedMemory::SharedMemory(const std::string &name, const uint64_t size) :
    mshm_t(open_or_create, name.c_str(), size, 0, AllowAll()),
    name_(name)
src/shm.h
@@ -19,12 +19,15 @@
#ifndef SHM_6CHO6D6C
#define SHM_6CHO6D6C
#include <atomic>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/noncopyable.hpp>
#include <boost/uuid/uuid.hpp>
#include <chrono>
#include <thread>
namespace bhome_shm
{
@@ -32,7 +35,52 @@
using namespace boost::interprocess;
typedef managed_shared_memory mshm_t;
typedef interprocess_mutex Mutex;
class CasMutex
{
    std::atomic<bool> flag_;
    bool cas(bool expected, bool new_val) { return flag_.compare_exchange_strong(expected, new_val); }
public:
    CasMutex() :
        flag_(false) {}
    bool try_lock() { return cas(false, true); }
    void lock()
    {
        while (!try_lock()) { std::this_thread::yield(); }
    }
    void unlock() { cas(true, false); }
};
class MutexWithTimeLimit
{
    typedef boost::interprocess::interprocess_mutex MutexT;
    // typedef CasMutex MutexT;
    typedef std::chrono::steady_clock Clock;
    typedef Clock::duration Duration;
    static Duration Now() { return Clock::now().time_since_epoch(); }
    const Duration limit_;
    std::atomic<Duration> last_lock_time_;
    MutexT mutex_;
public:
    typedef MutexT::internal_mutex_type internal_mutex_type;
    const internal_mutex_type &internal_mutex() const { return mutex_.internal_mutex(); }
    internal_mutex_type &internal_mutex() { return mutex_.internal_mutex(); }
    explicit MutexWithTimeLimit(Duration limit) :
        limit_(limit) {}
    MutexWithTimeLimit() :
        MutexWithTimeLimit(std::chrono::seconds(1)) {}
    ~MutexWithTimeLimit() { static_assert(std::is_pod<Duration>::value); }
    bool try_lock();
    void lock();
    void unlock() { mutex_.unlock(); }
};
// typedef boost::interprocess::interprocess_mutex Mutex;
typedef MutexWithTimeLimit Mutex;
typedef scoped_lock<Mutex> Guard;
typedef interprocess_condition Cond;
utest/api_test.cpp
@@ -96,6 +96,63 @@
    // printf("client Recv reply : %s\n", reply.data().c_str());
}
class TLMutex
{
    // typedef boost::interprocess::interprocess_mutex MutexT;
    typedef CasMutex MutexT;
    // typedef std::mutex MutexT;
    typedef std::chrono::steady_clock Clock;
    typedef Clock::duration Duration;
    static Duration Now() { return Clock::now().time_since_epoch(); }
    const Duration limit_;
    std::atomic<Duration> last_lock_time_;
    MutexT mutex_;
public:
    struct Status {
        int64_t nlock_ = 0;
        int64_t nupdate_time_fail = 0;
        int64_t nfail = 0;
        int64_t nexcept = 0;
    };
    Status st_;
    explicit TLMutex(Duration limit) :
        limit_(limit) {}
    TLMutex() :
        TLMutex(std::chrono::seconds(1)) {}
    ~TLMutex() { static_assert(std::is_pod<Duration>::value); }
    bool try_lock()
    {
        if (mutex_.try_lock()) {
            auto old_time = last_lock_time_.load();
            if (Now() - old_time > limit_) {
                return last_lock_time_.compare_exchange_strong(old_time, Now());
            } else {
                last_lock_time_.store(Now());
                return true;
            }
        } else {
            auto old_time = last_lock_time_.load();
            if (Now() - old_time > limit_) {
                return last_lock_time_.compare_exchange_strong(old_time, Now());
            } else {
                return false;
            }
        }
    }
    void lock()
    {
        int n = 0;
        while (!try_lock()) {
            n++;
            std::this_thread::yield();
        }
        st_.nlock_ += n;
    }
    void unlock() { mutex_.unlock(); }
};
BOOST_AUTO_TEST_CASE(MutexTest)
{
    const std::string shm_name("ShmMutex");
@@ -104,12 +161,42 @@
    const std::string mtx_name("test_mutex");
    const std::string int_name("test_int");
    auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())();
    auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())(3s);
    auto pi = shm.find_or_construct<int>(int_name.c_str())(100);
    typedef std::chrono::steady_clock Clock;
    auto Now = []() { return Clock::now().time_since_epoch(); };
    if (pi) {
        auto old = *pi;
        printf("int : %d, add1: %d\n", old, ++*pi);
    }
    {
        boost::timer::auto_cpu_timer timer;
        printf("test time: ");
        TLMutex mutex;
        // CasMutex mutex;
        auto Lock = [&]() {
            for (int i = 0; i < 1000 * 100; ++i) {
                mutex.lock();
                mutex.unlock();
            }
        };
        std::thread t1(Lock), t2(Lock);
        t1.join();
        t2.join();
        printf("mutex nlock: %ld, update time error: %ld, normal fail: %ld, error wait: %ld\n",
               mutex.st_.nlock_,
               mutex.st_.nupdate_time_fail,
               mutex.st_.nfail,
               mutex.st_.nexcept);
    }
    auto MSFromNow = [](const int ms) {
        using namespace boost::posix_time;
        ptime cur = boost::posix_time::microsec_clock::universal_time();
        return cur + millisec(ms);
    };
    auto TryLock = [&]() {
        if (mtx->try_lock()) {
@@ -128,10 +215,17 @@
    if (mtx) {
        printf("mtx exists\n");
        if (TryLock()) {
            if (TryLock()) {
                Unlock();
            }
            auto op = [&]() {
                if (TryLock()) {
                    Unlock();
                }
            };
            op();
            std::thread t(op);
            t.join();
            // Unlock();
        } else {
            // mtx->unlock();
        }
    } else {
        printf("mtx not exists\n");
utest/lock_free_queue.cpp
File was deleted
utest/lock_free_queue.h
File was deleted