From d33a69463f1a75134d01191be0b9e1bdd757dd4b Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 30 四月 2021 15:27:59 +0800 Subject: [PATCH] add atomic queue, no lock, unorder. --- src/robust.h | 120 +++++++++++++++++++---- utest/robust_test.cpp | 122 +++++++++++++----------- 2 files changed, 163 insertions(+), 79 deletions(-) diff --git a/src/robust.h b/src/robust.h index 983567f..b7459ad 100644 --- a/src/robust.h +++ b/src/robust.h @@ -107,6 +107,7 @@ { public: typedef int locker_t; + enum { eLockerBits = sizeof(locker_t) * 8 }; static locker_t this_locker() { static locker_t val = getpid(); @@ -119,6 +120,7 @@ { public: typedef int locker_t; + enum { eLockerBits = sizeof(locker_t) * 8 }; static locker_t this_locker() { static locker_t val = getpid(); @@ -132,12 +134,29 @@ } }; +class ExpiredLocker +{ +public: + typedef int64_t locker_t; + enum { eLockerBits = 63 }; + static locker_t this_locker() { return Now(); } + static bool is_alive(locker_t locker) + { + return Now() < locker + steady_clock::duration(10s).count(); + } + +private: + static locker_t Now() { return steady_clock::now().time_since_epoch().count(); } +}; + template <class LockerT> class CasMutex { typedef typename LockerT::locker_t locker_t; static inline locker_t this_locker() { return LockerT::this_locker(); } static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); } + static const uint64_t kLockerMask = MaskBits(LockerT::eLockerBits); + static_assert(LockerT::eLockerBits < 64, "locker size must be smaller than 64 bit!"); public: CasMutex() : @@ -152,7 +171,7 @@ } else if (!is_alive(Locker(old))) { r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1; if (r) { - printf("captured pid %d -> %d, r = %d\n", Locker(old), this_locker(), r); + printf("captured locker %ld -> %ld, locker = %d\n", int64_t(Locker(old)), int64_t(this_locker()), r); } } return r; @@ -174,10 +193,9 @@ } private: - static_assert(sizeof(locker_t) < sizeof(uint64_t), "locker size must be smaller than 64 bit!"); std::atomic<uint64_t> meta_; - bool Locked(uint64_t meta) { return (meta >> 63) != 0; } - locker_t Locker(uint64_t meta) { return meta & MaskBits(sizeof(locker_t) * 8); } + bool Locked(uint64_t meta) { return (meta >> 63) == 1; } + locker_t Locker(uint64_t meta) { return meta & kLockerMask; } uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; } bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); } }; @@ -206,7 +224,7 @@ typedef uint64_t meta_type; static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; } static count_type Count(meta_type meta) { return meta >> 32; } - static size_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; } + static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; } public: typedef D Data; @@ -214,41 +232,40 @@ CircularBuffer(const size_type cap) : CircularBuffer(cap, Alloc()) {} CircularBuffer(const size_type cap, Alloc const &al) : - state_(0), capacity_(cap), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(cap)) + capacity_(cap + 1), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(capacity_)) { if (!buf) { throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!"); + } else { + memset(&buf[0], 0, sizeof(D) * capacity_); } } ~CircularBuffer() { al_.deallocate(buf, capacity_); } - size_type size() const { return (capacity_ + tail() - head()) % capacity_; } - bool full() const { return (capacity_ + tail() + 1 - head()) % capacity_ == 0; } - bool empty() const { return head() == tail(); } - bool push_back(Data d) + bool push_back(const Data d) { Guard<Mutex> guard(mutex_); - if (!full()) { - auto old = mtail(); - buf[Pos(old)] = d; + auto old = mtail(); + auto pos = Pos(old); + auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0); + if (!full) { + buf[pos] = d; return mtail_.compare_exchange_strong(old, next(old)); - } else { - return false; } + return false; } bool pop_front(Data &d) { Guard<Mutex> guard(mutex_); - if (!empty()) { - auto old = mhead(); - d = buf[Pos(old)]; + auto old = mhead(); + auto pos = Pos(old); + if (!(pos == tail())) { + d = buf[pos]; return mhead_.compare_exchange_strong(old, next(old)); } else { return false; } } - bool Ready() const { return state_.load() == eStateReady; } - void PutReady() { state_.store(eStateReady); } private: CircularBuffer(const CircularBuffer &); @@ -262,8 +279,6 @@ meta_type mhead() const { return mhead_.load(); } meta_type mtail() const { return mtail_.load(); } // data - enum { eStateReady = 0x19833891 }; - std::atomic<uint32_t> state_; const size_type capacity_; Mutex mutex_; std::atomic<meta_type> mhead_; @@ -272,5 +287,66 @@ typename Alloc::pointer buf = nullptr; }; +template <unsigned PowerSize = 4, class Int = int64_t> +class AtomicQueue +{ +public: + typedef uint32_t size_type; + typedef Int Data; + typedef std::atomic<Data> AData; + static_assert(sizeof(Data) == sizeof(AData)); + enum { + power = PowerSize, + capacity = (1 << power), + mask = capacity - 1, + }; + + AtomicQueue() { memset(this, 0, sizeof(*this)); } + size_type head() const { return head_.load(); } + size_type tail() const { return tail_.load(); } + bool like_empty() const { return head() == tail() && Empty(buf[head()]); } + bool like_full() const { return head() == tail() && !Empty(buf[head()]); } + bool push_back(const Data d, bool try_more = false) + { + bool r = false; + size_type i = 0; + do { + auto pos = tail(); + auto cur = buf[pos].load(); + r = Empty(cur) && buf[pos].compare_exchange_strong(cur, Enc(d)); + tail_.compare_exchange_strong(pos, Next(pos)); + } while (try_more && !r && ++i < capacity); + return r; + } + bool pop_front(Data &d, bool try_more = false) + { + bool r = false; + Data cur; + size_type i = 0; + do { + auto pos = head(); + cur = buf[pos].load(); + r = !Empty(cur) && buf[pos].compare_exchange_strong(cur, 0); + head_.compare_exchange_strong(pos, Next(pos)); + } while (try_more && !r && ++i < capacity); + if (r) { d = Dec(cur); } + return r; + } + +private: + static_assert(std::is_integral<Data>::value, "Data must be integral type!"); + static_assert(std::is_signed<Data>::value, "Data must be signed type!"); + static_assert(PowerSize < 10, "RobustQ63 max size is 2^10!"); + + static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok. + static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok. + static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok. + static size_type Next(const size_type index) { return (index + 1) & mask; } + + std::atomic<size_type> head_; + std::atomic<size_type> tail_; + AData buf[capacity]; +}; + } // namespace robust #endif // end of include guard: ROBUST_Q31RCWYU diff --git a/utest/robust_test.cpp b/utest/robust_test.cpp index 9384c10..7799aad 100644 --- a/utest/robust_test.cpp +++ b/utest/robust_test.cpp @@ -8,52 +8,54 @@ eLockerMask = MaskBits(sizeof(int) * 8), }; -typedef CircularBuffer<int64_t, Allocator<int64_t>> Rcb; -Rcb *GetRCB(SharedMemory &shm, const int nelem) -{ - int cap = nelem + 1; - typedef uint64_t Data; - auto size = sizeof(Rcb) + sizeof(Data) * cap; - void *p = shm.Alloc(size); - if (p) { - return new (p) Rcb(cap, shm.get_segment_manager()); - } - return nullptr; -} - void MySleep() { std::this_thread::sleep_for(2us); } +///////////////////////////////////////////////////////////////////////////////////////// + BOOST_AUTO_TEST_CASE(QueueTest) { + const int nthread = 100; + const uint64_t nmsg = 1000 * 1000 * 10; + SharedMemory &shm = TestShm(); shm.Remove(); - pid_t pid = getpid(); - printf("pid : %d\n", pid); - auto Access = [](pid_t pid) { - char buf[100] = {0}; - sprintf(buf, "/proc/%d/stat", pid); - int r = access(buf, F_OK); - printf("access %d\n", r); - }; - Access(pid); - Access(pid + 1); - // Sleep(10s); - // return; + // return; ///////////////////////////////////////////////// + int64_t i64 = 0; + char c = 0; + for (int i = 0; i < 256; ++i) { + c = i; + i64 = int64_t(c) << 1; + BOOST_CHECK_EQUAL(c, (i64 >> 1)); + uint64_t u64 = i; + BOOST_CHECK_EQUAL((u64 & 255), i); + } - int nelement = 640; - auto rcb = GetRCB(shm, nelement); - BOOST_CHECK(rcb != nullptr); - BOOST_CHECK(rcb->empty()); - BOOST_CHECK(rcb->push_back(1)); - BOOST_CHECK(rcb->size() == 1); +#if 1 + typedef AtomicQueue<3> Rcb; + + Rcb tmp; + BOOST_CHECK(tmp.like_empty()); + BOOST_CHECK(tmp.push_back(1)); + BOOST_CHECK(tmp.tail() == 1); + BOOST_CHECK(tmp.head() == 0); int64_t d; - BOOST_CHECK(rcb->pop_front(d)); - BOOST_CHECK(rcb->empty()); + BOOST_CHECK(tmp.pop_front(d)); + BOOST_CHECK(tmp.like_empty()); + BOOST_CHECK(tmp.head() == 1); + BOOST_CHECK(tmp.tail() == 1); - const uint64_t nmsg = 1000 * 1000 * 1; + ShmObject<Rcb> rcb(shm, "test_rcb"); +#else + typedef Circular<int64_t> Rcb; + ShmObject<Rcb> rcb(shm, "test_rcb", 64, shm.get_segment_manager()); +#endif + + const int nsize = sizeof(Rcb); + + bool try_more = false; uint64_t correct_total = nmsg * (nmsg - 1) / 2; std::atomic<uint64_t> total(0); std::atomic<uint64_t> nwrite(0); @@ -61,7 +63,7 @@ auto Writer = [&]() { uint64_t n = 0; while ((n = nwrite++) < nmsg) { - while (!rcb->push_back(n)) { + while (!rcb->push_back(n, try_more)) { // MySleep(); } ++writedone; @@ -71,11 +73,11 @@ auto Reader = [&]() { while (nread.load() < nmsg) { int64_t d; - if (rcb->pop_front(d)) { + if (rcb->pop_front(d, try_more)) { ++nread; total += d; } else { - MySleep(); + // MySleep(); } } }; @@ -89,21 +91,25 @@ next += 1s; auto w = writedone.load(); auto r = nread.load(); - printf("write: %6ld, spd: %6ld, read: %6ld, spd: %6ld , queue size: %d\n", w, w - lw, r, r - lr, rcb->size()); + printf("write: %6ld, spd: %6ld, read: %6ld, spd: %6ld\n", + w, w - lw, r, r - lr); lw = w; lr = r; } while (nread.load() < nmsg); }; - ThreadManager threads; - boost::timer::auto_cpu_timer timer; - printf("Testing Robust Buffer, msgs %ld, queue size: %d \n", nmsg, nelement); - threads.Launch(status); - for (int i = 0; i < 10; ++i) { - threads.Launch(Reader); - threads.Launch(Writer); + std::thread st(status); + { + ThreadManager threads; + boost::timer::auto_cpu_timer timer; + printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, Rcb::capacity, nthread); + for (int i = 0; i < nthread; ++i) { + threads.Launch(Reader); + threads.Launch(Writer); + } + threads.WaitAll(); } - threads.WaitAll(); + st.join(); printf("total: %ld, expected: %ld\n", total.load(), correct_total); BOOST_CHECK_EQUAL(total.load(), correct_total); } @@ -137,9 +143,7 @@ } { - boost::timer::auto_cpu_timer timer; const int ntimes = 1000 * 1000; - printf("test lock/unlock %d times: ", ntimes); RobustMutex mutex; auto Lock = [&]() { for (int i = 0; i < ntimes; ++i) { @@ -147,16 +151,20 @@ mutex.unlock(); } }; - std::thread t1(Lock), t2(Lock); - t1.join(); - t2.join(); - } - auto MSFromNow = [](const int ms) { - using namespace boost::posix_time; - ptime cur = boost::posix_time::microsec_clock::universal_time(); - return cur + millisec(ms); - }; + { + boost::timer::auto_cpu_timer timer; + printf("test lock/unlock %d times: ", ntimes); + Lock(); + } + { + boost::timer::auto_cpu_timer timer; + printf("test lock/unlock %d times, 2 thread: ", ntimes); + std::thread t1(Lock), t2(Lock); + t1.join(); + t2.join(); + } + } auto TryLock = [&]() { if (mtx->try_lock()) { -- Gitblit v1.8.0