lichao
2021-05-08 28f06bc49a4d8d69f1ea2f767863b7921d12f155
add robust FMutex, works fine; use boost circular.
7个文件已修改
520 ■■■■ 已修改文件
src/robust.cpp 90 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/robust.h 221 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm.h 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.cpp 29 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.h 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/robust_test.cpp 128 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/robust.cpp
@@ -25,87 +25,35 @@
namespace
{
static_assert(sizeof(steady_clock::duration) == sizeof(int64_t));
static_assert(sizeof(RobustReqRep) == 24);
static_assert(sizeof(Mutex) == 8);
static_assert(sizeof(CircularBuffer<int>) == 48);
auto Now() { return steady_clock::now().time_since_epoch(); }
const steady_clock::duration kIoTimeout = 10ms;
const steady_clock::duration kIoExpire = 100ms;
void Yield() { std::this_thread::sleep_for(10us); }
} // namespace
void QuickSleep()
{
    Yield();
}
bool RobustReqRep::StateCas(State exp, State val)
{
    bool r = state_.compare_exchange_strong(exp, val);
    return r ? (timestamp_.store(Now()), true) : false;
}
void QuickSleep() { Yield(); }
int RobustReqRep::ClientReadReply(Msg &reply)
bool FMutex::try_lock()
{
    auto end_time = Now() + kIoTimeout;
    int done = false;
    do {
        if (StateCas(eServerWriteEnd, eClientReadBegin)) {
            Read(reply);
            done = StateCas(eClientReadBegin, eClientReadEnd);
            if (done) { break; }
        }
        Yield();
    } while (Now() < end_time);
    return done ? eSuccess : eTimeout;
}
int RobustReqRep::ClientWriteRequest(const Msg &request)
{
    if (request.size() > capacity_) {
        return eSizeError;
    }
    auto end_time = Now() + kIoTimeout;
    bool done = false;
    do {
        if (StateCas(eStateReady, eClientWriteBegin)) {
            Write(request);
            done = StateCas(eClientWriteBegin, eClientWriteEnd);
            if (done) { break; }
        }
        Yield();
    } while (Now() < end_time);
    return done ? eSuccess : eTimeout;
}
int RobustReqRep::ServerReadRequest(Msg &request)
{
    bool done = false;
    if (StateCas(eClientWriteEnd, eServerReadBegin)) {
        Read(request);
        done = StateCas(eServerReadBegin, eServerReadEnd);
    } else {
        auto old = state_.load();
        if (old != eStateReady && timestamp_.load() + kIoExpire < Now()) {
            StateCas(old, eStateReady);
    if (flock(fd_, LOCK_EX | LOCK_NB) == 0) {
        if (mtx_.try_lock()) {
            return true;
        } else {
            flock(fd_, LOCK_UN);
        }
    }
    return done ? eSuccess : eTimeout;
    return false;
}
int RobustReqRep::ServerWriteReply(const Msg &reply)
void FMutex::lock()
{
    if (reply.size() > capacity_) {
        return eSizeError;
    }
    // no need to loop write, either success or timeout.
    bool done = false;
    if (StateCas(eServerReadEnd, eServerWriteBegin)) {
        Write(reply);
        done = StateCas(eServerWriteBegin, eServerWriteEnd);
    }
    return done ? eSuccess : eTimeout;
    //Note: the lock order affects performance a lot,
    // locking fd_ first is about 100 times faster than locking mtx_ first.
    flock(fd_, LOCK_EX);
    mtx_.lock();
}
void FMutex::unlock()
{
    mtx_.unlock();
    flock(fd_, LOCK_UN);
}
} // namespace robust
src/robust.h
@@ -23,8 +23,12 @@
#include <atomic>
#include <chrono>
#include <memory>
#include <string.h>
#include <mutex>
#include <string>
#include <sys/file.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
@@ -37,143 +41,21 @@
void QuickSleep();
class RobustReqRep
{
    typedef uint32_t State;
    typedef std::string Msg;
    typedef std::chrono::steady_clock::duration Duration;
public:
    enum ErrorCode {
        eSuccess = 0,
        eTimeout = EAGAIN,
        eSizeError = EINVAL,
    };
    explicit RobustReqRep(const uint32_t max_len) :
        capacity_(max_len), state_(eStateInit), timestamp_(Duration(0)), size_(0) {}
    void PutReady() { state_.store(eStateReady); }
    bool Ready() const { return state_.load() == eStateReady; }
    uint32_t capacity() const { return capacity_; }
    int ClientRequest(const Msg &request, Msg &reply)
    {
        int r = ClientWriteRequest(request);
        if (r == eSuccess) {
            r = ClientReadReply(reply);
        }
        return r;
    }
    int ClientReadReply(Msg &reply);
    int ClientWriteRequest(const Msg &request);
    int ServerReadRequest(Msg &request);
    int ServerWriteReply(const Msg &reply);
private:
    RobustReqRep(const RobustReqRep &);
    RobustReqRep(RobustReqRep &&);
    RobustReqRep &operator=(const RobustReqRep &) = delete;
    RobustReqRep &operator=(RobustReqRep &&) = delete;
    enum {
        eStateInit = 0,
        eStateReady = 0x19833891,
        eClientWriteBegin,
        eClientWriteEnd,
        eServerReadBegin,
        eServerReadEnd,
        eServerWriteBegin,
        eServerWriteEnd,
        eClientReadBegin,
        eClientReadEnd = eStateReady,
    };
    bool StateCas(State exp, State val);
    void Write(const Msg &msg)
    {
        size_.store(msg.size());
        memcpy(buf, msg.data(), msg.size());
    }
    void Read(Msg &msg) { msg.assign(buf, size_.load()); }
    const uint32_t capacity_;
    std::atomic<State> state_;
    static_assert(sizeof(State) == sizeof(state_), "atomic should has no extra data.");
    std::atomic<Duration> timestamp_;
    std::atomic<int32_t> size_;
    char buf[4];
};
class PidLocker
{
public:
    typedef int locker_t;
    enum { eLockerBits = sizeof(locker_t) * 8 };
    static locker_t this_locker()
    {
        static locker_t val = getpid();
        return val;
    }
    static bool is_alive(locker_t locker) { return true; }
};
class RobustPidLocker
{
public:
    typedef int locker_t;
    enum { eLockerBits = sizeof(locker_t) * 8 };
    static locker_t this_locker()
    {
        static locker_t val = getpid();
        return val;
    }
    static bool is_alive(locker_t locker)
    {
        char buf[64] = {0};
        snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker);
        return access(buf, F_OK) == 0;
    }
};
class ExpiredLocker
{
public:
    typedef int64_t locker_t;
    enum { eLockerBits = 63 };
    static locker_t this_locker() { return Now(); }
    static bool is_alive(locker_t locker)
    {
        return Now() < locker + steady_clock::duration(10s).count();
    }
private:
    static locker_t Now() { return steady_clock::now().time_since_epoch().count(); }
};
template <class LockerT>
class CasMutex
{
    typedef typename LockerT::locker_t locker_t;
    static inline locker_t this_locker() { return LockerT::this_locker(); }
    static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); }
    static const uint64_t kLockerMask = MaskBits(LockerT::eLockerBits);
    static_assert(LockerT::eLockerBits < 64, "locker size must be smaller than 64 bit!");
    typedef uint64_t locker_t;
    static inline locker_t this_locker() { return pthread_self(); }
    static const uint64_t kLockerMask = MaskBits(63);
