lichao
2021-04-30 d33a69463f1a75134d01191be0b9e1bdd757dd4b
add atomic queue, no lock, unorder.
2个文件已修改
242 ■■■■■ 已修改文件
src/robust.h 120 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/robust_test.cpp 122 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/robust.h
@@ -107,6 +107,7 @@
{
public:
    typedef int locker_t;
    enum { eLockerBits = sizeof(locker_t) * 8 };
    static locker_t this_locker()
    {
        static locker_t val = getpid();
@@ -119,6 +120,7 @@
{
public:
    typedef int locker_t;
    enum { eLockerBits = sizeof(locker_t) * 8 };
    static locker_t this_locker()
    {
        static locker_t val = getpid();
@@ -132,12 +134,29 @@
    }
};
class ExpiredLocker
{
public:
    typedef int64_t locker_t;
    enum { eLockerBits = 63 };
    static locker_t this_locker() { return Now(); }
    static bool is_alive(locker_t locker)
    {
        return Now() < locker + steady_clock::duration(10s).count();
    }
private:
    static locker_t Now() { return steady_clock::now().time_since_epoch().count(); }
};
template <class LockerT>
class CasMutex
{
    typedef typename LockerT::locker_t locker_t;
    static inline locker_t this_locker() { return LockerT::this_locker(); }
    static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); }
    static const uint64_t kLockerMask = MaskBits(LockerT::eLockerBits);
    static_assert(LockerT::eLockerBits < 64, "locker size must be smaller than 64 bit!");
public:
    CasMutex() :
@@ -152,7 +171,7 @@
        } else if (!is_alive(Locker(old))) {
            r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1;
            if (r) {
                printf("captured pid %d -> %d, r = %d\n", Locker(old), this_locker(), r);
                printf("captured locker %ld -> %ld, locker = %d\n", int64_t(Locker(old)), int64_t(this_locker()), r);
            }
        }
        return r;
@@ -174,10 +193,9 @@
    }
private:
    static_assert(sizeof(locker_t) < sizeof(uint64_t), "locker size must be smaller than 64 bit!");
    std::atomic<uint64_t> meta_;
    bool Locked(uint64_t meta) { return (meta >> 63) != 0; }
    locker_t Locker(uint64_t meta) { return meta & MaskBits(sizeof(locker_t) * 8); }
    bool Locked(uint64_t meta) { return (meta >> 63) == 1; }
    locker_t Locker(uint64_t meta) { return meta & kLockerMask; }
    uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; }
    bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
};
@@ -206,7 +224,7 @@
    typedef uint64_t meta_type;
    static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; }
    static count_type Count(meta_type meta) { return meta >> 32; }
    static size_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }
    static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }
public:
    typedef D Data;
