lichao
2021-04-29 72bffb0807925a156b076b71f78c848a08d27b87
src/robust.h
@@ -32,6 +32,7 @@
using namespace std::chrono;
using namespace std::chrono_literals;
constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; }
void QuickSleep();
@@ -102,20 +103,41 @@
   char buf[4];
};
template <bool isRobust = false>
class CasMutex
class PidLocker
{
   static pid_t pid()
public:
   typedef int locker_t;
   static locker_t this_locker()
   {
      static pid_t val = getpid();
      static locker_t val = getpid();
      return val;
   }
   static bool Killed(pid_t pid)
   static bool is_alive(locker_t locker) { return true; }
};
class RobustPidLocker
{
public:
   typedef int locker_t;
   static locker_t this_locker()
   {
      static locker_t val = getpid();
      return val;
   }
   static bool is_alive(locker_t locker)
   {
      char buf[64] = {0};
      snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", pid);
      return access(buf, F_OK) != 0;
      snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker);
      return access(buf, F_OK) == 0;
   }
};
template <class LockerT>
class CasMutex
{
   typedef typename LockerT::locker_t locker_t;
   static inline locker_t this_locker() { return LockerT::this_locker(); }
   static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); }
public:
   CasMutex() :
@@ -126,11 +148,11 @@
      auto old = meta_.load();
      int r = 0;
      if (!Locked(old)) {
         r = MetaCas(old, Meta(1, pid()));
      } else if (isRobust && Killed(Pid(old))) {
         r = static_cast<int>(MetaCas(old, Meta(1, pid()))) << 1;
         r = MetaCas(old, Meta(1, this_locker()));
      } else if (!is_alive(Locker(old))) {
         r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1;
         if (r) {
            printf("captured pid %d -> %d, r = %d\n", Pid(old), pid(), r);
            printf("captured pid %d -> %d, r = %d\n", Locker(old), this_locker(), r);
         }
      }
      return r;
@@ -146,19 +168,21 @@
   void unlock()
   {
      auto old = meta_.load();
      if (Locked(old) && Pid(old) == pid()) {
         MetaCas(old, Meta(0, pid()));
      if (Locked(old) && Locker(old) == this_locker()) {
         MetaCas(old, Meta(0, this_locker()));
      }
   }
private:
   static_assert(sizeof(locker_t) < sizeof(uint64_t), "locker size must be smaller than 64 bit!");
   std::atomic<uint64_t> meta_;
   bool Locked(uint64_t meta) { return (meta >> 63) != 0; }
   pid_t Pid(uint64_t meta) { return meta & ~(uint64_t(1) << 63); }
   uint64_t Meta(uint64_t lk, pid_t pid) { return (lk << 63) | pid; }
   locker_t Locker(uint64_t meta) { return meta & MaskBits(sizeof(locker_t) * 8); }
   uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; }
   bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
   static_assert(sizeof(pid_t) < sizeof(uint64_t));
};
typedef CasMutex<RobustPidLocker> Mutex;
template <class Lock>
class Guard
@@ -193,19 +217,17 @@
       state_(0), capacity_(cap), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(cap))
   {
      if (!buf) {
         throw("error allocate buffer: out of mem!");
         throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!");
      }
   }
   ~CircularBuffer()
   {
      al_.deallocate(buf, capacity_);
   }
   ~CircularBuffer() { al_.deallocate(buf, capacity_); }
   size_type size() const { return (capacity_ + tail() - head()) % capacity_; }
   bool full() const { return (capacity_ + tail() + 1 - head()) % capacity_ == 0; }
   bool empty() const { return head() == tail(); }
   bool push_back(Data d)
   {
      Guard<MutexT> guard(mutex_);
      Guard<Mutex> guard(mutex_);
      if (!full()) {
         auto old = mtail();
         buf[Pos(old)] = d;
@@ -216,7 +238,7 @@
   }
   bool pop_front(Data &d)
   {
      Guard<MutexT> guard(mutex_);
      Guard<Mutex> guard(mutex_);
      if (!empty()) {
         auto old = mhead();
         d = buf[Pos(old)];
@@ -233,8 +255,7 @@
   CircularBuffer(CircularBuffer &&);
   CircularBuffer &operator=(const CircularBuffer &) = delete;
   CircularBuffer &operator=(CircularBuffer &&) = delete;
   typedef CasMutex<true> MutexT;
   // static_assert(sizeof(MutexT) == 16);
   meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); }
   size_type head() const { return Pos(mhead()); }
   size_type tail() const { return Pos(mtail()); }
@@ -244,7 +265,7 @@
   enum { eStateReady = 0x19833891 };
   std::atomic<uint32_t> state_;
   const size_type capacity_;
   MutexT mutex_;
   Mutex mutex_;
   std::atomic<meta_type> mhead_;
   std::atomic<meta_type> mtail_;
   Alloc al_;