From 1d6c040dcb9a01648edc66d8c0006c8c9294a705 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期四, 22 四月 2021 18:28:30 +0800 Subject: [PATCH] add mutex timeout limit; use atomic as refcount. --- /dev/null | 57 ----------- src/shm.h | 50 +++++++++ utest/api_test.cpp | 102 +++++++++++++++++++ src/bh_util.h | 6 src/msg.h | 27 +--- src/shm.cpp | 26 +++++ 6 files changed, 184 insertions(+), 84 deletions(-) diff --git a/src/bh_util.h b/src/bh_util.h index bc48578..e3ab70b 100644 --- a/src/bh_util.h +++ b/src/bh_util.h @@ -123,13 +123,13 @@ D &operator*() const { return *p_; } }; -template <class T, class Mutex = std::mutex, class Lock = std::unique_lock<Mutex>> +template <class T, class TMutex = std::mutex, class Lock = std::unique_lock<TMutex>> class Synced { typedef T Data; - Mutex mutex_; + TMutex mutex_; Data data_; - typedef SyncedPtr<Data, Mutex, Lock> Ptr; + typedef SyncedPtr<Data, TMutex, Lock> Ptr; public: template <class... P> diff --git a/src/msg.h b/src/msg.h index e6b0b34..feab5ec 100644 --- a/src/msg.h +++ b/src/msg.h @@ -21,6 +21,7 @@ #include "bh_util.h" #include "proto.h" #include "shm.h" +#include <atomic> #include <boost/interprocess/offset_ptr.hpp> #include <boost/uuid/uuid_generators.hpp> #include <functional> @@ -38,26 +39,14 @@ // store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object. class RefCount : private boost::noncopyable { -public: - int Inc() - { - Guard lk(mutex_); - return ++num_; - } - int Dec() - { - Guard lk(mutex_); - return --num_; - } - int Get() - { - Guard lk(mutex_); - return num_; - } + std::atomic<int> num_; -private: - Mutex mutex_; - int num_ = 1; +public: + RefCount() : + num_(1) { static_assert(std::is_pod<decltype(num_)>::value); } + int Inc() { return ++num_; } + int Dec() { return --num_; } + int Get() { return num_.load(); } }; // message content layout: header_size + header + data_size + data diff --git a/src/shm.cpp b/src/shm.cpp index 1658900..d499b16 100644 --- a/src/shm.cpp +++ b/src/shm.cpp @@ -21,6 +21,32 @@ namespace bhome_shm { +bool MutexWithTimeLimit::try_lock() +{ + if (mutex_.try_lock()) { + auto old_time = last_lock_time_.load(); + if (Now() - old_time > limit_) { + return last_lock_time_.compare_exchange_strong(old_time, Now()); + } else { + last_lock_time_.store(Now()); + return true; + } + } else { + auto old_time = last_lock_time_.load(); + if (Now() - old_time > limit_) { + return last_lock_time_.compare_exchange_strong(old_time, Now()); + } else { + return false; + } + } +} +void MutexWithTimeLimit::lock() +{ + while (!try_lock()) { + std::this_thread::yield(); + } +} + SharedMemory::SharedMemory(const std::string &name, const uint64_t size) : mshm_t(open_or_create, name.c_str(), size, 0, AllowAll()), name_(name) diff --git a/src/shm.h b/src/shm.h index 28745dc..5bf8e41 100644 --- a/src/shm.h +++ b/src/shm.h @@ -19,12 +19,15 @@ #ifndef SHM_6CHO6D6C #define SHM_6CHO6D6C +#include <atomic> #include <boost/interprocess/managed_shared_memory.hpp> #include <boost/interprocess/sync/interprocess_condition.hpp> #include <boost/interprocess/sync/interprocess_mutex.hpp> #include <boost/interprocess/sync/scoped_lock.hpp> #include <boost/noncopyable.hpp> #include <boost/uuid/uuid.hpp> +#include <chrono> +#include <thread> namespace bhome_shm { @@ -32,7 +35,52 @@ using namespace boost::interprocess; typedef managed_shared_memory mshm_t; -typedef interprocess_mutex Mutex; + +class CasMutex +{ + std::atomic<bool> flag_; + bool cas(bool expected, bool new_val) { return flag_.compare_exchange_strong(expected, new_val); } + +public: + CasMutex() : + flag_(false) {} + bool try_lock() { return cas(false, true); } + void lock() + { + while (!try_lock()) { std::this_thread::yield(); } + } + void unlock() { cas(true, false); } +}; + +class MutexWithTimeLimit +{ + typedef boost::interprocess::interprocess_mutex MutexT; + // typedef CasMutex MutexT; + typedef std::chrono::steady_clock Clock; + typedef Clock::duration Duration; + static Duration Now() { return Clock::now().time_since_epoch(); } + + const Duration limit_; + std::atomic<Duration> last_lock_time_; + MutexT mutex_; + +public: + typedef MutexT::internal_mutex_type internal_mutex_type; + const internal_mutex_type &internal_mutex() const { return mutex_.internal_mutex(); } + internal_mutex_type &internal_mutex() { return mutex_.internal_mutex(); } + + explicit MutexWithTimeLimit(Duration limit) : + limit_(limit) {} + MutexWithTimeLimit() : + MutexWithTimeLimit(std::chrono::seconds(1)) {} + ~MutexWithTimeLimit() { static_assert(std::is_pod<Duration>::value); } + bool try_lock(); + void lock(); + void unlock() { mutex_.unlock(); } +}; + +// typedef boost::interprocess::interprocess_mutex Mutex; +typedef MutexWithTimeLimit Mutex; typedef scoped_lock<Mutex> Guard; typedef interprocess_condition Cond; diff --git a/utest/api_test.cpp b/utest/api_test.cpp index 7981a2c..a91db43 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -96,6 +96,63 @@ // printf("client Recv reply : %s\n", reply.data().c_str()); } +class TLMutex +{ + // typedef boost::interprocess::interprocess_mutex MutexT; + typedef CasMutex MutexT; + // typedef std::mutex MutexT; + typedef std::chrono::steady_clock Clock; + typedef Clock::duration Duration; + static Duration Now() { return Clock::now().time_since_epoch(); } + + const Duration limit_; + std::atomic<Duration> last_lock_time_; + MutexT mutex_; + +public: + struct Status { + int64_t nlock_ = 0; + int64_t nupdate_time_fail = 0; + int64_t nfail = 0; + int64_t nexcept = 0; + }; + Status st_; + + explicit TLMutex(Duration limit) : + limit_(limit) {} + TLMutex() : + TLMutex(std::chrono::seconds(1)) {} + ~TLMutex() { static_assert(std::is_pod<Duration>::value); } + bool try_lock() + { + if (mutex_.try_lock()) { + auto old_time = last_lock_time_.load(); + if (Now() - old_time > limit_) { + return last_lock_time_.compare_exchange_strong(old_time, Now()); + } else { + last_lock_time_.store(Now()); + return true; + } + } else { + auto old_time = last_lock_time_.load(); + if (Now() - old_time > limit_) { + return last_lock_time_.compare_exchange_strong(old_time, Now()); + } else { + return false; + } + } + } + void lock() + { + int n = 0; + while (!try_lock()) { + n++; + std::this_thread::yield(); + } + st_.nlock_ += n; + } + void unlock() { mutex_.unlock(); } +}; BOOST_AUTO_TEST_CASE(MutexTest) { const std::string shm_name("ShmMutex"); @@ -104,12 +161,42 @@ const std::string mtx_name("test_mutex"); const std::string int_name("test_int"); - auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())(); + auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())(3s); + auto pi = shm.find_or_construct<int>(int_name.c_str())(100); + typedef std::chrono::steady_clock Clock; + auto Now = []() { return Clock::now().time_since_epoch(); }; if (pi) { auto old = *pi; printf("int : %d, add1: %d\n", old, ++*pi); } + + { + boost::timer::auto_cpu_timer timer; + printf("test time: "); + TLMutex mutex; + // CasMutex mutex; + auto Lock = [&]() { + for (int i = 0; i < 1000 * 100; ++i) { + mutex.lock(); + mutex.unlock(); + } + }; + std::thread t1(Lock), t2(Lock); + t1.join(); + t2.join(); + printf("mutex nlock: %ld, update time error: %ld, normal fail: %ld, error wait: %ld\n", + mutex.st_.nlock_, + mutex.st_.nupdate_time_fail, + mutex.st_.nfail, + mutex.st_.nexcept); + } + + auto MSFromNow = [](const int ms) { + using namespace boost::posix_time; + ptime cur = boost::posix_time::microsec_clock::universal_time(); + return cur + millisec(ms); + }; auto TryLock = [&]() { if (mtx->try_lock()) { @@ -128,10 +215,17 @@ if (mtx) { printf("mtx exists\n"); if (TryLock()) { - if (TryLock()) { - Unlock(); - } + auto op = [&]() { + if (TryLock()) { + Unlock(); + } + }; + op(); + std::thread t(op); + t.join(); // Unlock(); + } else { + // mtx->unlock(); } } else { printf("mtx not exists\n"); diff --git a/utest/lock_free_queue.cpp b/utest/lock_free_queue.cpp deleted file mode 100644 index a05a454..0000000 --- a/utest/lock_free_queue.cpp +++ /dev/null @@ -1,29 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: lock_free_queue.cpp - * - * Description: - * - * Version: 1.0 - * Created: 2021骞�04鏈�21鏃� 13鏃�57鍒�02绉� - * Revision: none - * Compiler: gcc - * - * Author: Li Chao (), lichao@aiotlink.com - * Organization: - * - * ===================================================================================== - */ -#include "lock_free_queue.h" -#include "defs.h" -#include "util.h" - -BOOST_AUTO_TEST_CASE(LockFreeTest) -{ - LockFreeQueue q(BHomeShm()); - for (int i = 0; i < 15; ++i) { - int r = q.Write(i); - printf("write %d %s\n", i, (r ? "ok" : "failed")); - } -} \ No newline at end of file diff --git a/utest/lock_free_queue.h b/utest/lock_free_queue.h deleted file mode 100644 index 968f796..0000000 --- a/utest/lock_free_queue.h +++ /dev/null @@ -1,57 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: lock_free_queue.h - * - * Description: - * - * Version: 1.0 - * Created: 2021骞�04鏈�21鏃� 14鏃�03鍒�27绉� - * Revision: none - * Compiler: gcc - * - * Author: Li Chao (), lichao@aiotlink.com - * Organization: - * - * ===================================================================================== - */ - -#ifndef LOCK_FREE_QUEUE_KQWP70HT -#define LOCK_FREE_QUEUE_KQWP70HT - -#include "shm.h" -#include <boost/interprocess/offset_ptr.hpp> -#include <boost/lockfree/queue.hpp> - -using namespace bhome_shm; - -typedef int64_t Data; -const int kQLen = 10; -class LockFreeQueue : private boost::lockfree::queue<Data, - boost::lockfree::allocator<Allocator<Data>>, - boost::lockfree::capacity<kQLen>>, - private boost::noncopyable -{ - typedef boost::lockfree::queue<Data, - boost::lockfree::allocator<Allocator<Data>>, - boost::lockfree::capacity<kQLen>> - Queue; - -public: - LockFreeQueue(SharedMemory &shm) : - Queue(shm.get_segment_manager()) {} - bool Read(Data &d) { return pop(d); } - bool Write(Data const &d) { return push(d); } - template <class Func> - bool Write(Data const &d, Func onWrite) - { - if (Write(d)) { - onWrite(d); - return true; - } else { - return false; - } - } -}; - -#endif // end of include guard: LOCK_FREE_QUEUE_KQWP70HT -- Gitblit v1.8.0