lichao
2021-05-10 77a6c3512a44dfe6540dde71946e6484fe4f173f
test lock code.
7个文件已修改
119 ■■■■ 已修改文件
src/robust.cpp 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/robust.h 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.cpp 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.h 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/robust_test.cpp 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 65 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/robust.cpp
@@ -35,25 +35,24 @@
bool FMutex::try_lock()
{
    if (flock(fd_, LOCK_EX | LOCK_NB) == 0) {
        if (mtx_.try_lock()) {
    if (mtx_.try_lock()) {
        if (flock(fd_, LOCK_EX | LOCK_NB) == 0) {
            return true;
        } else {
            flock(fd_, LOCK_UN);
            mtx_.unlock();
        }
    }
    return false;
}
void FMutex::lock()
{
    //Note: the lock order affects performance a lot,
    // locking fd_ first is about 100 times faster than locking mtx_ first.
    flock(fd_, LOCK_EX);
    mtx_.lock();
    flock(fd_, LOCK_EX);
}
void FMutex::unlock()
{
    mtx_.unlock();
    flock(fd_, LOCK_UN);
    mtx_.unlock();
}
} // namespace robust
src/robust.h
@@ -86,6 +86,8 @@
class NullMutex
{
public:
    template <class... T>
    explicit NullMutex(T &&...t) {} // easy test.
    bool try_lock() { return true; }
    void lock() {}
    void unlock() {}
@@ -113,8 +115,10 @@
        mkdir(dir.c_str(), 0777);
        return dir + "/fm_" + std::to_string(id);
    }
    static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDWR, 0666); }
    static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDONLY, 0666); }
    static int Close(int fd) { return close(fd); }
    void FLock();
    void FUnlock();
    id_t id_;
    int fd_;
    std::mutex mtx_;