public:
    CasMutex() :
        meta_(0) {}
    int try_lock()
    {
        const auto t = steady_clock::now().time_since_epoch().count();
        auto old = meta_.load();
        int r = 0;
        if (!Locked(old)) {
            r = MetaCas(old, Meta(1, this_locker()));
        } else if (!is_alive(Locker(old))) {
            r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1;
            if (r) {
                LOG_DEBUG() << "captured locker " << int64_t(Locker(old)) << " -> " << int64_t(this_locker()) << ", locker = " << r;
            }
        }
        return r;
    }
@@ -201,7 +83,89 @@
    bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
};
typedef CasMutex<RobustPidLocker> Mutex;
class NullMutex
{
public:
    bool try_lock() { return true; }
    void lock() {}
    void unlock() {}
};
// flock + mutex
class FMutex
{
public:
    typedef uint64_t id_t;
    FMutex(id_t id) :
        id_(id), fd_(Open(id_))
    {
        if (fd_ == -1) { throw "error create mutex!"; }
    }
    ~FMutex() { Close(fd_); }
    bool try_lock();
    void lock();
    void unlock();
private:
    static std::string GetPath(id_t id)
    {
        const std::string dir("/tmp/.bhome_mtx");
        mkdir(dir.c_str(), 0777);
        return dir + "/fm_" + std::to_string(id);
    }
    static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDWR, 0666); }
    static int Close(int fd) { return close(fd); }
    id_t id_;
    int fd_;
    std::mutex mtx_;
};
union semun {
    int val;               /* Value for SETVAL */
    struct semid_ds *buf;  /* Buffer for IPC_STAT, IPC_SET */
    unsigned short *array; /* Array for GETALL, SETALL */
    struct seminfo *__buf; /* Buffer for IPC_INFO
                                           (Linux-specific) */
};
class SemMutex
{
public:
    SemMutex(key_t key) :
        key_(key), sem_id_(semget(key, 1, 0666 | IPC_CREAT))
    {
        if (sem_id_ == -1) { throw "error create semaphore."; }
        union semun init_val;
        init_val.val = 1;
        semctl(sem_id_, 0, SETVAL, init_val);
    }
    ~SemMutex()
    {
        // semctl(sem_id_, 0, IPC_RMID, semun{});
    }
    bool try_lock()
    {
        sembuf op = {0, -1, SEM_UNDO | IPC_NOWAIT};
        return semop(sem_id_, &op, 1) == 0;
    }
    void lock()
    {
        sembuf op = {0, -1, SEM_UNDO};
        semop(sem_id_, &op, 1);
    }
    void unlock()
    {
        sembuf op = {0, 1, SEM_UNDO};
        semop(sem_id_, &op, 1);
    }
private:
    key_t key_;
    int sem_id_;
};
template <class Lock>
class Guard
@@ -245,7 +209,6 @@
    bool push_back(const Data d)
    {
        Guard<Mutex> guard(mutex_);
        auto old = mtail();
        auto pos = Pos(old);
        auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0);
