src/bh_util.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/msg.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/shm.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/shm.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
utest/api_test.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
utest/lock_free_queue.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
utest/lock_free_queue.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
src/bh_util.h
@@ -123,13 +123,13 @@ D &operator*() const { return *p_; } }; template <class T, class Mutex = std::mutex, class Lock = std::unique_lock<Mutex>> template <class T, class TMutex = std::mutex, class Lock = std::unique_lock<TMutex>> class Synced { typedef T Data; Mutex mutex_; TMutex mutex_; Data data_; typedef SyncedPtr<Data, Mutex, Lock> Ptr; typedef SyncedPtr<Data, TMutex, Lock> Ptr; public: template <class... P> src/msg.h
@@ -21,6 +21,7 @@ #include "bh_util.h" #include "proto.h" #include "shm.h" #include <atomic> #include <boost/interprocess/offset_ptr.hpp> #include <boost/uuid/uuid_generators.hpp> #include <functional> @@ -38,26 +39,14 @@ // store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object. class RefCount : private boost::noncopyable { public: int Inc() { Guard lk(mutex_); return ++num_; } int Dec() { Guard lk(mutex_); return --num_; } int Get() { Guard lk(mutex_); return num_; } std::atomic<int> num_; private: Mutex mutex_; int num_ = 1; public: RefCount() : num_(1) { static_assert(std::is_pod<decltype(num_)>::value); } int Inc() { return ++num_; } int Dec() { return --num_; } int Get() { return num_.load(); } }; // message content layout: header_size + header + data_size + data src/shm.cpp
@@ -21,6 +21,32 @@ namespace bhome_shm { bool MutexWithTimeLimit::try_lock() { if (mutex_.try_lock()) { auto old_time = last_lock_time_.load(); if (Now() - old_time > limit_) { return last_lock_time_.compare_exchange_strong(old_time, Now()); } else { last_lock_time_.store(Now()); return true; } } else { auto old_time = last_lock_time_.load(); if (Now() - old_time > limit_) { return last_lock_time_.compare_exchange_strong(old_time, Now()); } else { return false; } } } void MutexWithTimeLimit::lock() { while (!try_lock()) { std::this_thread::yield(); } } SharedMemory::SharedMemory(const std::string &name, const uint64_t size) : mshm_t(open_or_create, name.c_str(), size, 0, AllowAll()), name_(name) src/shm.h
@@ -19,12 +19,15 @@ #ifndef SHM_6CHO6D6C #define SHM_6CHO6D6C #include <atomic> #include <boost/interprocess/managed_shared_memory.hpp> #include <boost/interprocess/sync/interprocess_condition.hpp> #include <boost/interprocess/sync/interprocess_mutex.hpp> #include <boost/interprocess/sync/scoped_lock.hpp> #include <boost/noncopyable.hpp> #include <boost/uuid/uuid.hpp> #include <chrono> #include <thread> namespace bhome_shm { @@ -32,7 +35,52 @@ using namespace boost::interprocess; typedef managed_shared_memory mshm_t; typedef interprocess_mutex Mutex; class CasMutex { std::atomic<bool> flag_; bool cas(bool expected, bool new_val) { return flag_.compare_exchange_strong(expected, new_val); } public: CasMutex() : flag_(false) {} bool try_lock() { return cas(false, true); } void lock() { while (!try_lock()) { std::this_thread::yield(); } } void unlock() { cas(true, false); } }; class MutexWithTimeLimit { typedef boost::interprocess::interprocess_mutex MutexT; // typedef CasMutex MutexT; typedef std::chrono::steady_clock Clock; typedef Clock::duration Duration; static Duration Now() { return Clock::now().time_since_epoch(); } const Duration limit_; std::atomic<Duration> last_lock_time_; MutexT mutex_; public: typedef MutexT::internal_mutex_type internal_mutex_type; const internal_mutex_type &internal_mutex() const { return mutex_.internal_mutex(); } internal_mutex_type &internal_mutex() { return mutex_.internal_mutex(); } explicit MutexWithTimeLimit(Duration limit) : limit_(limit) {} MutexWithTimeLimit() : MutexWithTimeLimit(std::chrono::seconds(1)) {} ~MutexWithTimeLimit() { static_assert(std::is_pod<Duration>::value); } bool try_lock(); void lock(); void unlock() { mutex_.unlock(); } }; // typedef boost::interprocess::interprocess_mutex Mutex; typedef MutexWithTimeLimit Mutex; typedef scoped_lock<Mutex> Guard; typedef interprocess_condition Cond; utest/api_test.cpp
@@ -96,6 +96,63 @@ // printf("client Recv reply : %s\n", reply.data().c_str()); } class TLMutex { // typedef boost::interprocess::interprocess_mutex MutexT; typedef CasMutex MutexT; // typedef std::mutex MutexT; typedef std::chrono::steady_clock Clock; typedef Clock::duration Duration; static Duration Now() { return Clock::now().time_since_epoch(); } const Duration limit_; std::atomic<Duration> last_lock_time_; MutexT mutex_; public: struct Status { int64_t nlock_ = 0; int64_t nupdate_time_fail = 0; int64_t nfail = 0; int64_t nexcept = 0; }; Status st_; explicit TLMutex(Duration limit) : limit_(limit) {} TLMutex() : TLMutex(std::chrono::seconds(1)) {} ~TLMutex() { static_assert(std::is_pod<Duration>::value); } bool try_lock() { if (mutex_.try_lock()) { auto old_time = last_lock_time_.load(); if (Now() - old_time > limit_) { return last_lock_time_.compare_exchange_strong(old_time, Now()); } else { last_lock_time_.store(Now()); return true; } } else { auto old_time = last_lock_time_.load(); if (Now() - old_time > limit_) { return last_lock_time_.compare_exchange_strong(old_time, Now()); } else { return false; } } } void lock() { int n = 0; while (!try_lock()) { n++; std::this_thread::yield(); } st_.nlock_ += n; } void unlock() { mutex_.unlock(); } }; BOOST_AUTO_TEST_CASE(MutexTest) { const std::string shm_name("ShmMutex"); @@ -104,12 +161,42 @@ const std::string mtx_name("test_mutex"); const std::string int_name("test_int"); auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())(); auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())(3s); auto pi = shm.find_or_construct<int>(int_name.c_str())(100); typedef std::chrono::steady_clock Clock; auto Now = []() { return Clock::now().time_since_epoch(); }; if (pi) { auto old = *pi; printf("int : %d, add1: %d\n", old, ++*pi); } { boost::timer::auto_cpu_timer timer; printf("test time: "); TLMutex mutex; // CasMutex mutex; auto Lock = [&]() { for (int i = 0; i < 1000 * 100; ++i) { mutex.lock(); mutex.unlock(); } }; std::thread t1(Lock), t2(Lock); t1.join(); t2.join(); printf("mutex nlock: %ld, update time error: %ld, normal fail: %ld, error wait: %ld\n", mutex.st_.nlock_, mutex.st_.nupdate_time_fail, mutex.st_.nfail, mutex.st_.nexcept); } auto MSFromNow = [](const int ms) { using namespace boost::posix_time; ptime cur = boost::posix_time::microsec_clock::universal_time(); return cur + millisec(ms); }; auto TryLock = [&]() { if (mtx->try_lock()) { @@ -128,10 +215,17 @@ if (mtx) { printf("mtx exists\n"); if (TryLock()) { if (TryLock()) { Unlock(); } auto op = [&]() { if (TryLock()) { Unlock(); } }; op(); std::thread t(op); t.join(); // Unlock(); } else { // mtx->unlock(); } } else { printf("mtx not exists\n"); utest/lock_free_queue.cpp
File was deleted utest/lock_free_queue.h
File was deleted