lichao
2021-05-19 34cd75f77d0ca94dbdba4e6cc9451fe4d33e78b3
src/robust.h
@@ -19,6 +19,7 @@
#ifndef ROBUST_Q31RCWYU
#define ROBUST_Q31RCWYU
#include "bh_util.h"
#include "log.h"
#include <atomic>
#include <chrono>
@@ -37,8 +38,6 @@
using namespace std::chrono;
using namespace std::chrono_literals;
constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; }
void QuickSleep();
class CasMutex
@@ -86,6 +85,8 @@
class NullMutex
{
public:
   template <class... T>
   explicit NullMutex(T &&...t) {} // easy test.
   bool try_lock() { return true; }
   void lock() {}
   void unlock() {}
@@ -97,7 +98,7 @@
public:
   typedef uint64_t id_t;
   FMutex(id_t id) :
       id_(id), fd_(Open(id_))
       id_(id), fd_(Open(id_)), count_(0)
   {
      if (fd_ == -1) { throw "error create mutex!"; }
   }
@@ -113,11 +114,12 @@
      mkdir(dir.c_str(), 0777);
      return dir + "/fm_" + std::to_string(id);
   }
   static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDWR, 0666); }
   static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDONLY, 0666); }
   static int Close(int fd) { return close(fd); }
   id_t id_;
   int fd_;
   std::mutex mtx_;
   std::atomic<int32_t> count_;
};
union semun {
@@ -132,17 +134,11 @@
{
public:
   SemMutex(key_t key) :
       key_(key), sem_id_(semget(key, 1, 0666 | IPC_CREAT))
       key_(key), sem_id_(semget(key, 1, 0666))
   {
      if (sem_id_ == -1) { throw "error create semaphore."; }
      union semun init_val;
      init_val.val = 1;
      semctl(sem_id_, 0, SETVAL, init_val);
   }
   ~SemMutex()
   {
      // semctl(sem_id_, 0, IPC_RMID, semun{});
   }
   ~SemMutex() {}
   bool try_lock()
   {
@@ -179,74 +175,6 @@
   Guard(const Guard &);
   Guard(Guard &&);
   Lock &l_;
};
template <class D, class Alloc = std::allocator<D>>
class CircularBuffer
{
   typedef uint32_t size_type;
   typedef uint32_t count_type;
   typedef uint64_t meta_type;
   static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; }
   static count_type Count(meta_type meta) { return meta >> 32; }
   static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }
public:
   typedef D Data;
   CircularBuffer(const size_type cap) :
       CircularBuffer(cap, Alloc()) {}
   CircularBuffer(const size_type cap, Alloc const &al) :
       capacity_(cap + 1), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(capacity_))
   {
      if (!buf) {
         throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!");
      } else {
         memset(&buf[0], 0, sizeof(D) * capacity_);
      }
   }
   ~CircularBuffer() { al_.deallocate(buf, capacity_); }
   bool push_back(const Data d)
   {
      auto old = mtail();
      auto pos = Pos(old);
      auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0);
      if (!full) {
         buf[pos] = d;
         return mtail_.compare_exchange_strong(old, next(old));
      }
      return false;
   }
   bool pop_front(Data &d)
   {
      auto old = mhead();
      auto pos = Pos(old);
      if (!(pos == tail())) {
         d = buf[pos];
         return mhead_.compare_exchange_strong(old, next(old));
      } else {
         return false;
      }
   }
private:
   CircularBuffer(const CircularBuffer &);
   CircularBuffer(CircularBuffer &&);
   CircularBuffer &operator=(const CircularBuffer &) = delete;
   CircularBuffer &operator=(CircularBuffer &&) = delete;
   meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); }
   size_type head() const { return Pos(mhead()); }
   size_type tail() const { return Pos(mtail()); }
   meta_type mhead() const { return mhead_.load(); }
   meta_type mtail() const { return mtail_.load(); }
   // data
   const size_type capacity_;
   std::atomic<meta_type> mhead_;
   std::atomic<meta_type> mtail_;
   Alloc al_;
   typename Alloc::pointer buf = nullptr;
};
template <unsigned PowerSize = 4, class Int = int64_t>
@@ -312,5 +240,102 @@
   AData buf[capacity];
};
template <class Int>
class AtomicQueue<0, Int>
{
   typedef Int Data;
   typedef std::atomic<Data> AData;
   static_assert(sizeof(Data) == sizeof(AData));
public:
   AtomicQueue() { memset(this, 0, sizeof(*this)); }
   bool push(const Data d, bool try_more = false)
   {
      auto cur = buf.load();
      return Empty(cur) && buf.compare_exchange_strong(cur, Enc(d));
   }
   bool pop(Data &d, bool try_more = false)
   {
      Data cur = buf.load();
      bool r = !Empty(cur) && buf.compare_exchange_strong(cur, 0);
      if (r) { d = Dec(cur); }
      return r;
   }
   uint32_t head() const { return 0; }
   uint32_t tail() const { return 0; }
private:
   static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
   static inline Data Enc(const Data d) { return (d << 1) | 1; }   // lowest bit 1 means data ok.
   static inline Data Dec(const Data d) { return d >> 1; }         // lowest bit 1 means data ok.
   AData buf;
};
class AtomicReqRep
{
public:
   typedef int64_t Data;
   typedef std::function<Data(const Data)> Handler;
   bool ClientRequest(const Data request, Data &reply)
   {
      auto end_time = now() + 3s;
      do {
         Data cur = data_.load();
         if (GetState(cur) == eStateFree &&
             DataCas(cur, Encode(request, eStateRequest))) {
            do {
               yield();
               cur = data_.load();
               if (GetState(cur) == eStateReply) {
                  DataCas(cur, Encode(0, eStateFree));
                  reply = Decode(cur);
                  return true;
               }
            } while (now() < end_time);
         }
         yield();
      } while (now() < end_time);
      return false;
   }
   bool ServerProcess(Handler onReq)
   {
      Data cur = data_.load();
      switch (GetState(cur)) {
      case eStateRequest:
         if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) {
            timestamp_ = now();
            return true;
         }
         break;
      case eStateReply:
         if (timestamp_.load() + 3s < now()) {
            DataCas(cur, Encode(0, eStateFree));
         }
         break;
      case eStateFree:
      default: break;
      }
      return false;
   }
private:
   enum State {
      eStateFree,
      eStateRequest,
      eStateReply
   };
   static int GetState(Data d) { return d & MaskBits(3); }
   static Data Encode(Data d, State st) { return (d << 3) | st; }
   static Data Decode(Data d) { return d >> 3; }
   static void yield() { QuickSleep(); }
   typedef steady_clock::duration Duration;
   Duration now() { return steady_clock::now().time_since_epoch(); }
   bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); }
   std::atomic<Data> data_;
   std::atomic<Duration> timestamp_;
};
} // namespace robust
#endif // end of include guard: ROBUST_Q31RCWYU