From 72bffb0807925a156b076b71f78c848a08d27b87 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 29 四月 2021 10:55:35 +0800
Subject: [PATCH] refactor mutex.
---
src/robust.h | 73 +++++++++-----
src/shm.h | 2
utest/api_test.cpp | 85 -----------------
utest/robust_test.cpp | 104 ++++++++++++++++++--
src/robust.cpp | 2
5 files changed, 140 insertions(+), 126 deletions(-)
diff --git a/src/robust.cpp b/src/robust.cpp
index 38d5d28..006ea5f 100644
--- a/src/robust.cpp
+++ b/src/robust.cpp
@@ -26,7 +26,7 @@
{
static_assert(sizeof(steady_clock::duration) == sizeof(int64_t));
static_assert(sizeof(RobustReqRep) == 24);
-static_assert(sizeof(CasMutex<false>) == 8);
+static_assert(sizeof(Mutex) == 8);
static_assert(sizeof(CircularBuffer<int>) == 48);
auto Now() { return steady_clock::now().time_since_epoch(); }
diff --git a/src/robust.h b/src/robust.h
index 19e9bda..983567f 100644
--- a/src/robust.h
+++ b/src/robust.h
@@ -32,6 +32,7 @@
using namespace std::chrono;
using namespace std::chrono_literals;
+constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; }
void QuickSleep();
@@ -102,20 +103,41 @@
char buf[4];
};
-template <bool isRobust = false>
-class CasMutex
+class PidLocker
{
- static pid_t pid()
+public:
+ typedef int locker_t;
+ static locker_t this_locker()
{
- static pid_t val = getpid();
+ static locker_t val = getpid();
return val;
}
- static bool Killed(pid_t pid)
+ static bool is_alive(locker_t locker) { return true; }
+};
+
+class RobustPidLocker
+{
+public:
+ typedef int locker_t;
+ static locker_t this_locker()
+ {
+ static locker_t val = getpid();
+ return val;
+ }
+ static bool is_alive(locker_t locker)
{
char buf[64] = {0};
- snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", pid);
- return access(buf, F_OK) != 0;
+ snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker);
+ return access(buf, F_OK) == 0;
}
+};
+
+template <class LockerT>
+class CasMutex
+{
+ typedef typename LockerT::locker_t locker_t;
+ static inline locker_t this_locker() { return LockerT::this_locker(); }
+ static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); }
public:
CasMutex() :
@@ -126,11 +148,11 @@
auto old = meta_.load();
int r = 0;
if (!Locked(old)) {
- r = MetaCas(old, Meta(1, pid()));
- } else if (isRobust && Killed(Pid(old))) {
- r = static_cast<int>(MetaCas(old, Meta(1, pid()))) << 1;
+ r = MetaCas(old, Meta(1, this_locker()));
+ } else if (!is_alive(Locker(old))) {
+ r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1;
if (r) {
- printf("captured pid %d -> %d, r = %d\n", Pid(old), pid(), r);
+ printf("captured pid %d -> %d, r = %d\n", Locker(old), this_locker(), r);
}
}
return r;
@@ -146,19 +168,21 @@
void unlock()
{
auto old = meta_.load();
- if (Locked(old) && Pid(old) == pid()) {
- MetaCas(old, Meta(0, pid()));
+ if (Locked(old) && Locker(old) == this_locker()) {
+ MetaCas(old, Meta(0, this_locker()));
}
}
private:
+ static_assert(sizeof(locker_t) < sizeof(uint64_t), "locker size must be smaller than 64 bit!");
std::atomic<uint64_t> meta_;
bool Locked(uint64_t meta) { return (meta >> 63) != 0; }
- pid_t Pid(uint64_t meta) { return meta & ~(uint64_t(1) << 63); }
- uint64_t Meta(uint64_t lk, pid_t pid) { return (lk << 63) | pid; }
+ locker_t Locker(uint64_t meta) { return meta & MaskBits(sizeof(locker_t) * 8); }
+ uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; }
bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
- static_assert(sizeof(pid_t) < sizeof(uint64_t));
};
+
+typedef CasMutex<RobustPidLocker> Mutex;
template <class Lock>
class Guard
@@ -193,19 +217,17 @@
state_(0), capacity_(cap), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(cap))
{
if (!buf) {
- throw("error allocate buffer: out of mem!");
+ throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!");
}
}
- ~CircularBuffer()
- {
- al_.deallocate(buf, capacity_);
- }
+ ~CircularBuffer() { al_.deallocate(buf, capacity_); }
+
size_type size() const { return (capacity_ + tail() - head()) % capacity_; }
bool full() const { return (capacity_ + tail() + 1 - head()) % capacity_ == 0; }
bool empty() const { return head() == tail(); }
bool push_back(Data d)
{
- Guard<MutexT> guard(mutex_);
+ Guard<Mutex> guard(mutex_);
if (!full()) {
auto old = mtail();
buf[Pos(old)] = d;
@@ -216,7 +238,7 @@
}
bool pop_front(Data &d)
{
- Guard<MutexT> guard(mutex_);
+ Guard<Mutex> guard(mutex_);
if (!empty()) {
auto old = mhead();
d = buf[Pos(old)];
@@ -233,8 +255,7 @@
CircularBuffer(CircularBuffer &&);
CircularBuffer &operator=(const CircularBuffer &) = delete;
CircularBuffer &operator=(CircularBuffer &&) = delete;
- typedef CasMutex<true> MutexT;
- // static_assert(sizeof(MutexT) == 16);
+
meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); }
size_type head() const { return Pos(mhead()); }
size_type tail() const { return Pos(mtail()); }
@@ -244,7 +265,7 @@
enum { eStateReady = 0x19833891 };
std::atomic<uint32_t> state_;
const size_type capacity_;
- MutexT mutex_;
+ Mutex mutex_;
std::atomic<meta_type> mhead_;
std::atomic<meta_type> mtail_;
Alloc al_;
diff --git a/src/shm.h b/src/shm.h
index 17352fe..515d856 100644
--- a/src/shm.h
+++ b/src/shm.h
@@ -90,7 +90,7 @@
}
};
-typedef robust::CasMutex<true> Mutex;
+typedef robust::Mutex Mutex;
typedef robust::Guard<Mutex> Guard;
class SharedMemory : public mshm_t
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 6577b51..c6165e8 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -102,91 +102,6 @@
using namespace std::chrono;
// using namespace std::chrono_literals;
-BOOST_AUTO_TEST_CASE(MutexTest)
-{
- // typedef robust::CasMutex<true> RobustMutex;
- typedef MutexWithPidCheck RobustMutex;
-
- for (int i = 0; i < 20; ++i) {
- int size = i;
- int left = size & 7;
- int rsize = size + ((8 - left) & 7);
- printf("size: %3d, rsize: %3d\n", size, rsize);
- }
- SharedMemory &shm = TestShm();
- // shm.Remove();
- // return;
- GlobalInit(shm);
-
- const std::string mtx_name("test_mutex");
- const std::string int_name("test_int");
- auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name);
- auto pi = shm.FindOrCreate<int>(int_name, 100);
-
- std::mutex m;
- typedef std::chrono::steady_clock Clock;
- auto Now = []() { return Clock::now().time_since_epoch(); };
- if (pi) {
- auto old = *pi;
- printf("int : %d, add1: %d\n", old, ++*pi);
- }
-
- {
- boost::timer::auto_cpu_timer timer;
- const int ntimes = 1000 * 1000;
- printf("test lock/unlock %d times: ", ntimes);
- RobustMutex mutex;
- auto Lock = [&]() {
- for (int i = 0; i < ntimes; ++i) {
- mutex.lock();
- mutex.unlock();
- }
- };
- std::thread t1(Lock), t2(Lock);
- t1.join();
- t2.join();
- }
-
- auto MSFromNow = [](const int ms) {
- using namespace boost::posix_time;
- ptime cur = boost::posix_time::microsec_clock::universal_time();
- return cur + millisec(ms);
- };
-
- auto TryLock = [&]() {
- if (mtx->try_lock()) {
- printf("try_lock ok\n");
- return true;
- } else {
- printf("try_lock failed\n");
- return false;
- }
- };
- auto Unlock = [&]() {
- mtx->unlock();
- printf("unlocked\n");
- };
-
- if (mtx) {
- printf("mtx exists\n");
- if (TryLock()) {
- auto op = [&]() {
- if (TryLock()) {
- Unlock();
- }
- };
- op();
- std::thread t(op);
- t.join();
- // Unlock();
- } else {
- // mtx->unlock();
- }
- } else {
- printf("mtx not exists\n");
- }
-}
-
BOOST_AUTO_TEST_CASE(ApiTest)
{
auto max_time = std::chrono::steady_clock::time_point::max();
diff --git a/utest/robust_test.cpp b/utest/robust_test.cpp
index 0d54b46..9384c10 100644
--- a/utest/robust_test.cpp
+++ b/utest/robust_test.cpp
@@ -3,8 +3,13 @@
using namespace robust;
+enum {
+ eLockerBits = 32,
+ eLockerMask = MaskBits(sizeof(int) * 8),
+};
+
typedef CircularBuffer<int64_t, Allocator<int64_t>> Rcb;
-Rcb *GetRCBImpl(SharedMemory &shm, const int nelem)
+Rcb *GetRCB(SharedMemory &shm, const int nelem)
{
int cap = nelem + 1;
typedef uint64_t Data;
@@ -15,24 +20,13 @@
}
return nullptr;
}
-Rcb *GetRCB(SharedMemory &shm, const int nelem)
-{
- void **pStore = shm.FindOrCreate<void *>("test_rcb_pointer", nullptr);
- if (pStore) {
- if (!*pStore) {
- *pStore = GetRCBImpl(shm, nelem);
- }
- return (Rcb *) *pStore;
- }
- return nullptr;
-}
void MySleep()
{
std::this_thread::sleep_for(2us);
}
-BOOST_AUTO_TEST_CASE(RobustTest)
+BOOST_AUTO_TEST_CASE(QueueTest)
{
SharedMemory &shm = TestShm();
shm.Remove();
@@ -112,4 +106,88 @@
threads.WaitAll();
printf("total: %ld, expected: %ld\n", total.load(), correct_total);
BOOST_CHECK_EQUAL(total.load(), correct_total);
+}
+
+BOOST_AUTO_TEST_CASE(MutexTest)
+{
+ typedef robust::Mutex RobustMutex;
+
+ for (int i = 0; i < 20; ++i) {
+ int size = i;
+ int left = size & 7;
+ int rsize = size + ((8 - left) & 7);
+ printf("size: %3d, rsize: %3d\n", size, rsize);
+ }
+ SharedMemory &shm = TestShm();
+ // shm.Remove();
+ // return;
+ GlobalInit(shm);
+
+ const std::string mtx_name("test_mutex");
+ const std::string int_name("test_int");
+ auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name);
+ auto pi = shm.FindOrCreate<int>(int_name, 100);
+
+ std::mutex m;
+ typedef std::chrono::steady_clock Clock;
+ auto Now = []() { return Clock::now().time_since_epoch(); };
+ if (pi) {
+ auto old = *pi;
+ printf("int : %d, add1: %d\n", old, ++*pi);
+ }
+
+ {
+ boost::timer::auto_cpu_timer timer;
+ const int ntimes = 1000 * 1000;
+ printf("test lock/unlock %d times: ", ntimes);
+ RobustMutex mutex;
+ auto Lock = [&]() {
+ for (int i = 0; i < ntimes; ++i) {
+ mutex.lock();
+ mutex.unlock();
+ }
+ };
+ std::thread t1(Lock), t2(Lock);
+ t1.join();
+ t2.join();
+ }
+
+ auto MSFromNow = [](const int ms) {
+ using namespace boost::posix_time;
+ ptime cur = boost::posix_time::microsec_clock::universal_time();
+ return cur + millisec(ms);
+ };
+
+ auto TryLock = [&]() {
+ if (mtx->try_lock()) {
+ printf("try_lock ok\n");
+ return true;
+ } else {
+ printf("try_lock failed\n");
+ return false;
+ }
+ };
+ auto Unlock = [&]() {
+ mtx->unlock();
+ printf("unlocked\n");
+ };
+
+ if (mtx) {
+ printf("mtx exists\n");
+ if (TryLock()) {
+ auto op = [&]() {
+ if (TryLock()) {
+ Unlock();
+ }
+ };
+ op();
+ std::thread t(op);
+ t.join();
+ // Unlock();
+ } else {
+ // mtx->unlock();
+ }
+ } else {
+ printf("mtx not exists\n");
+ }
}
\ No newline at end of file
--
Gitblit v1.8.0