From d33a69463f1a75134d01191be0b9e1bdd757dd4b Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 30 四月 2021 15:27:59 +0800
Subject: [PATCH] add atomic queue, no lock, unorder.
---
src/robust.h | 120 +++++++++++++++++++----
utest/robust_test.cpp | 122 +++++++++++++-----------
2 files changed, 163 insertions(+), 79 deletions(-)
diff --git a/src/robust.h b/src/robust.h
index 983567f..b7459ad 100644
--- a/src/robust.h
+++ b/src/robust.h
@@ -107,6 +107,7 @@
{
public:
typedef int locker_t;
+ enum { eLockerBits = sizeof(locker_t) * 8 };
static locker_t this_locker()
{
static locker_t val = getpid();
@@ -119,6 +120,7 @@
{
public:
typedef int locker_t;
+ enum { eLockerBits = sizeof(locker_t) * 8 };
static locker_t this_locker()
{
static locker_t val = getpid();
@@ -132,12 +134,29 @@
}
};
+class ExpiredLocker
+{
+public:
+ typedef int64_t locker_t;
+ enum { eLockerBits = 63 };
+ static locker_t this_locker() { return Now(); }
+ static bool is_alive(locker_t locker)
+ {
+ return Now() < locker + steady_clock::duration(10s).count();
+ }
+
+private:
+ static locker_t Now() { return steady_clock::now().time_since_epoch().count(); }
+};
+
template <class LockerT>
class CasMutex
{
typedef typename LockerT::locker_t locker_t;
static inline locker_t this_locker() { return LockerT::this_locker(); }
static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); }
+ static const uint64_t kLockerMask = MaskBits(LockerT::eLockerBits);
+ static_assert(LockerT::eLockerBits < 64, "locker size must be smaller than 64 bit!");
public:
CasMutex() :
@@ -152,7 +171,7 @@
} else if (!is_alive(Locker(old))) {
r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1;
if (r) {
- printf("captured pid %d -> %d, r = %d\n", Locker(old), this_locker(), r);
+ printf("captured locker %ld -> %ld, locker = %d\n", int64_t(Locker(old)), int64_t(this_locker()), r);
}
}
return r;
@@ -174,10 +193,9 @@
}
private:
- static_assert(sizeof(locker_t) < sizeof(uint64_t), "locker size must be smaller than 64 bit!");
std::atomic<uint64_t> meta_;
- bool Locked(uint64_t meta) { return (meta >> 63) != 0; }
- locker_t Locker(uint64_t meta) { return meta & MaskBits(sizeof(locker_t) * 8); }
+ bool Locked(uint64_t meta) { return (meta >> 63) == 1; }
+ locker_t Locker(uint64_t meta) { return meta & kLockerMask; }
uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; }
bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
};
@@ -206,7 +224,7 @@
typedef uint64_t meta_type;
static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; }
static count_type Count(meta_type meta) { return meta >> 32; }
- static size_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }
+ static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }
public:
typedef D Data;
@@ -214,41 +232,40 @@
CircularBuffer(const size_type cap) :
CircularBuffer(cap, Alloc()) {}
CircularBuffer(const size_type cap, Alloc const &al) :
- state_(0), capacity_(cap), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(cap))
+ capacity_(cap + 1), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(capacity_))
{
if (!buf) {
throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!");
+ } else {
+ memset(&buf[0], 0, sizeof(D) * capacity_);
}
}
~CircularBuffer() { al_.deallocate(buf, capacity_); }
- size_type size() const { return (capacity_ + tail() - head()) % capacity_; }
- bool full() const { return (capacity_ + tail() + 1 - head()) % capacity_ == 0; }
- bool empty() const { return head() == tail(); }
- bool push_back(Data d)
+ bool push_back(const Data d)
{
Guard<Mutex> guard(mutex_);
- if (!full()) {
- auto old = mtail();
- buf[Pos(old)] = d;
+ auto old = mtail();
+ auto pos = Pos(old);
+ auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0);
+ if (!full) {
+ buf[pos] = d;
return mtail_.compare_exchange_strong(old, next(old));
- } else {
- return false;
}
+ return false;
}
bool pop_front(Data &d)
{
Guard<Mutex> guard(mutex_);
- if (!empty()) {
- auto old = mhead();
- d = buf[Pos(old)];
+ auto old = mhead();
+ auto pos = Pos(old);
+ if (!(pos == tail())) {
+ d = buf[pos];
return mhead_.compare_exchange_strong(old, next(old));
} else {
return false;
}
}
- bool Ready() const { return state_.load() == eStateReady; }
- void PutReady() { state_.store(eStateReady); }
private:
CircularBuffer(const CircularBuffer &);
@@ -262,8 +279,6 @@
meta_type mhead() const { return mhead_.load(); }
meta_type mtail() const { return mtail_.load(); }
// data
- enum { eStateReady = 0x19833891 };
- std::atomic<uint32_t> state_;
const size_type capacity_;
Mutex mutex_;
std::atomic<meta_type> mhead_;
@@ -272,5 +287,66 @@
typename Alloc::pointer buf = nullptr;
};
+template <unsigned PowerSize = 4, class Int = int64_t>
+class AtomicQueue
+{
+public:
+ typedef uint32_t size_type;
+ typedef Int Data;
+ typedef std::atomic<Data> AData;
+ static_assert(sizeof(Data) == sizeof(AData));
+ enum {
+ power = PowerSize,
+ capacity = (1 << power),
+ mask = capacity - 1,
+ };
+
+ AtomicQueue() { memset(this, 0, sizeof(*this)); }
+ size_type head() const { return head_.load(); }
+ size_type tail() const { return tail_.load(); }
+ bool like_empty() const { return head() == tail() && Empty(buf[head()]); }
+ bool like_full() const { return head() == tail() && !Empty(buf[head()]); }
+ bool push_back(const Data d, bool try_more = false)
+ {
+ bool r = false;
+ size_type i = 0;
+ do {
+ auto pos = tail();
+ auto cur = buf[pos].load();
+ r = Empty(cur) && buf[pos].compare_exchange_strong(cur, Enc(d));
+ tail_.compare_exchange_strong(pos, Next(pos));
+ } while (try_more && !r && ++i < capacity);
+ return r;
+ }
+ bool pop_front(Data &d, bool try_more = false)
+ {
+ bool r = false;
+ Data cur;
+ size_type i = 0;
+ do {
+ auto pos = head();
+ cur = buf[pos].load();
+ r = !Empty(cur) && buf[pos].compare_exchange_strong(cur, 0);
+ head_.compare_exchange_strong(pos, Next(pos));
+ } while (try_more && !r && ++i < capacity);
+ if (r) { d = Dec(cur); }
+ return r;
+ }
+
+private:
+ static_assert(std::is_integral<Data>::value, "Data must be integral type!");
+ static_assert(std::is_signed<Data>::value, "Data must be signed type!");
+ static_assert(PowerSize < 10, "RobustQ63 max size is 2^10!");
+
+ static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
+ static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok.
+ static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok.
+ static size_type Next(const size_type index) { return (index + 1) & mask; }
+
+ std::atomic<size_type> head_;
+ std::atomic<size_type> tail_;
+ AData buf[capacity];
+};
+
} // namespace robust
#endif // end of include guard: ROBUST_Q31RCWYU
diff --git a/utest/robust_test.cpp b/utest/robust_test.cpp
index 9384c10..7799aad 100644
--- a/utest/robust_test.cpp
+++ b/utest/robust_test.cpp
@@ -8,52 +8,54 @@
eLockerMask = MaskBits(sizeof(int) * 8),
};
-typedef CircularBuffer<int64_t, Allocator<int64_t>> Rcb;
-Rcb *GetRCB(SharedMemory &shm, const int nelem)
-{
- int cap = nelem + 1;
- typedef uint64_t Data;
- auto size = sizeof(Rcb) + sizeof(Data) * cap;
- void *p = shm.Alloc(size);
- if (p) {
- return new (p) Rcb(cap, shm.get_segment_manager());
- }
- return nullptr;
-}
-
void MySleep()
{
std::this_thread::sleep_for(2us);
}
+/////////////////////////////////////////////////////////////////////////////////////////
+
BOOST_AUTO_TEST_CASE(QueueTest)
{
+ const int nthread = 100;
+ const uint64_t nmsg = 1000 * 1000 * 10;
+
SharedMemory &shm = TestShm();
shm.Remove();
- pid_t pid = getpid();
- printf("pid : %d\n", pid);
- auto Access = [](pid_t pid) {
- char buf[100] = {0};
- sprintf(buf, "/proc/%d/stat", pid);
- int r = access(buf, F_OK);
- printf("access %d\n", r);
- };
- Access(pid);
- Access(pid + 1);
- // Sleep(10s);
- // return;
+ // return; /////////////////////////////////////////////////
+ int64_t i64 = 0;
+ char c = 0;
+ for (int i = 0; i < 256; ++i) {
+ c = i;
+ i64 = int64_t(c) << 1;
+ BOOST_CHECK_EQUAL(c, (i64 >> 1));
+ uint64_t u64 = i;
+ BOOST_CHECK_EQUAL((u64 & 255), i);
+ }
- int nelement = 640;
- auto rcb = GetRCB(shm, nelement);
- BOOST_CHECK(rcb != nullptr);
- BOOST_CHECK(rcb->empty());
- BOOST_CHECK(rcb->push_back(1));
- BOOST_CHECK(rcb->size() == 1);
+#if 1
+ typedef AtomicQueue<3> Rcb;
+
+ Rcb tmp;
+ BOOST_CHECK(tmp.like_empty());
+ BOOST_CHECK(tmp.push_back(1));
+ BOOST_CHECK(tmp.tail() == 1);
+ BOOST_CHECK(tmp.head() == 0);
int64_t d;
- BOOST_CHECK(rcb->pop_front(d));
- BOOST_CHECK(rcb->empty());
+ BOOST_CHECK(tmp.pop_front(d));
+ BOOST_CHECK(tmp.like_empty());
+ BOOST_CHECK(tmp.head() == 1);
+ BOOST_CHECK(tmp.tail() == 1);
- const uint64_t nmsg = 1000 * 1000 * 1;
+ ShmObject<Rcb> rcb(shm, "test_rcb");
+#else
+ typedef Circular<int64_t> Rcb;
+ ShmObject<Rcb> rcb(shm, "test_rcb", 64, shm.get_segment_manager());
+#endif
+
+ const int nsize = sizeof(Rcb);
+
+ bool try_more = false;
uint64_t correct_total = nmsg * (nmsg - 1) / 2;
std::atomic<uint64_t> total(0);
std::atomic<uint64_t> nwrite(0);
@@ -61,7 +63,7 @@
auto Writer = [&]() {
uint64_t n = 0;
while ((n = nwrite++) < nmsg) {
- while (!rcb->push_back(n)) {
+ while (!rcb->push_back(n, try_more)) {
// MySleep();
}
++writedone;
@@ -71,11 +73,11 @@
auto Reader = [&]() {
while (nread.load() < nmsg) {
int64_t d;
- if (rcb->pop_front(d)) {
+ if (rcb->pop_front(d, try_more)) {
++nread;
total += d;
} else {
- MySleep();
+ // MySleep();
}
}
};
@@ -89,21 +91,25 @@
next += 1s;
auto w = writedone.load();
auto r = nread.load();
- printf("write: %6ld, spd: %6ld, read: %6ld, spd: %6ld , queue size: %d\n", w, w - lw, r, r - lr, rcb->size());
+ printf("write: %6ld, spd: %6ld, read: %6ld, spd: %6ld\n",
+ w, w - lw, r, r - lr);
lw = w;
lr = r;
} while (nread.load() < nmsg);
};
- ThreadManager threads;
- boost::timer::auto_cpu_timer timer;
- printf("Testing Robust Buffer, msgs %ld, queue size: %d \n", nmsg, nelement);
- threads.Launch(status);
- for (int i = 0; i < 10; ++i) {
- threads.Launch(Reader);
- threads.Launch(Writer);
+ std::thread st(status);
+ {
+ ThreadManager threads;
+ boost::timer::auto_cpu_timer timer;
+ printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, Rcb::capacity, nthread);
+ for (int i = 0; i < nthread; ++i) {
+ threads.Launch(Reader);
+ threads.Launch(Writer);
+ }
+ threads.WaitAll();
}
- threads.WaitAll();
+ st.join();
printf("total: %ld, expected: %ld\n", total.load(), correct_total);
BOOST_CHECK_EQUAL(total.load(), correct_total);
}
@@ -137,9 +143,7 @@
}
{
- boost::timer::auto_cpu_timer timer;
const int ntimes = 1000 * 1000;
- printf("test lock/unlock %d times: ", ntimes);
RobustMutex mutex;
auto Lock = [&]() {
for (int i = 0; i < ntimes; ++i) {
@@ -147,16 +151,20 @@
mutex.unlock();
}
};
- std::thread t1(Lock), t2(Lock);
- t1.join();
- t2.join();
- }
- auto MSFromNow = [](const int ms) {
- using namespace boost::posix_time;
- ptime cur = boost::posix_time::microsec_clock::universal_time();
- return cur + millisec(ms);
- };
+ {
+ boost::timer::auto_cpu_timer timer;
+ printf("test lock/unlock %d times: ", ntimes);
+ Lock();
+ }
+ {
+ boost::timer::auto_cpu_timer timer;
+ printf("test lock/unlock %d times, 2 thread: ", ntimes);
+ std::thread t1(Lock), t2(Lock);
+ t1.join();
+ t2.join();
+ }
+ }
auto TryLock = [&]() {
if (mtx->try_lock()) {
--
Gitblit v1.8.0