liuxiaolong
2021-07-20 58d904a328c0d849769b483e901a0be9426b8209
src/robust.h
@@ -19,236 +19,74 @@
#ifndef ROBUST_Q31RCWYU
#define ROBUST_Q31RCWYU
#include "bh_util.h"
#include "log.h"
#include <string.h>
#include <atomic>
#include <chrono>
#include <memory>
#include <string.h>
#include <string>
#include <sys/types.h>
#include <unistd.h>
namespace robust
{
using namespace std::chrono;
using namespace std::chrono_literals;
void QuickSleep();
class RobustReqRep
// atomic queue, length is 1.
// lowest bit is used for data flag, 63 bit for data.
class AtomicQ63
{
   typedef uint32_t State;
   typedef std::string Msg;
   typedef std::chrono::steady_clock::duration Duration;
public:
   enum ErrorCode {
      eSuccess = 0,
      eTimeout = EAGAIN,
      eSizeError = EINVAL,
   };
   explicit RobustReqRep(const uint32_t max_len) :
       capacity_(max_len), state_(eStateInit), timestamp_(Duration(0)), size_(0) {}
   void PutReady() { state_.store(eStateReady); }
   bool Ready() const { return state_.load() == eStateReady; }
   uint32_t capacity() const { return capacity_; }
   int ClientRequest(const Msg &request, Msg &reply)
   typedef int64_t Data;
   AtomicQ63() { memset(this, 0, sizeof(*this)); }
   bool push(const Data d, bool try_more = false)
   {
      int r = ClientWriteRequest(request);
      if (r == eSuccess) {
         r = ClientReadReply(reply);
      }
      auto cur = buf.load();
      return Empty(cur) && buf.compare_exchange_strong(cur, Enc(d));
   }
   bool pop(Data &d, bool try_more = false)
   {
      Data cur = buf.load();
      bool r = !Empty(cur) && buf.compare_exchange_strong(cur, 0);
      if (r) { d = Dec(cur); }
      return r;
   }
   int ClientReadReply(Msg &reply);
   int ClientWriteRequest(const Msg &request);
   int ServerReadRequest(Msg &request);
   int ServerWriteReply(const Msg &reply);
private:
   RobustReqRep(const RobustReqRep &);
   RobustReqRep(RobustReqRep &&);
   RobustReqRep &operator=(const RobustReqRep &) = delete;
   RobustReqRep &operator=(RobustReqRep &&) = delete;
   static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
   static inline Data Enc(const Data d) { return (d << 1) | 1; }   // lowest bit 1 means data ok.
   static inline Data Dec(const Data d) { return d >> 1; }         // lowest bit 1 means data ok.
   enum {
      eStateInit = 0,
      eStateReady = 0x19833891,
      eClientWriteBegin,
      eClientWriteEnd,
      eServerReadBegin,
      eServerReadEnd,
      eServerWriteBegin,
      eServerWriteEnd,
      eClientReadBegin,
      eClientReadEnd = eStateReady,
   typedef std::atomic<Data> AData;
   // static_assert(sizeof(Data) == sizeof(AData));
   AData buf;
};
// atomic request-reply process, one cycle a time.
class AtomicReqRep
{
public:
   typedef int64_t Data;
   typedef std::function<Data(const Data)> Handler;
   bool ClientRequest(const Data request, Data &reply);
   bool ServerProcess(Handler onReq);
   AtomicReqRep() :
       data_(0), timestamp_(now()) {}
private:
   enum State {
      eStateFree,
      eStateRequest,
      eStateReply
   };
   bool StateCas(State exp, State val);
   void Write(const Msg &msg)
   {
      size_.store(msg.size());
      memcpy(buf, msg.data(), msg.size());
   }
   void Read(Msg &msg) { msg.assign(buf, size_.load()); }
   static int GetState(Data d) { return d & MaskBits(3); }
   static Data Encode(Data d, State st) { return (d << 3) | st; }
   static Data Decode(Data d) { return d >> 3; }
   typedef std::chrono::steady_clock steady_clock;
   typedef steady_clock::duration Duration;
   static Duration now() { return steady_clock::now().time_since_epoch(); }
   const uint32_t capacity_;
   std::atomic<State> state_;
   static_assert(sizeof(State) == sizeof(state_), "atomic should has no extra data.");
   bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); }
   std::atomic<Data> data_;
   std::atomic<Duration> timestamp_;
   std::atomic<int32_t> size_;
   char buf[4];
};
template <bool isRobust = false>
class CasMutex
{
   static pid_t pid()
   {
      static pid_t val = getpid();
      return val;
   }
   static bool Killed(pid_t pid)
   {
      char buf[64] = {0};
      snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", pid);
      return access(buf, F_OK) != 0;
   }
public:
   CasMutex() :
       meta_(0) {}
   int try_lock()
   {
      const auto t = steady_clock::now().time_since_epoch().count();
      auto old = meta_.load();
      int r = 0;
      if (!Locked(old)) {
         r = MetaCas(old, Meta(1, pid()));
      } else if (isRobust && Killed(Pid(old))) {
         r = static_cast<int>(MetaCas(old, Meta(1, pid()))) << 1;
         if (r) {
            printf("captured pid %d -> %d, r = %d\n", Pid(old), pid(), r);
         }
      }
      return r;
   }
   int lock()
   {
      int r = 0;
      do {
         r = try_lock();
      } while (r == 0);
      return r;
   }
   void unlock()
   {
      auto old = meta_.load();
      if (Locked(old) && Pid(old) == pid()) {
         MetaCas(old, Meta(0, pid()));
      }
   }
private:
   std::atomic<uint64_t> meta_;
   bool Locked(uint64_t meta) { return (meta >> 63) != 0; }
   pid_t Pid(uint64_t meta) { return meta & ~(uint64_t(1) << 63); }
   uint64_t Meta(uint64_t lk, pid_t pid) { return (lk << 63) | pid; }
   bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
   static_assert(sizeof(pid_t) < sizeof(uint64_t));
};
template <class Lock>
class Guard
{
public:
   Guard(Lock &l) :
       l_(l) { l_.lock(); }
   ~Guard() { l_.unlock(); }
private:
   Guard(const Guard &);
   Guard(Guard &&);
   Lock &l_;
};
template <class D, class Alloc = std::allocator<D>>
class CircularBuffer
{
   typedef uint32_t size_type;
   typedef uint32_t count_type;
   typedef uint64_t meta_type;
   static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; }
   static count_type Count(meta_type meta) { return meta >> 32; }
   static size_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }
