lichao
2021-04-30 d33a69463f1a75134d01191be0b9e1bdd757dd4b
src/robust.h
@@ -107,6 +107,7 @@
{
public:
   typedef int locker_t;
   enum { eLockerBits = sizeof(locker_t) * 8 };
   static locker_t this_locker()
   {
      static locker_t val = getpid();
@@ -119,6 +120,7 @@
{
public:
   typedef int locker_t;
   enum { eLockerBits = sizeof(locker_t) * 8 };
   static locker_t this_locker()
   {
      static locker_t val = getpid();
@@ -132,12 +134,29 @@
   }
};
class ExpiredLocker
{
public:
   typedef int64_t locker_t;
   enum { eLockerBits = 63 };
   static locker_t this_locker() { return Now(); }
   static bool is_alive(locker_t locker)
   {
      return Now() < locker + steady_clock::duration(10s).count();
   }
private:
   static locker_t Now() { return steady_clock::now().time_since_epoch().count(); }
};
template <class LockerT>
class CasMutex
{
   typedef typename LockerT::locker_t locker_t;
   static inline locker_t this_locker() { return LockerT::this_locker(); }
   static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); }
   static const uint64_t kLockerMask = MaskBits(LockerT::eLockerBits);
   static_assert(LockerT::eLockerBits < 64, "locker size must be smaller than 64 bit!");
public:
   CasMutex() :
@@ -152,7 +171,7 @@
      } else if (!is_alive(Locker(old))) {
         r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1;
         if (r) {
            printf("captured pid %d -> %d, r = %d\n", Locker(old), this_locker(), r);
            printf("captured locker %ld -> %ld, locker = %d\n", int64_t(Locker(old)), int64_t(this_locker()), r);
         }
      }
      return r;
@@ -174,10 +193,9 @@
   }
private:
   static_assert(sizeof(locker_t) < sizeof(uint64_t), "locker size must be smaller than 64 bit!");
   std::atomic<uint64_t> meta_;
   bool Locked(uint64_t meta) { return (meta >> 63) != 0; }
   locker_t Locker(uint64_t meta) { return meta & MaskBits(sizeof(locker_t) * 8); }
   bool Locked(uint64_t meta) { return (meta >> 63) == 1; }
   locker_t Locker(uint64_t meta) { return meta & kLockerMask; }
   uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; }
   bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
};
@@ -206,7 +224,7 @@
   typedef uint64_t meta_type;
   static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; }
   static count_type Count(meta_type meta) { return meta >> 32; }
   static size_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }
   static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }
public:
   typedef D Data;
@@ -214,41 +232,40 @@
   CircularBuffer(const size_type cap) :
       CircularBuffer(cap, Alloc()) {}
   CircularBuffer(const size_type cap, Alloc const &al) :
       state_(0), capacity_(cap), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(cap))
       capacity_(cap + 1), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(capacity_))
   {
      if (!buf) {
         throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!");
      } else {
         memset(&buf[0], 0, sizeof(D) * capacity_);
      }
   }
   ~CircularBuffer() { al_.deallocate(buf, capacity_); }
   size_type size() const { return (capacity_ + tail() - head()) % capacity_; }
   bool full() const { return (capacity_ + tail() + 1 - head()) % capacity_ == 0; }
   bool empty() const { return head() == tail(); }
   bool push_back(Data d)
   bool push_back(const Data d)
   {
      Guard<Mutex> guard(mutex_);
      if (!full()) {
         auto old = mtail();
         buf[Pos(old)] = d;
      auto old = mtail();
      auto pos = Pos(old);
      auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0);
      if (!full) {
         buf[pos] = d;
         return mtail_.compare_exchange_strong(old, next(old));
      } else {
         return false;
      }
      return false;
   }
   bool pop_front(Data &d)
   {
      Guard<Mutex> guard(mutex_);
      if (!empty()) {
         auto old = mhead();
         d = buf[Pos(old)];
      auto old = mhead();
      auto pos = Pos(old);
      if (!(pos == tail())) {
         d = buf[pos];
         return mhead_.compare_exchange_strong(old, next(old));
      } else {
         return false;
      }
   }
   bool Ready() const { return state_.load() == eStateReady; }
   void PutReady() { state_.store(eStateReady); }
private:
   CircularBuffer(const CircularBuffer &);
@@ -262,8 +279,6 @@
   meta_type mhead() const { return mhead_.load(); }
   meta_type mtail() const { return mtail_.load(); }
   // data
   enum { eStateReady = 0x19833891 };
   std::atomic<uint32_t> state_;
   const size_type capacity_;
   Mutex mutex_;
   std::atomic<meta_type> mhead_;
@@ -272,5 +287,66 @@
   typename Alloc::pointer buf = nullptr;
};
template <unsigned PowerSize = 4, class Int = int64_t>
class AtomicQueue
{
public:
   typedef uint32_t size_type;
   typedef Int Data;
   typedef std::atomic<Data> AData;
   static_assert(sizeof(Data) == sizeof(AData));
   enum {
      power = PowerSize,
      capacity = (1 << power),
      mask = capacity - 1,
   };
   AtomicQueue() { memset(this, 0, sizeof(*this)); }
   size_type head() const { return head_.load(); }
   size_type tail() const { return tail_.load(); }
   bool like_empty() const { return head() == tail() && Empty(buf[head()]); }
   bool like_full() const { return head() == tail() && !Empty(buf[head()]); }
   bool push_back(const Data d, bool try_more = false)
   {
      bool r = false;
      size_type i = 0;
      do {
         auto pos = tail();
         auto cur = buf[pos].load();
         r = Empty(cur) && buf[pos].compare_exchange_strong(cur, Enc(d));
         tail_.compare_exchange_strong(pos, Next(pos));
      } while (try_more && !r && ++i < capacity);
      return r;
   }
   bool pop_front(Data &d, bool try_more = false)
   {
      bool r = false;
      Data cur;
      size_type i = 0;
      do {
         auto pos = head();
         cur = buf[pos].load();
         r = !Empty(cur) && buf[pos].compare_exchange_strong(cur, 0);
         head_.compare_exchange_strong(pos, Next(pos));
      } while (try_more && !r && ++i < capacity);
      if (r) { d = Dec(cur); }
      return r;
   }
private:
   static_assert(std::is_integral<Data>::value, "Data must be integral type!");
   static_assert(std::is_signed<Data>::value, "Data must be signed type!");
   static_assert(PowerSize < 10, "RobustQ63 max size is 2^10!");
   static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
   static inline Data Enc(const Data d) { return (d << 1) | 1; }   // lowest bit 1 means data ok.
   static inline Data Dec(const Data d) { return d >> 1; }         // lowest bit 1 means data ok.
   static size_type Next(const size_type index) { return (index + 1) & mask; }
   std::atomic<size_type> head_;
   std::atomic<size_type> tail_;
   AData buf[capacity];
};
} // namespace robust
#endif // end of include guard: ROBUST_Q31RCWYU