@@ -257,7 +220,6 @@
    }
    bool pop_front(Data &d)
    {
        Guard<Mutex> guard(mutex_);
        auto old = mhead();
        auto pos = Pos(old);
        if (!(pos == tail())) {
@@ -281,7 +243,6 @@
    meta_type mtail() const { return mtail_.load(); }
    // data
    const size_type capacity_;
    Mutex mutex_;
    std::atomic<meta_type> mhead_;
    std::atomic<meta_type> mtail_;
    Alloc al_;
src/shm.h
@@ -19,7 +19,7 @@
#ifndef SHM_6CHO6D6C
#define SHM_6CHO6D6C
#include "robust.h"
#include "log.h"
#include <atomic>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
@@ -90,8 +90,9 @@
    }
};
typedef robust::Mutex Mutex;
typedef robust::Guard<Mutex> Guard;
typedef interprocess_mutex Mutex;
typedef scoped_lock<Mutex> Guard;
// typedef robust::Guard<Mutex> Guard;
class SharedMemory : public mshm_t
{
src/shm_msg_queue.cpp
@@ -39,13 +39,13 @@
ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
    id_(id),
    queue_(segment, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager())
    queue_(segment, MsgQIdToName(id_), len, segment.get_segment_manager())
{
}
ShmMsgQueue::ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len) :
    id_(id),
    queue_(segment, create_or_else_find, MsgQIdToName(id_))
    queue_(segment, create_or_else_find, MsgQIdToName(id_), len, segment.get_segment_manager())
{
    if (!queue_.IsOk()) {
        throw("error create/find msgq " + std::to_string(id_));
@@ -56,6 +56,18 @@
ShmMsgQueue::~ShmMsgQueue() {}
ShmMsgQueue::Mutex &ShmMsgQueue::GetMutex(const MQId id)
{
    static std::unordered_map<MQId, std::shared_ptr<Mutex>> imm;
    static std::mutex mtx;
    std::lock_guard<std::mutex> lock(mtx);
    auto pos = imm.find(id);
    if (pos == imm.end()) {
        pos = imm.emplace(id, new Mutex(id)).first;
    }
    return *pos->second;
}
bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id)
{
    Queue *q = Find(shm, id);
@@ -75,14 +87,15 @@
bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg)
{
    Queue *remote = Find(shm, remote_id);
    bool r = false;
    if (remote) {
    try {
        ShmMsgQueue dest(remote_id, false, shm, 1);
        msg.AddRef();
        r = remote->TryWrite(msg.Offset());
        if (!r) {
            msg.Release();
        }
        DEFER1(if (!r) { msg.Release(); });
        Guard lock(GetMutex(remote_id));
        r = dest.queue().TryWrite(msg.Offset());
    } catch (...) {
    }
    return r;
}
src/shm_msg_queue.h
@@ -26,11 +26,13 @@
class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
{
    typedef ShmObject<SharedQ63<4>> Shmq;
    // typedef ShmObject<SharedQueue<int64_t>> Shmq;
    // typedef ShmObject<SharedQ63<4>> Shmq;
    typedef ShmObject<SharedQueue<int64_t>> Shmq;
    typedef Shmq::ShmType ShmType;
    typedef Shmq::Data Queue;
    typedef std::function<void()> OnSend;
    typedef robust::FMutex Mutex;
    typedef robust::Guard<Mutex> Guard;
public:
    typedef uint64_t MQId;
@@ -45,13 +47,22 @@
    MQId Id() const { return id_; }
    ShmType &shm() const { return queue_.shm(); }
    bool Recv(MsgI &msg, const int timeout_ms) { return queue().Read(msg.OffsetRef(), timeout_ms); }
    bool TryRecv(MsgI &msg) { return queue().TryRead(msg.OffsetRef()); }
    bool Recv(MsgI &msg, const int timeout_ms)
    {
        Guard lock(GetMutex(Id()));
        return queue().Read(msg.OffsetRef(), timeout_ms);
    }
    bool TryRecv(MsgI &msg)
    {
        Guard lock(GetMutex(Id()));
        return queue().TryRead(msg.OffsetRef());
    }
    static Queue *Find(SharedMemory &shm, const MQId remote_id);
    static bool TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg);
    bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); }
