liuxiaolong
2021-07-20 58d904a328c0d849769b483e901a0be9426b8209
src/robust.h
@@ -21,234 +21,21 @@
#include "bh_util.h"
#include "log.h"
#include <string.h>
#include <atomic>
#include <chrono>
#include <memory>
#include <mutex>
#include <string>
#include <sys/file.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
namespace robust
{
using namespace std::chrono;
using namespace std::chrono_literals;
void QuickSleep();
class CasMutex
{
   typedef uint64_t locker_t;
   static inline locker_t this_locker() { return pthread_self(); }
   static const uint64_t kLockerMask = MaskBits(63);
public:
   CasMutex() :
       meta_(0) {}
   int try_lock()
   {
      auto old = meta_.load();
      int r = 0;
      if (!Locked(old)) {
         r = MetaCas(old, Meta(1, this_locker()));
      }
      return r;
   }
   int lock()
   {
      int r = 0;
      do {
         r = try_lock();
      } while (r == 0);
      return r;
   }
   void unlock()
   {
      auto old = meta_.load();
      if (Locked(old) && Locker(old) == this_locker()) {
         MetaCas(old, Meta(0, this_locker()));
      }
   }
private:
   std::atomic<uint64_t> meta_;
   bool Locked(uint64_t meta) { return (meta >> 63) == 1; }
   locker_t Locker(uint64_t meta) { return meta & kLockerMask; }
   uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; }
   bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
};
class NullMutex
// atomic queue, length is 1.
// lowest bit is used for data flag, 63 bit for data.
class AtomicQ63
{
public:
   template <class... T>
   explicit NullMutex(T &&...t) {} // easy test.
   bool try_lock() { return true; }
   void lock() {}
   void unlock() {}
};
// flock + mutex
class FMutex
{
public:
   typedef uint64_t id_t;
   FMutex(id_t id) :
       id_(id), fd_(Open(id_)), count_(0)
   {
      if (fd_ == -1) { throw "error create mutex!"; }
   }
   ~FMutex() { Close(fd_); }
   bool try_lock();
   void lock();
   void unlock();
private:
   static std::string GetPath(id_t id)
   {
      const std::string dir("/tmp/.bhome_mtx");
      mkdir(dir.c_str(), 0777);
      return dir + "/fm_" + std::to_string(id);
   }
   static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDONLY, 0666); }
   static int Close(int fd) { return close(fd); }
   id_t id_;
   int fd_;
   std::mutex mtx_;
   std::atomic<int32_t> count_;
};
union semun {
   int val;               /* Value for SETVAL */
   struct semid_ds *buf;  /* Buffer for IPC_STAT, IPC_SET */
   unsigned short *array; /* Array for GETALL, SETALL */
   struct seminfo *__buf; /* Buffer for IPC_INFO
                                           (Linux-specific) */
};
class SemMutex
{
public:
   SemMutex(key_t key) :
       key_(key), sem_id_(semget(key, 1, 0666))
   {
      if (sem_id_ == -1) { throw "error create semaphore."; }
   }
   ~SemMutex() {}
   bool try_lock()
   {
      sembuf op = {0, -1, SEM_UNDO | IPC_NOWAIT};
      return semop(sem_id_, &op, 1) == 0;
   }
   void lock()
   {
      sembuf op = {0, -1, SEM_UNDO};
      semop(sem_id_, &op, 1);
   }
   void unlock()
   {
      sembuf op = {0, 1, SEM_UNDO};
      semop(sem_id_, &op, 1);
   }
private:
   key_t key_;
   int sem_id_;
};
template <class Lock>
class Guard
{
public:
   Guard(Lock &l) :
       l_(l) { l_.lock(); }
   ~Guard() { l_.unlock(); }
private:
   Guard(const Guard &);
   Guard(Guard &&);
   Lock &l_;
};
template <unsigned PowerSize = 4, class Int = int64_t>
class AtomicQueue
{
public:
   typedef uint32_t size_type;
   typedef Int Data;
   typedef std::atomic<Data> AData;
   static_assert(sizeof(Data) == sizeof(AData));
   enum {
      power = PowerSize,
      capacity = (1 << power),
      mask = capacity - 1,
   };
   AtomicQueue() { memset(this, 0, sizeof(*this)); }
   size_type head() const { return head_.load(); }
   size_type tail() const { return tail_.load(); }
   bool like_empty() const { return head() == tail() && Empty(buf[head()]); }
   bool like_full() const { return head() == tail() && !Empty(buf[head()]); }
   bool push(const Data d, bool try_more = false)
   {
      bool r = false;
      size_type i = 0;
      do {
         auto pos = tail();
         if (tail_.compare_exchange_strong(pos, Next(pos))) {
            auto cur = buf[pos].load();
            r = Empty(cur) && buf[pos].compare_exchange_strong(cur, Enc(d));
         }
      } while (try_more && !r && ++i < capacity);
      return r;
   }
   bool pop(Data &d, bool try_more = false)
   {
      bool r = false;
      Data cur;
      size_type i = 0;
      do {
         auto pos = head();
         if (head_.compare_exchange_strong(pos, Next(pos))) {
            cur = buf[pos].load();
            r = !Empty(cur) && buf[pos].compare_exchange_strong(cur, 0);
         }
      } while (try_more && !r && ++i < capacity);
      if (r) { d = Dec(cur); }
      return r;
   }
private:
   static_assert(std::is_integral<Data>::value, "Data must be integral type!");
   static_assert(std::is_signed<Data>::value, "Data must be signed type!");
   static_assert(PowerSize < 10, "RobustQ63 max size is 2^10!");
   static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
   static inline Data Enc(const Data d) { return (d << 1) | 1; }   // lowest bit 1 means data ok.
   static inline Data Dec(const Data d) { return d >> 1; }         // lowest bit 1 means data ok.
   static size_type Next(const size_type index) { return (index + 1) & mask; }
   std::atomic<size_type> head_;
   std::atomic<size_type> tail_;
   AData buf[capacity];
};
template <class Int>
class AtomicQueue<0, Int>
{
   typedef Int Data;
   typedef std::atomic<Data> AData;
   static_assert(sizeof(Data) == sizeof(AData));
public:
   AtomicQueue() { memset(this, 0, sizeof(*this)); }
   typedef int64_t Data;
   AtomicQ63() { memset(this, 0, sizeof(*this)); }
   bool push(const Data d, bool try_more = false)
   {
      auto cur = buf.load();
@@ -261,63 +48,28 @@
      if (r) { d = Dec(cur); }
      return r;
   }
   uint32_t head() const { return 0; }
   uint32_t tail() const { return 0; }
private:
   static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
   static inline Data Enc(const Data d) { return (d << 1) | 1; }   // lowest bit 1 means data ok.
   static inline Data Dec(const Data d) { return d >> 1; }         // lowest bit 1 means data ok.
   typedef std::atomic<Data> AData;
   // static_assert(sizeof(Data) == sizeof(AData));
   AData buf;
};
// atomic request-reply process, one cycle a time.
class AtomicReqRep
{
public:
   typedef int64_t Data;
   typedef std::function<Data(const Data)> Handler;
   bool ClientRequest(const Data request, Data &reply)
   {
      auto end_time = now() + 3s;
      do {
         Data cur = data_.load();
         if (GetState(cur) == eStateFree &&
             DataCas(cur, Encode(request, eStateRequest))) {
            do {
               yield();
               cur = data_.load();
               if (GetState(cur) == eStateReply) {
                  DataCas(cur, Encode(0, eStateFree));
                  reply = Decode(cur);
                  return true;
               }
            } while (now() < end_time);
         }
         yield();
      } while (now() < end_time);
      return false;
   }
   bool ServerProcess(Handler onReq)
   {
      Data cur = data_.load();
      switch (GetState(cur)) {
      case eStateRequest:
         if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) {
            timestamp_ = now();
            return true;
         }
         break;
      case eStateReply:
         if (timestamp_.load() + 3s < now()) {
            DataCas(cur, Encode(0, eStateFree));
         }
         break;
      case eStateFree:
      default: break;
      }
      return false;
   }
   bool ClientRequest(const Data request, Data &reply);
   bool ServerProcess(Handler onReq);
   AtomicReqRep() :
       data_(0), timestamp_(now()) {}
private:
   enum State {
@@ -328,9 +80,9 @@
   static int GetState(Data d) { return d & MaskBits(3); }
   static Data Encode(Data d, State st) { return (d << 3) | st; }
   static Data Decode(Data d) { return d >> 3; }
   static void yield() { QuickSleep(); }
   typedef std::chrono::steady_clock steady_clock;
   typedef steady_clock::duration Duration;
   Duration now() { return steady_clock::now().time_since_epoch(); }
   static Duration now() { return steady_clock::now().time_since_epoch(); }
   bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); }
   std::atomic<Data> data_;