@@ -132,17 +136,11 @@
{
public:
    SemMutex(key_t key) :
        key_(key), sem_id_(semget(key, 1, 0666 | IPC_CREAT))
        key_(key), sem_id_(semget(key, 1, 0666))
    {
        if (sem_id_ == -1) { throw "error create semaphore."; }
        union semun init_val;
        init_val.val = 1;
        semctl(sem_id_, 0, SETVAL, init_val);
    }
    ~SemMutex()
    {
        // semctl(sem_id_, 0, IPC_RMID, semun{});
    }
    ~SemMutex() {}
    bool try_lock()
    {
src/shm_msg_queue.cpp
@@ -65,6 +65,7 @@
    auto pos = imm.find(id);
    if (pos == imm.end()) {
        pos = imm.emplace(id, new Mutex(id)).first;
        // pos = imm.emplace(id, new Mutex()).first;
    }
    return *pos->second;
}
src/shm_msg_queue.h
@@ -32,6 +32,8 @@
    typedef Shmq::Data Queue;
    typedef std::function<void()> OnSend;
    typedef robust::FMutex Mutex;
    // typedef robust::SemMutex Mutex;
    // typedef robust::NullMutex Mutex;
    typedef robust::Guard<Mutex> Guard;
public:
src/shm_queue.h
@@ -54,6 +54,7 @@
    }
    bool TryRead(D &d)
    {
        // bhome_shm::Guard lock(mutex_);
        if (!queue_.empty()) {
            d = queue_.front();
            queue_.pop_front();
@@ -64,6 +65,7 @@
    }
    bool TryWrite(const D &d)
    {
        // bhome_shm::Guard lock(mutex_);
        if (!queue_.full()) {
            queue_.push_back(d);
            return true;
@@ -74,12 +76,16 @@
private:
    Circular<D> queue_;
    bhome_shm::Mutex mutex_;
};
template <int Power = 4>
class SharedQ63
{
public:
    template <class... T>
    explicit SharedQ63(T &&...t) {} // easy testing
    typedef int64_t Data;
    bool Read(Data &d, const int timeout_ms)
    {
utest/robust_test.cpp
@@ -165,6 +165,22 @@
BOOST_AUTO_TEST_CASE(MutexTest)
{
    {
        int fd = open("/tmp/test_fmutex", O_CREAT | O_RDONLY, 0666);
        flock(fd, LOCK_EX);
        printf("lock 1");
        Sleep(10s);
        flock(fd, LOCK_EX);
        printf("lock 2");
        Sleep(10s);
        flock(fd, LOCK_UN);
        printf("un lock 2");
        Sleep(10s);
        flock(fd, LOCK_UN);
        printf("un lock 1");
        return;
    }
    // typedef robust::MFMutex RobustMutex;
    typedef robust::SemMutex RobustMutex;
utest/speed_test.cpp
@@ -15,22 +15,38 @@
 *
 * =====================================================================================
 */
#include "robust.h"
#include "util.h"
using namespace robust;
BOOST_AUTO_TEST_CASE(SpeedTest)
{
    const int mem_size = 1024 * 1024 * 50;
    SharedMemory &shm = TestShm();
    GlobalInit(shm);
    auto InitSem = [](auto id) {
        auto sem_id = semget(id, 1, 0666 | IPC_CREAT);
        union semun init_val;
        init_val.val = 1;
        semctl(sem_id, 0, SETVAL, init_val);
        return;
    };
    MQId id = ShmMsgQueue::NewId();
    const int timeout = 1000;
    const uint32_t data_size = 4000;
    const std::string proc_id = "demo_proc";
    InitSem(id);
    const int timeout = 1000;
    const uint32_t data_size = 1001;
    const std::string proc_id = "demo_proc";
    std::atomic<int64_t> nwrite(0);
    std::atomic<int64_t> nread(0);
    std::string str(data_size, 'a');
    auto Writer = [&](int writer_id, uint64_t n) {
        ShmMsgQueue mq(shm, 64);
        std::string str(data_size, 'a');
        MQId cli_id = ShmMsgQueue::NewId();
        InitSem(cli_id);
        ShmMsgQueue mq(cli_id, shm, 64);
        MsgI msg;
        MsgRequestTopic body;
        body.set_topic("topic");
@@ -42,6 +58,7 @@
        for (uint64_t i = 0; i < n; ++i) {
            while (!mq.TrySend(id, msg)) {}
            ++nwrite;
        }
    };
    auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) {
@@ -54,6 +71,7 @@
            if (mq.TryRecv(msg)) {
                DEFER1(msg.Release());
                tm = now();
                ++nread;
            } else if (isfork) {
                if (now() > tm + 1s) {
                    exit(0); // for forked quit after 1s.
@@ -64,21 +82,24 @@
    auto State = [&](std::atomic<bool> *run) {
        auto init = shm.get_free_memory();
        printf("shm init : %ld\n", init);
        uint64_t last_read = 0;
        while (*run) {
            auto cur = shm.get_free_memory();
            printf("shm used : %8ld/%ld\n", init - cur, init);
            auto cur_read = nread.load();
            printf("shm used : %8ld/%ld, write: %8ld, read: %8ld, speed: %8ld\n", init - cur, init, nwrite.load(), cur_read, cur_read - last_read);
            last_read = cur_read;
            std::this_thread::sleep_for(1s);
        }
    };
    int nwriters[] = {1, 4, 16};
    int nreaders[] = {1, 4};
    int nwriters[] = {1, 10, 100};
    int nreaders[] = {2};
    const int64_t total_msg = 1000 * 100;
    auto Test = [&](auto &www, auto &rrr, bool isfork) {
        for (auto nreader : nreaders) {
            for (auto nwriter : nwriters) {
                const uint64_t nmsg = 1000 * 1000 * 10 / nwriter;
                const uint64_t total_msg = nmsg * nwriter;
                const uint64_t nmsg = total_msg / nwriter;
                std::atomic<bool> run(true);
                std::this_thread::sleep_for(10ms);
                boost::timer::auto_cpu_timer timer;
@@ -100,16 +121,22 @@
    std::atomic<bool> run(true);
    ThreadManager state;
    state.Launch(State, &run);
    DEFER1(run.store(false););
    // typedef ProcessManager Manager;
    // typedef ThreadManager Manager;
    // const bool isfork = IsSameType<Manager, ProcessManager>::value;
    ProcessManager pw, pr;
    printf("================ Testing process io: =======================================================\n");
    Test(pw, pr, true);
    ThreadManager tw, tr;
    printf("---------------- Testing thread io:  -------------------------------------------------------\n");
    Test(tw, tr, false);
    run.store(false);
    {
        ThreadManager tw, tr;
        printf("---------------- Testing thread io:  -------------------------------------------------------\n");
        Test(tw, tr, false);
    }
    {
        ProcessManager pw, pr;
        printf("================ Testing process io: =======================================================\n");
        Test(pw, pr, true);
    }
}
// Send Recv Test
@@ -123,6 +150,8 @@
    const std::string server_proc_id = "server_proc";
    SharedMemory &shm = TestShm();
    // shm.Remove();
    // return;
    GlobalInit(shm);
    auto Avail = [&]() { return shm.get_free_memory(); };