private:
    static Mutex &GetMutex(const MQId id);
    MQId id_;
    Queue &queue() { return *queue_.data(); }
    Shmq queue_;
src/shm_queue.h
@@ -19,15 +19,18 @@
#ifndef SHM_QUEUE_JE0OEUP3
#define SHM_QUEUE_JE0OEUP3
#include "robust.h"
#include "shm.h"
#include <atomic>
#include <boost/circular_buffer.hpp>
#include <chrono>
namespace bhome_shm
{
template <class D>
using Circular = robust::CircularBuffer<D, Allocator<D>>;
using Circular = boost::circular_buffer<D, Allocator<D>>;
// using Circular = robust::CircularBuffer<D, Allocator<D>>;
template <class D>
class SharedQueue
@@ -49,8 +52,25 @@
        } while (steady_clock::now() < end_time);
        return false;
    }
    bool TryRead(D &d) { return queue_.pop_front(d); }
    bool TryWrite(const D &d) { return queue_.push_back(d); }
    bool TryRead(D &d)
    {
        if (!queue_.empty()) {
            d = queue_.front();
            queue_.pop_front();
            return true;
        } else {
            return false;
        }
    }
    bool TryWrite(const D &d)
    {
        if (!queue_.full()) {
            queue_.push_back(d);
            return true;
        } else {
            return false;
        }
    }
private:
    Circular<D> queue_;
utest/robust_test.cpp
@@ -1,5 +1,6 @@
#include "robust.h"
#include "util.h"
#include <boost/circular_buffer.hpp>
using namespace robust;
@@ -18,7 +19,7 @@
BOOST_AUTO_TEST_CASE(QueueTest)
{
    const int nthread = 100;
    const uint64_t nmsg = 1000 * 1000 * 100;
    const uint64_t nmsg = 1000 * 1000 * 10;
    SharedMemory &shm = TestShm();
    shm.Remove();
@@ -33,7 +34,12 @@
        BOOST_CHECK_EQUAL((u64 & 255), i);
    }
#if 1
    uint64_t correct_total = nmsg * (nmsg - 1) / 2;
    std::atomic<uint64_t> total(0);
    std::atomic<uint64_t> nwrite(0);
    std::atomic<uint64_t> writedone(0);
#if 0
    typedef AtomicQueue<4> Rcb;
    Rcb tmp;
@@ -48,18 +54,8 @@
    BOOST_CHECK(tmp.tail() == 1);
    ShmObject<Rcb> rcb(shm, "test_rcb");
#else
    typedef Circular<int64_t> Rcb;
    ShmObject<Rcb> rcb(shm, "test_rcb", 64, shm.get_segment_manager());
#endif
    bool try_more = true;
    const int nsize = sizeof(Rcb);
    bool try_more = false;
    uint64_t correct_total = nmsg * (nmsg - 1) / 2;
    std::atomic<uint64_t> total(0);
    std::atomic<uint64_t> nwrite(0);
    std::atomic<uint64_t> writedone(0);
    auto Writer = [&]() {
        uint64_t n = 0;
        while ((n = nwrite++) < nmsg) {
@@ -82,6 +78,58 @@
        }
    };
