From 1b167ec5ad101ac44451381e26cc73ab5d67d2a1 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: Mon, 26 Apr 2021 16:37:52 +0800
Subject: [PATCH] fix socket busy loop; del locked readall; refactor.

---
 utest/api_test.cpp | 102 ++++++++++++++++++++++++++++++++------------------
 1 file changed, 65 insertions(+), 37 deletions(-)

diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index debe8ad..6682aaf 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -18,6 +18,7 @@
 #include "bh_api.h"
 #include "util.h"
 #include <atomic>
+#include <boost/lockfree/queue.hpp>
 
 using namespace bhome_msg;
 
@@ -49,7 +50,6 @@
 	static MsgStatus st;
 	return st;
 }
-} // namespace
 
 void SubRecvProc(const void *proc_id,
                  const int proc_id_len,
@@ -59,7 +59,7 @@
 	std::string proc((const char *) proc_id, proc_id_len);
 	MsgPublish pub;
 	pub.ParseFromArray(data, data_len);
-	// printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str());
+	printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str());
 }
 
 void ServerProc(const void *proc_id,
@@ -98,8 +98,8 @@
 
 class TLMutex
 {
-	// typedef boost::interprocess::interprocess_mutex MutexT;
-	typedef CasMutex MutexT;
+	typedef boost::interprocess::interprocess_mutex MutexT;
+	// typedef CasMutex MutexT;
 	// typedef std::mutex MutexT;
 	typedef std::chrono::steady_clock Clock;
 	typedef Clock::duration Duration;
@@ -108,6 +108,7 @@
 	const Duration limit_;
 	std::atomic<Duration> last_lock_time_;
 	MutexT mutex_;
+	bool Expired(const Duration diff) { return diff > limit_; }
 
 public:
 	struct Status {
@@ -127,16 +128,18 @@
 	{
 		if (mutex_.try_lock()) {
 			auto old_time = last_lock_time_.load();
-			if (Now() - old_time > limit_) {
-				return last_lock_time_.compare_exchange_strong(old_time, Now());
+			auto cur = Now();
+			if (Expired(cur - old_time)) {
+				return last_lock_time_.compare_exchange_strong(old_time, cur);
 			} else {
 				last_lock_time_.store(Now());
 				return true;
 			}
 		} else {
 			auto old_time = last_lock_time_.load();
-			if (Now() - old_time > limit_) {
-				return last_lock_time_.compare_exchange_strong(old_time, Now());
+			auto cur = Now();
+			if (Expired(cur - old_time)) {
+				return last_lock_time_.compare_exchange_strong(old_time, cur);
 			} else {
 				return false;
 			}
@@ -154,55 +157,88 @@
 	void unlock()
 	{
 		auto old_time = last_lock_time_.load();
-		if (Now() - old_time > limit_) {
-		} else {
-			if (last_lock_time_.compare_exchange_strong(old_time, Now())) {
+		auto cur = Now();
+		if (!Expired(cur - old_time)) {
+			if (last_lock_time_.compare_exchange_strong(old_time, cur)) {
 				mutex_.unlock();
 			}
 		}
 	}
 };
 
-namespace
-{
-typedef int64_t Offset;
-Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); }
-void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); }
-} // namespace
-
+//robust attr does NOT work, maybe os does not support it.
 class RobustMutex
 {
 public:
 	RobustMutex()
 	{
-		pthread_mutexattr_t attr;
-		pthread_mutexattr_init(&attr);
-		pthread_mutexattr_setrobust(&attr, 1);
-		pthread_mutex_init(mtx(), &attr);
-		if (!valid()) {
+		pthread_mutexattr_t mutex_attr;
+		auto attr = [&]() { return &mutex_attr; };
+		int r = pthread_mutexattr_init(attr());
+		r |= pthread_mutexattr_setpshared(attr(), PTHREAD_PROCESS_SHARED);
+		r |= pthread_mutexattr_setrobust_np(attr(), PTHREAD_MUTEX_ROBUST_NP);
+		r |= pthread_mutex_init(mtx(), attr());
+		int rob = 0;
+		pthread_mutexattr_getrobust_np(attr(), &rob);
+		int shared = 0;
+		pthread_mutexattr_getpshared(attr(), &shared);
+		printf("robust : %d, shared : %d\n", rob, shared);
+		r |= pthread_mutexattr_destroy(attr());
+		if (r) {
 			throw("init mutex error.");
 		}
 	}
+	~RobustMutex()
+	{
+		pthread_mutex_destroy(mtx());
+	}
+
+public:
+	void lock() { Lock(); }
+	bool try_lock()
+	{
+		int r = TryLock();
+		printf("TryLock ret: %d\n", r);
+		return r == 0;
+	}
+
+	void unlock() { Unlock(); }
+
+	// private:
 	int TryLock() { return pthread_mutex_trylock(mtx()); }
 	int Lock() { return pthread_mutex_lock(mtx()); }
 	int Unlock() { return pthread_mutex_unlock(mtx()); }
-	bool valid() const { return false; }
 
 private:
 	pthread_mutex_t *mtx() { return &mutex_; }
 	pthread_mutex_t mutex_;
 };
 
+class LockFreeQueue
+{
+	typedef int64_t Data;
+	typedef boost::lockfree::queue<Data, boost::lockfree::capacity<1024>> LFQueue;
+	void push_back(Data d) { queue_.push(d); }
+
+private:
+	LFQueue queue_;
+};
+
+} // namespace
+
 BOOST_AUTO_TEST_CASE(MutexTest)
 {
 	SharedMemory &shm = TestShm();
+	// shm.Remove();
+	// return;
 	GlobalInit(shm);
 	const std::string mtx_name("test_mutex");
 	const std::string int_name("test_int");
-	auto mtx = shm.FindOrCreate<Mutex>(mtx_name);
+	auto mtx = shm.FindOrCreate<TLMutex>(mtx_name);
 	auto pi = shm.FindOrCreate<int>(int_name, 100);
+	std::mutex m;
 	typedef std::chrono::steady_clock Clock;
 	auto Now = []() { return Clock::now().time_since_epoch(); };
 	if (pi) {
@@ -334,7 +370,6 @@
 		printf("subscribe topic : %s\n", r ? "ok" : "failed");
 	}
 
-	// BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
 	auto ServerLoop = [&](std::atomic<bool> *run) {
 		while (*run) {
 			void *proc_id = 0;
@@ -446,27 +481,20 @@
 
 	std::atomic<bool> run(true);
 
+	BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
 	ThreadManager threads;
 	boost::timer::auto_cpu_timer timer;
 	threads.Launch(hb, &run);
-	threads.Launch(ServerLoop, &run);
 	threads.Launch(showStatus, &run);
 	int ncli = 10;
-	const uint64_t nreq = 1000 * 1;
+	const uint64_t nreq = 1000 * 10;
 	for (int i = 0; i < ncli; ++i) {
-		// threads.Launch(asyncRequest, nreq);
+		threads.Launch(asyncRequest, nreq);
 	}
-
-	for (int i = 0; i < 10; ++i) {
-		SyncRequest(i);
-	}
-	// run.store(false);
-	// server_thread.join();
-	// return;
 	int same = 0;
 	int64_t last = 0;
-	while (last < nreq * ncli && same < 1) {
+	while (last < nreq * ncli && same < 2) {
 		Sleep(1s, false);
 		auto cur = Status().nreply_.load();
 		if (last == cur) {
--
Gitblit v1.8.0
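
Side note on the "//robust attr does NOT work, maybe os does not support it." comment introduced above: robustness only becomes visible when a lock attempt made after the owning process died returns EOWNERDEAD and the caller repairs the protected state and calls pthread_mutex_consistent(). The stand-alone sketch below is not part of the patch; it uses the portable POSIX spellings (pthread_mutexattr_setrobust / PTHREAD_MUTEX_ROBUST) instead of the *_np variants, and it assumes the mutex would really be placed in the shared-memory segment, as the test's SharedMemory allocator would do.

#include <errno.h>
#include <pthread.h>

// Illustration only: a process-shared, robust mutex. In the real test the
// object would have to live inside shared memory, not in a process-local global.
static pthread_mutex_t g_mtx;

void InitRobustMutex()
{
	pthread_mutexattr_t attr;
	pthread_mutexattr_init(&attr);
	pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
	pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST);
	pthread_mutex_init(&g_mtx, &attr);
	pthread_mutexattr_destroy(&attr);
}

bool LockWithRecovery()
{
	int r = pthread_mutex_lock(&g_mtx);
	if (r == EOWNERDEAD) {
		// A previous owner died while holding the lock: repair the protected
		// data here, then mark the mutex consistent so it stays usable.
		pthread_mutex_consistent(&g_mtx);
		r = 0;
	}
	return r == 0;
}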