From 1d6c040dcb9a01648edc66d8c0006c8c9294a705 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: Thu, 22 Apr 2021 18:28:30 +0800
Subject: [PATCH] add mutex timeout limit; use atomic as refcount.
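
Replace the mutex-guarded reference counter in msg.h with a std::atomic<int>,
and replace the plain interprocess_mutex behind the bhome_shm::Mutex typedef
with MutexWithTimeLimit: each successful lock stamps an atomic steady_clock
timestamp, and a waiter that finds the holder over the limit (for example, a
process that died while holding the mutex) may steal the lock by CAS-ing that
timestamp. Rename Synced's Mutex template parameter to TMutex so it cannot
collide with the new typedef, extend MutexTest, and remove the
lock_free_queue test files.

A rough usage sketch (assuming the default 1s limit; Guard is
scoped_lock<Mutex>):

    bhome_shm::Mutex m;
    {
        bhome_shm::Guard lk(m); // lock() spins on try_lock()
        // if this holder stalls for more than 1s, another thread's
        // try_lock() may steal the lock by updating last_lock_time_
    }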
---
utest/lock_free_queue.cpp | 29 -----------
utest/lock_free_queue.h | 57 -----------
src/shm.h | 50 +++++++++
utest/api_test.cpp | 102 +++++++++++++++++++
src/bh_util.h | 6
src/msg.h | 27 +---
src/shm.cpp | 26 +++++
7 files changed, 184 insertions(+), 84 deletions(-)
diff --git a/src/bh_util.h b/src/bh_util.h
index bc48578..e3ab70b 100644
--- a/src/bh_util.h
+++ b/src/bh_util.h
@@ -123,13 +123,13 @@
D &operator*() const { return *p_; }
};
-template <class T, class Mutex = std::mutex, class Lock = std::unique_lock<Mutex>>
+template <class T, class TMutex = std::mutex, class Lock = std::unique_lock<TMutex>>
class Synced
{
typedef T Data;
- Mutex mutex_;
+ TMutex mutex_;
Data data_;
- typedef SyncedPtr<Data, Mutex, Lock> Ptr;
+ typedef SyncedPtr<Data, TMutex, Lock> Ptr;
public:
template <class... P>
diff --git a/src/msg.h b/src/msg.h
index e6b0b34..feab5ec 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -21,6 +21,7 @@
#include "bh_util.h"
#include "proto.h"
#include "shm.h"
+#include <atomic>
#include <boost/interprocess/offset_ptr.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <functional>
@@ -38,26 +39,14 @@
// Stores the ref count; msgs sharing the same data should also hold a pointer to the same RefCount object.
class RefCount : private boost::noncopyable
{
-public:
- int Inc()
- {
- Guard lk(mutex_);
- return ++num_;
- }
- int Dec()
- {
- Guard lk(mutex_);
- return --num_;
- }
- int Get()
- {
- Guard lk(mutex_);
- return num_;
- }
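+ // Atomic counter: Inc/Dec/Get need no per-object mutex anymore.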
+ std::atomic<int> num_;
-private:
- Mutex mutex_;
- int num_ = 1;
+public:
+ RefCount() :
+ num_(1) { static_assert(std::is_pod<decltype(num_)>::value); }
+ int Inc() { return ++num_; }
+ int Dec() { return --num_; }
+ int Get() { return num_.load(); }
};
// message content layout: header_size + header + data_size + data
diff --git a/src/shm.cpp b/src/shm.cpp
index 1658900..d499b16 100644
--- a/src/shm.cpp
+++ b/src/shm.cpp
@@ -21,6 +21,32 @@
namespace bhome_shm
{
+bool MutexWithTimeLimit::try_lock()
+{
+ if (mutex_.try_lock()) {
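+ // Acquired the raw mutex: refresh the timestamp. If it is already stale,
+ // a waiter may be stealing the lock concurrently, so claim it with a CAS
+ // and report failure if the waiter wins.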
+ auto old_time = last_lock_time_.load();
+ if (Now() - old_time > limit_) {
+ return last_lock_time_.compare_exchange_strong(old_time, Now());
+ } else {
+ last_lock_time_.store(Now());
+ return true;
+ }
+ } else {
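+ // Raw mutex is held by someone else. If the holder has exceeded limit_,
+ // steal ownership by CAS-ing the timestamp; a later unlock() then
+ // releases the raw mutex even though this thread never acquired it.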
+ auto old_time = last_lock_time_.load();
+ if (Now() - old_time > limit_) {
+ return last_lock_time_.compare_exchange_strong(old_time, Now());
+ } else {
+ return false;
+ }
+ }
+}
+void MutexWithTimeLimit::lock()
+{
+ while (!try_lock()) {
+ std::this_thread::yield();
+ }
+}
+
SharedMemory::SharedMemory(const std::string &name, const uint64_t size) :
mshm_t(open_or_create, name.c_str(), size, 0, AllowAll()),
name_(name)
diff --git a/src/shm.h b/src/shm.h
index 28745dc..5bf8e41 100644
--- a/src/shm.h
+++ b/src/shm.h
@@ -19,12 +19,15 @@
#ifndef SHM_6CHO6D6C
#define SHM_6CHO6D6C
+#include <atomic>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/noncopyable.hpp>
#include <boost/uuid/uuid.hpp>
+#include <chrono>
+#include <thread>
namespace bhome_shm
{
@@ -32,7 +35,52 @@
using namespace boost::interprocess;
typedef managed_shared_memory mshm_t;
-typedef interprocess_mutex Mutex;
+
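+// Spin lock built on an atomic flag. Unlike interprocess_mutex it keeps no
+// owner bookkeeping, so any thread may call unlock().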
+class CasMutex
+{
+ std::atomic<bool> flag_;
+ bool cas(bool expected, bool new_val) { return flag_.compare_exchange_strong(expected, new_val); }
+
+public:
+ CasMutex() :
+ flag_(false) {}
+ bool try_lock() { return cas(false, true); }
+ void lock()
+ {
+ while (!try_lock()) { std::this_thread::yield(); }
+ }
+ void unlock() { cas(true, false); }
+};
+
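+// interprocess_mutex wrapper that stamps each successful lock with a
+// steady_clock time; a waiter that sees the holder exceed limit_ (e.g. a
+// process that died while locked) may steal the mutex via try_lock().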
+class MutexWithTimeLimit
+{
+ typedef boost::interprocess::interprocess_mutex MutexT;
+ // typedef CasMutex MutexT;
+ typedef std::chrono::steady_clock Clock;
+ typedef Clock::duration Duration;
+ static Duration Now() { return Clock::now().time_since_epoch(); }
+
+ const Duration limit_;
+ std::atomic<Duration> last_lock_time_;
+ MutexT mutex_;
+
+public:
+ typedef MutexT::internal_mutex_type internal_mutex_type;
+ const internal_mutex_type &internal_mutex() const { return mutex_.internal_mutex(); }
+ internal_mutex_type &internal_mutex() { return mutex_.internal_mutex(); }
+
+ explicit MutexWithTimeLimit(Duration limit) :
+ limit_(limit) {}
+ MutexWithTimeLimit() :
+ MutexWithTimeLimit(std::chrono::seconds(1)) {}
+ ~MutexWithTimeLimit() { static_assert(std::is_pod<Duration>::value); }
+ bool try_lock();
+ void lock();
+ void unlock() { mutex_.unlock(); }
+};
+
+// typedef boost::interprocess::interprocess_mutex Mutex;
+typedef MutexWithTimeLimit Mutex;
typedef scoped_lock<Mutex> Guard;
typedef interprocess_condition Cond;
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 7981a2c..a91db43 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -96,6 +96,63 @@
// printf("client Recv reply : %s\n", reply.data().c_str());
}
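+// Test-local copy of MutexWithTimeLimit (here built on CasMutex) that also
+// counts how often lock() had to spin; exercised by MutexTest below.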
+class TLMutex
+{
+ // typedef boost::interprocess::interprocess_mutex MutexT;
+ typedef CasMutex MutexT;
+ // typedef std::mutex MutexT;
+ typedef std::chrono::steady_clock Clock;
+ typedef Clock::duration Duration;
+ static Duration Now() { return Clock::now().time_since_epoch(); }
+
+ const Duration limit_;
+ std::atomic<Duration> last_lock_time_;
+ MutexT mutex_;
+
+public:
+ struct Status {
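+ // only nlock_ is updated below; the other counters are placeholders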
+ int64_t nlock_ = 0;
+ int64_t nupdate_time_fail = 0;
+ int64_t nfail = 0;
+ int64_t nexcept = 0;
+ };
+ Status st_;
+
+ explicit TLMutex(Duration limit) :
+ limit_(limit) {}
+ TLMutex() :
+ TLMutex(std::chrono::seconds(1)) {}
+ ~TLMutex() { static_assert(std::is_pod<Duration>::value); }
+ bool try_lock()
+ {
+ if (mutex_.try_lock()) {
+ auto old_time = last_lock_time_.load();
+ if (Now() - old_time > limit_) {
+ return last_lock_time_.compare_exchange_strong(old_time, Now());
+ } else {
+ last_lock_time_.store(Now());
+ return true;
+ }
+ } else {
+ auto old_time = last_lock_time_.load();
+ if (Now() - old_time > limit_) {
+ return last_lock_time_.compare_exchange_strong(old_time, Now());
+ } else {
+ return false;
+ }
+ }
+ }
+ void lock()
+ {
+ int n = 0;
+ while (!try_lock()) {
+ n++;
+ std::this_thread::yield();
+ }
+ st_.nlock_ += n;
+ }
+ void unlock() { mutex_.unlock(); }
+};
BOOST_AUTO_TEST_CASE(MutexTest)
{
const std::string shm_name("ShmMutex");
@@ -104,12 +161,42 @@
const std::string mtx_name("test_mutex");
const std::string int_name("test_int");
- auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())();
+ auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())(3s);
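+ // construct with a 3s steal limit (3s needs std::chrono_literals in scope)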
+
auto pi = shm.find_or_construct<int>(int_name.c_str())(100);
+ typedef std::chrono::steady_clock Clock;
+ auto Now = []() { return Clock::now().time_since_epoch(); };
if (pi) {
auto old = *pi;
printf("int : %d, add1: %d\n", old, ++*pi);
}
+
+ {
+ boost::timer::auto_cpu_timer timer;
+ printf("test time: ");
+ TLMutex mutex;
+ // CasMutex mutex;
+ auto Lock = [&]() {
+ for (int i = 0; i < 1000 * 100; ++i) {
+ mutex.lock();
+ mutex.unlock();
+ }
+ };
+ std::thread t1(Lock), t2(Lock);
+ t1.join();
+ t2.join();
+ printf("mutex nlock: %ld, update time error: %ld, normal fail: %ld, error wait: %ld\n",
+ mutex.st_.nlock_,
+ mutex.st_.nupdate_time_fail,
+ mutex.st_.nfail,
+ mutex.st_.nexcept);
+ }
+
+ auto MSFromNow = [](const int ms) {
+ using namespace boost::posix_time;
+ ptime cur = boost::posix_time::microsec_clock::universal_time();
+ return cur + millisec(ms);
+ };
auto TryLock = [&]() {
if (mtx->try_lock()) {
@@ -128,10 +215,17 @@
if (mtx) {
printf("mtx exists\n");
if (TryLock()) {
- if (TryLock()) {
- Unlock();
- }
+ auto op = [&]() {
+ if (TryLock()) {
+ Unlock();
+ }
+ };
+ op();
+ std::thread t(op);
+ t.join();
// Unlock();
+ } else {
+ // mtx->unlock();
}
} else {
printf("mtx not exists\n");
diff --git a/utest/lock_free_queue.cpp b/utest/lock_free_queue.cpp
deleted file mode 100644
index a05a454..0000000
--- a/utest/lock_free_queue.cpp
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * =====================================================================================
- *
- * Filename: lock_free_queue.cpp
- *
- * Description:
- *
- * Version: 1.0
- * Created: 2021-04-21 13:57:02
- * Revision: none
- * Compiler: gcc
- *
- * Author: Li Chao (), lichao@aiotlink.com
- * Organization:
- *
- * =====================================================================================
- */
-#include "lock_free_queue.h"
-#include "defs.h"
-#include "util.h"
-
-BOOST_AUTO_TEST_CASE(LockFreeTest)
-{
- LockFreeQueue q(BHomeShm());
- for (int i = 0; i < 15; ++i) {
- int r = q.Write(i);
- printf("write %d %s\n", i, (r ? "ok" : "failed"));
- }
-}
\ No newline at end of file
diff --git a/utest/lock_free_queue.h b/utest/lock_free_queue.h
deleted file mode 100644
index 968f796..0000000
--- a/utest/lock_free_queue.h
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * =====================================================================================
- *
- * Filename: lock_free_queue.h
- *
- * Description:
- *
- * Version: 1.0
- * Created: 2021-04-21 14:03:27
- * Revision: none
- * Compiler: gcc
- *
- * Author: Li Chao (), lichao@aiotlink.com
- * Organization:
- *
- * =====================================================================================
- */
-
-#ifndef LOCK_FREE_QUEUE_KQWP70HT
-#define LOCK_FREE_QUEUE_KQWP70HT
-
-#include "shm.h"
-#include <boost/interprocess/offset_ptr.hpp>
-#include <boost/lockfree/queue.hpp>
-
-using namespace bhome_shm;
-
-typedef int64_t Data;
-const int kQLen = 10;
-class LockFreeQueue : private boost::lockfree::queue<Data,
- boost::lockfree::allocator<Allocator<Data>>,
- boost::lockfree::capacity<kQLen>>,
- private boost::noncopyable
-{
- typedef boost::lockfree::queue<Data,
- boost::lockfree::allocator<Allocator<Data>>,
- boost::lockfree::capacity<kQLen>>
- Queue;
-
-public:
- LockFreeQueue(SharedMemory &shm) :
- Queue(shm.get_segment_manager()) {}
- bool Read(Data &d) { return pop(d); }
- bool Write(Data const &d) { return push(d); }
- template <class Func>
- bool Write(Data const &d, Func onWrite)
- {
- if (Write(d)) {
- onWrite(d);
- return true;
- } else {
- return false;
- }
- }
-};
-
-#endif // end of include guard: LOCK_FREE_QUEUE_KQWP70HT
--
Gitblit v1.8.0