public:
   typedef D Data;
   CircularBuffer(const size_type cap) :
       CircularBuffer(cap, Alloc()) {}
   CircularBuffer(const size_type cap, Alloc const &al) :
       state_(0), capacity_(cap), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(cap))
   {
      if (!buf) {
         throw("error allocate buffer: out of mem!");
      }
   }
   ~CircularBuffer()
   {
      al_.deallocate(buf, capacity_);
   }
   size_type size() const { return (capacity_ + tail() - head()) % capacity_; }
   bool full() const { return (capacity_ + tail() + 1 - head()) % capacity_ == 0; }
   bool empty() const { return head() == tail(); }
   bool push_back(Data d)
   {
      Guard<MutexT> guard(mutex_);
      if (!full()) {
         auto old = mtail();
         buf[Pos(old)] = d;
         return mtail_.compare_exchange_strong(old, next(old));
      } else {
         return false;
      }
   }
   bool pop_front(Data &d)
   {
      Guard<MutexT> guard(mutex_);
      if (!empty()) {
         auto old = mhead();
         d = buf[Pos(old)];
         return mhead_.compare_exchange_strong(old, next(old));
      } else {
         return false;
      }
   }
   bool Ready() const { return state_.load() == eStateReady; }
   void PutReady() { state_.store(eStateReady); }
private:
   CircularBuffer(const CircularBuffer &);
   CircularBuffer(CircularBuffer &&);
   CircularBuffer &operator=(const CircularBuffer &) = delete;
   CircularBuffer &operator=(CircularBuffer &&) = delete;
   typedef CasMutex<true> MutexT;
   // static_assert(sizeof(MutexT) == 16);
   meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); }
   size_type head() const { return Pos(mhead()); }
   size_type tail() const { return Pos(mtail()); }
   meta_type mhead() const { return mhead_.load(); }
   meta_type mtail() const { return mtail_.load(); }
   // data
   enum { eStateReady = 0x19833891 };
   std::atomic<uint32_t> state_;
   const size_type capacity_;
   MutexT mutex_;
   std::atomic<meta_type> mhead_;
   std::atomic<meta_type> mtail_;
   Alloc al_;
   typename Alloc::pointer buf = nullptr;
};
} // namespace robust