lichao
2021-04-28 a6f67b4249525089fb97eb9418c7014f66c2a000
utest/api_test.cpp
@@ -16,6 +16,7 @@
 * =====================================================================================
 */
#include "bh_api.h"
#include "robust.h"
#include "util.h"
#include <atomic>
#include <boost/lockfree/queue.hpp>
@@ -96,138 +97,22 @@
   // printf("client Recv reply : %s\n", reply.data().c_str());
}
class TLMutex
{
   typedef boost::interprocess::interprocess_mutex MutexT;
   // typedef CasMutex MutexT;
   // typedef std::mutex MutexT;
   typedef std::chrono::steady_clock Clock;
   typedef Clock::duration Duration;
   static Duration Now() { return Clock::now().time_since_epoch(); }
   const Duration limit_;
   std::atomic<Duration> last_lock_time_;
   MutexT mutex_;
   bool Expired(const Duration diff) { return diff > limit_; }
public:
   struct Status {
      int64_t nlock_ = 0;
      int64_t nupdate_time_fail = 0;
      int64_t nfail = 0;
      int64_t nexcept = 0;
   };
   Status st_;
   explicit TLMutex(Duration limit) :
       limit_(limit) {}
   TLMutex() :
       TLMutex(std::chrono::seconds(1)) {}
   ~TLMutex() { static_assert(std::is_pod<Duration>::value); }
   bool try_lock()
   {
      if (mutex_.try_lock()) {
         auto old_time = last_lock_time_.load();
         auto cur = Now();
         if (Expired(cur - old_time)) {
            return last_lock_time_.compare_exchange_strong(old_time, cur);
         } else {
            last_lock_time_.store(Now());
            return true;
         }
      } else {
         auto old_time = last_lock_time_.load();
         auto cur = Now();
         if (Expired(cur - old_time)) {
            return last_lock_time_.compare_exchange_strong(old_time, cur);
         } else {
            return false;
         }
      }
   }
   void lock()
   {
      int n = 0;
      while (!try_lock()) {
         n++;
         std::this_thread::yield();
      }
      st_.nlock_ += n;
   }
   void unlock()
   {
      auto old_time = last_lock_time_.load();
      auto cur = Now();
      if (!Expired(cur - old_time)) {
         if (last_lock_time_.compare_exchange_strong(old_time, cur)) {
            mutex_.unlock();
         }
      }
   }
};
//robust attr does NOT work, maybe os does not support it.
class RobustMutex
{
public:
   RobustMutex()
   {
      pthread_mutexattr_t mutex_attr;
      auto attr = [&]() { return &mutex_attr; };
      int r = pthread_mutexattr_init(attr());
      r |= pthread_mutexattr_setpshared(attr(), PTHREAD_PROCESS_SHARED);
      r |= pthread_mutexattr_setrobust_np(attr(), PTHREAD_MUTEX_ROBUST_NP);
      r |= pthread_mutex_init(mtx(), attr());
      int rob = 0;
      pthread_mutexattr_getrobust_np(attr(), &rob);
      int shared = 0;
      pthread_mutexattr_getpshared(attr(), &shared);
      printf("robust : %d, shared : %d\n", rob, shared);
      r |= pthread_mutexattr_destroy(attr());
      if (r) {
         throw("init mutex error.");
      }
   }
   ~RobustMutex()
   {
      pthread_mutex_destroy(mtx());
   }
public:
   void lock() { Lock(); }
   bool try_lock()
   {
      int r = TryLock();
      printf("TryLock ret: %d\n", r);
      return r == 0;
   }
   void unlock() { Unlock(); }
   // private:
   int TryLock() { return pthread_mutex_trylock(mtx()); }
   int Lock() { return pthread_mutex_lock(mtx()); }
   int Unlock() { return pthread_mutex_unlock(mtx()); }
private:
   pthread_mutex_t *mtx() { return &mutex_; }
   pthread_mutex_t mutex_;
};
class LockFreeQueue
{
   typedef int64_t Data;
   typedef boost::lockfree::queue<Data, boost::lockfree::capacity<1024>> LFQueue;
   void push_back(Data d) { queue_.push(d); }
private:
   LFQueue queue_;
};
} // namespace
#include <chrono>
using namespace std::chrono;
// using namespace std::chrono_literals;
BOOST_AUTO_TEST_CASE(MutexTest)
{
   // typedef robust::CasMutex<true> RobustMutex;
   typedef MutexWithPidCheck RobustMutex;
   for (int i = 0; i < 20; ++i) {
      int size = i;
      int left = size & 7;
      int rsize = size + ((8 - left) & 7);
      printf("size: %3d, rsize: %3d\n", size, rsize);
   }
   SharedMemory &shm = TestShm();
   // shm.Remove();
   // return;
@@ -235,7 +120,7 @@
   const std::string mtx_name("test_mutex");
   const std::string int_name("test_int");
   auto mtx = shm.FindOrCreate<TLMutex>(mtx_name);
   auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name);
   auto pi = shm.FindOrCreate<int>(int_name, 100);
   std::mutex m;
@@ -248,11 +133,11 @@
   {
      boost::timer::auto_cpu_timer timer;
      printf("test time: ");
      TLMutex mutex;
      // CasMutex mutex;
      const int ntimes = 1000 * 1000;
      printf("test lock/unlock %d times: ", ntimes);
      RobustMutex mutex;
      auto Lock = [&]() {
         for (int i = 0; i < 10; ++i) {
         for (int i = 0; i < ntimes; ++i) {
            mutex.lock();
            mutex.unlock();
         }
@@ -260,11 +145,6 @@
      std::thread t1(Lock), t2(Lock);
      t1.join();
      t2.join();
      printf("mutex nlock: %ld, update time error: %ld, normal fail: %ld, error wait: %ld\n",
             mutex.st_.nlock_,
             mutex.st_.nupdate_time_fail,
             mutex.st_.nfail,
             mutex.st_.nexcept);
   }
   auto MSFromNow = [](const int ms) {
@@ -487,10 +367,13 @@
   threads.Launch(hb, &run);
   threads.Launch(showStatus, &run);
   int ncli = 10;
   const uint64_t nreq = 1000 * 10;
   const uint64_t nreq = 1000 * 100;
   for (int i = 0; i < ncli; ++i) {
      threads.Launch(asyncRequest, nreq);
   }
   // for (int i = 0; i < 100; ++i) {
   //    SyncRequest(0);
   // }
   int same = 0;
   int64_t last = 0;
@@ -509,4 +392,6 @@
   threads.WaitAll();
   auto &st = Status();
   printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld\n", st.nrequest_.load(), st.nserved_.load(), st.nreply_.load());
   BHCleanup();
   printf("after cleanup\n");
}