@@ -214,41 +232,40 @@
    CircularBuffer(const size_type cap) :
        CircularBuffer(cap, Alloc()) {}
    CircularBuffer(const size_type cap, Alloc const &al) :
        state_(0), capacity_(cap), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(cap))
        capacity_(cap + 1), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(capacity_))
    {
        if (!buf) {
            throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!");
        } else {
            memset(&buf[0], 0, sizeof(D) * capacity_);
        }
    }
    ~CircularBuffer() { al_.deallocate(buf, capacity_); }
    size_type size() const { return (capacity_ + tail() - head()) % capacity_; }
    bool full() const { return (capacity_ + tail() + 1 - head()) % capacity_ == 0; }
    bool empty() const { return head() == tail(); }
    bool push_back(Data d)
    bool push_back(const Data d)
    {
        Guard<Mutex> guard(mutex_);
        if (!full()) {
            auto old = mtail();
            buf[Pos(old)] = d;
        auto old = mtail();
        auto pos = Pos(old);
        auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0);
        if (!full) {
            buf[pos] = d;
            return mtail_.compare_exchange_strong(old, next(old));
        } else {
            return false;
        }
        return false;
    }
    bool pop_front(Data &d)
    {
        Guard<Mutex> guard(mutex_);
        if (!empty()) {
            auto old = mhead();
            d = buf[Pos(old)];
        auto old = mhead();
        auto pos = Pos(old);
        if (!(pos == tail())) {
            d = buf[pos];
            return mhead_.compare_exchange_strong(old, next(old));
        } else {
            return false;
        }
    }
    bool Ready() const { return state_.load() == eStateReady; }
    void PutReady() { state_.store(eStateReady); }
private:
    CircularBuffer(const CircularBuffer &);
@@ -262,8 +279,6 @@
    meta_type mhead() const { return mhead_.load(); }
    meta_type mtail() const { return mtail_.load(); }
    // data
    enum { eStateReady = 0x19833891 };
    std::atomic<uint32_t> state_;
    const size_type capacity_;
    Mutex mutex_;
    std::atomic<meta_type> mhead_;
@@ -272,5 +287,66 @@
    typename Alloc::pointer buf = nullptr;
};
template <unsigned PowerSize = 4, class Int = int64_t>
class AtomicQueue
{
public:
    typedef uint32_t size_type;
    typedef Int Data;
    typedef std::atomic<Data> AData;
    static_assert(sizeof(Data) == sizeof(AData));
    enum {
        power = PowerSize,
        capacity = (1 << power),
        mask = capacity - 1,
    };
    AtomicQueue() { memset(this, 0, sizeof(*this)); }
    size_type head() const { return head_.load(); }
    size_type tail() const { return tail_.load(); }
    bool like_empty() const { return head() == tail() && Empty(buf[head()]); }
    bool like_full() const { return head() == tail() && !Empty(buf[head()]); }
    bool push_back(const Data d, bool try_more = false)
    {
        bool r = false;
        size_type i = 0;
        do {
            auto pos = tail();
            auto cur = buf[pos].load();
            r = Empty(cur) && buf[pos].compare_exchange_strong(cur, Enc(d));
            tail_.compare_exchange_strong(pos, Next(pos));
        } while (try_more && !r && ++i < capacity);
        return r;
    }
    bool pop_front(Data &d, bool try_more = false)
    {
        bool r = false;
        Data cur;
        size_type i = 0;
        do {
            auto pos = head();
            cur = buf[pos].load();
            r = !Empty(cur) && buf[pos].compare_exchange_strong(cur, 0);
            head_.compare_exchange_strong(pos, Next(pos));
        } while (try_more && !r && ++i < capacity);
        if (r) { d = Dec(cur); }
        return r;
    }
private:
    static_assert(std::is_integral<Data>::value, "Data must be integral type!");
    static_assert(std::is_signed<Data>::value, "Data must be signed type!");
    static_assert(PowerSize < 10, "RobustQ63 max size is 2^10!");
    static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
    static inline Data Enc(const Data d) { return (d << 1) | 1; }   // lowest bit 1 means data ok.
    static inline Data Dec(const Data d) { return d >> 1; }         // lowest bit 1 means data ok.
    static size_type Next(const size_type index) { return (index + 1) & mask; }
    std::atomic<size_type> head_;
    std::atomic<size_type> tail_;
    AData buf[capacity];
};
} // namespace robust
#endif // end of include guard: ROBUST_Q31RCWYU
utest/robust_test.cpp
@@ -8,52 +8,54 @@
    eLockerMask = MaskBits(sizeof(int) * 8),
};
typedef CircularBuffer<int64_t, Allocator<int64_t>> Rcb;
Rcb *GetRCB(SharedMemory &shm, const int nelem)
{
    int cap = nelem + 1;
    typedef uint64_t Data;
    auto size = sizeof(Rcb) + sizeof(Data) * cap;
    void *p = shm.Alloc(size);
    if (p) {
        return new (p) Rcb(cap, shm.get_segment_manager());
    }
    return nullptr;
}
void MySleep()
{
    std::this_thread::sleep_for(2us);
}
/////////////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_CASE(QueueTest)
{
    const int nthread = 100;
    const uint64_t nmsg = 1000 * 1000 * 10;
    SharedMemory &shm = TestShm();
    shm.Remove();
    pid_t pid = getpid();
    printf("pid : %d\n", pid);
    auto Access = [](pid_t pid) {
        char buf[100] = {0};
        sprintf(buf, "/proc/%d/stat", pid);
        int r = access(buf, F_OK);
        printf("access %d\n", r);
    };
    Access(pid);
    Access(pid + 1);
    // Sleep(10s);
    // return;
    // return; /////////////////////////////////////////////////
    int64_t i64 = 0;
    char c = 0;
    for (int i = 0; i < 256; ++i) {
        c = i;
        i64 = int64_t(c) << 1;
        BOOST_CHECK_EQUAL(c, (i64 >> 1));
        uint64_t u64 = i;
        BOOST_CHECK_EQUAL((u64 & 255), i);
    }
    int nelement = 640;
    auto rcb = GetRCB(shm, nelement);
    BOOST_CHECK(rcb != nullptr);
    BOOST_CHECK(rcb->empty());
    BOOST_CHECK(rcb->push_back(1));
    BOOST_CHECK(rcb->size() == 1);
#if 1
    typedef AtomicQueue<3> Rcb;
    Rcb tmp;
    BOOST_CHECK(tmp.like_empty());
    BOOST_CHECK(tmp.push_back(1));
    BOOST_CHECK(tmp.tail() == 1);
    BOOST_CHECK(tmp.head() == 0);
    int64_t d;
    BOOST_CHECK(rcb->pop_front(d));
    BOOST_CHECK(rcb->empty());
    BOOST_CHECK(tmp.pop_front(d));
    BOOST_CHECK(tmp.like_empty());
    BOOST_CHECK(tmp.head() == 1);
    BOOST_CHECK(tmp.tail() == 1);
    const uint64_t nmsg = 1000 * 1000 * 1;
    ShmObject<Rcb> rcb(shm, "test_rcb");
#else
    typedef Circular<int64_t> Rcb;
    ShmObject<Rcb> rcb(shm, "test_rcb", 64, shm.get_segment_manager());
#endif
    const int nsize = sizeof(Rcb);
    bool try_more = false;
    uint64_t correct_total = nmsg * (nmsg - 1) / 2;
    std::atomic<uint64_t> total(0);
    std::atomic<uint64_t> nwrite(0);
@@ -61,7 +63,7 @@
    auto Writer = [&]() {
        uint64_t n = 0;
        while ((n = nwrite++) < nmsg) {
            while (!rcb->push_back(n)) {
            while (!rcb->push_back(n, try_more)) {
                // MySleep();
            }
            ++writedone;
@@ -71,11 +73,11 @@
    auto Reader = [&]() {
        while (nread.load() < nmsg) {
            int64_t d;
            if (rcb->pop_front(d)) {
            if (rcb->pop_front(d, try_more)) {
                ++nread;
                total += d;
            } else {
                MySleep();
                // MySleep();
            }
        }
    };
@@ -89,21 +91,25 @@
            next += 1s;
            auto w = writedone.load();
            auto r = nread.load();
            printf("write: %6ld, spd: %6ld,  read: %6ld, spd: %6ld , queue size: %d\n", w, w - lw, r, r - lr, rcb->size());
            printf("write: %6ld, spd: %6ld,  read: %6ld, spd: %6ld\n",
                   w, w - lw, r, r - lr);
            lw = w;
            lr = r;
        } while (nread.load() < nmsg);
    };
    ThreadManager threads;
    boost::timer::auto_cpu_timer timer;
    printf("Testing Robust Buffer, msgs %ld, queue size: %d \n", nmsg, nelement);
    threads.Launch(status);
    for (int i = 0; i < 10; ++i) {
        threads.Launch(Reader);
        threads.Launch(Writer);
    std::thread st(status);
    {
        ThreadManager threads;
        boost::timer::auto_cpu_timer timer;
        printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, Rcb::capacity, nthread);
        for (int i = 0; i < nthread; ++i) {
            threads.Launch(Reader);
            threads.Launch(Writer);
        }
        threads.WaitAll();
    }
    threads.WaitAll();
    st.join();
    printf("total: %ld, expected: %ld\n", total.load(), correct_total);
    BOOST_CHECK_EQUAL(total.load(), correct_total);
}
@@ -137,9 +143,7 @@
    }
    {
        boost::timer::auto_cpu_timer timer;
        const int ntimes = 1000 * 1000;
        printf("test lock/unlock %d times: ", ntimes);
        RobustMutex mutex;
        auto Lock = [&]() {
            for (int i = 0; i < ntimes; ++i) {
@@ -147,16 +151,20 @@
                mutex.unlock();
            }
        };
        std::thread t1(Lock), t2(Lock);
        t1.join();
        t2.join();
    }
    auto MSFromNow = [](const int ms) {
        using namespace boost::posix_time;
        ptime cur = boost::posix_time::microsec_clock::universal_time();
        return cur + millisec(ms);
    };
        {
            boost::timer::auto_cpu_timer timer;
            printf("test lock/unlock %d times: ", ntimes);
            Lock();
        }
        {
            boost::timer::auto_cpu_timer timer;
            printf("test lock/unlock %d times, 2 thread: ", ntimes);
            std::thread t1(Lock), t2(Lock);
            t1.join();
            t2.join();
        }
    }
    auto TryLock = [&]() {
        if (mtx->try_lock()) {