#else
    typedef Circular<int64_t> Rcb;
    ShmObject<Rcb> rcb(shm, "test_rcb", 16, shm.get_segment_manager());
    typedef FMutex Mutex;
    // typedef SemMutex Mutex;
    Mutex mtx(123);
    auto Writer = [&]() {
        uint64_t n = 0;
        while ((n = nwrite++) < nmsg) {
            auto Write = [&]() {
                robust::Guard<Mutex> lk(mtx);
                if (rcb->full()) {
                    return false;
                } else {
                    rcb->push_back(n);
                    return true;
                }
                // return rcb->push_back(n);
            };
            while (!Write()) {
                // MySleep();
            }
            ++writedone;
        }
    };
    std::atomic<uint64_t> nread(0);
    auto Reader = [&]() {
        while (nread.load() < nmsg) {
            int64_t d;
            auto Read = [&]() {
                robust::Guard<Mutex> lk(mtx);
                if (rcb->empty()) {
                    return false;
                } else {
                    d = rcb->front();
                    rcb->pop_front();
                    return true;
                }
                // return rcb->pop_front(d);
            };
            if (Read()) {
                ++nread;
                total += d;
            } else {
                // MySleep();
            }
        }
    };
#endif
    auto status = [&]() {
        auto next = steady_clock::now();
        uint32_t lw = 0;
@@ -102,7 +150,8 @@
    {
        ThreadManager threads;
        boost::timer::auto_cpu_timer timer;
        printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, Rcb::capacity, nthread);
        // printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, Rcb::capacity, nthread);
        printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, 16, nthread);
        for (int i = 0; i < nthread; ++i) {
            threads.Launch(Reader);
            threads.Launch(Writer);
@@ -116,7 +165,8 @@
BOOST_AUTO_TEST_CASE(MutexTest)
{
    typedef robust::Mutex RobustMutex;
    // typedef robust::MFMutex RobustMutex;
    typedef robust::SemMutex RobustMutex;
    for (int i = 0; i < 20; ++i) {
        int size = i;
@@ -131,7 +181,9 @@
    const std::string mtx_name("test_mutex");
    const std::string int_name("test_int");
    auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name);
    // auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name, 12345);
    RobustMutex rmtx(12345);
    auto mtx = &rmtx;
    auto pi = shm.FindOrCreate<int>(int_name, 100);
    std::mutex m;
@@ -142,29 +194,48 @@
        printf("int : %d, add1: %d\n", old, ++*pi);
    }
    {
        const int ntimes = 1000 * 1000;
        RobustMutex mutex;
    auto LockSpeed = [](auto &mutex, const std::string &name) {
        const int ntimes = 1000 * 1;
        auto Lock = [&]() {
            for (int i = 0; i < ntimes; ++i) {
                mutex.lock();
                mutex.unlock();
            }
        };
        printf("\nTesting %s lock/unlock %d times\n", name.c_str(), ntimes);
        {
            boost::timer::auto_cpu_timer timer;
            printf("test lock/unlock %d times: ", ntimes);
            printf("1 thread: ");
            Lock();
        }
        {
        auto InThread = [&](int nthread) {
            boost::timer::auto_cpu_timer timer;
            printf("test lock/unlock %d times, 2 thread: ", ntimes);
            std::thread t1(Lock), t2(Lock);
            t1.join();
            t2.join();
        }
    }
            printf("%d threads: ", nthread);
            std::vector<std::thread> vt;
            for (int i = 0; i < nthread; ++i) {
                vt.emplace_back(Lock);
            }
            for (auto &t : vt) {
                t.join();
            }
        };
        InThread(4);
        InThread(16);
        InThread(100);
        InThread(1000);
    };
    NullMutex null_mtx;
    std::mutex std_mtx;
    CasMutex cas_mtx;
    FMutex mfmtx(3);
    boost::interprocess::interprocess_mutex ipc_mutex;
    SemMutex sem_mtx(3);
    LockSpeed(null_mtx, "null mutex");
    LockSpeed(std_mtx, "std::mutex");
    // LockSpeed(cas_mtx, "CAS mutex");
    LockSpeed(ipc_mutex, "boost ipc mutex");
    LockSpeed(mfmtx, "mutex+flock");
    LockSpeed(sem_mtx, "sem mutex");
    auto TryLock = [&]() {
        if (mtx->try_lock()) {
@@ -183,6 +254,7 @@
    if (mtx) {
        printf("mtx exists\n");
        if (TryLock()) {
            // Sleep(10s);
            auto op = [&]() {
                if (TryLock()) {
                    Unlock();