lichao
2021-05-10 77a6c3512a44dfe6540dde71946e6484fe4f173f
src/robust.h
@@ -19,11 +19,16 @@
#ifndef ROBUST_Q31RCWYU
#define ROBUST_Q31RCWYU
#include "log.h"
#include <atomic>
#include <chrono>
#include <memory>
#include <string.h>
#include <mutex>
#include <string>
#include <sys/file.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
@@ -36,143 +41,21 @@
void QuickSleep();
class RobustReqRep
{
   typedef uint32_t State;
   typedef std::string Msg;
   typedef std::chrono::steady_clock::duration Duration;
public:
   enum ErrorCode {
      eSuccess = 0,
      eTimeout = EAGAIN,
      eSizeError = EINVAL,
   };
   explicit RobustReqRep(const uint32_t max_len) :
       capacity_(max_len), state_(eStateInit), timestamp_(Duration(0)), size_(0) {}
   void PutReady() { state_.store(eStateReady); }
   bool Ready() const { return state_.load() == eStateReady; }
   uint32_t capacity() const { return capacity_; }
   int ClientRequest(const Msg &request, Msg &reply)
   {
      int r = ClientWriteRequest(request);
      if (r == eSuccess) {
         r = ClientReadReply(reply);
      }
      return r;
   }
   int ClientReadReply(Msg &reply);
   int ClientWriteRequest(const Msg &request);
   int ServerReadRequest(Msg &request);
   int ServerWriteReply(const Msg &reply);
private:
   RobustReqRep(const RobustReqRep &);
   RobustReqRep(RobustReqRep &&);
   RobustReqRep &operator=(const RobustReqRep &) = delete;
   RobustReqRep &operator=(RobustReqRep &&) = delete;
   enum {
      eStateInit = 0,
      eStateReady = 0x19833891,
      eClientWriteBegin,
      eClientWriteEnd,
      eServerReadBegin,
      eServerReadEnd,
      eServerWriteBegin,
      eServerWriteEnd,
      eClientReadBegin,
      eClientReadEnd = eStateReady,
   };
   bool StateCas(State exp, State val);
   void Write(const Msg &msg)
   {
      size_.store(msg.size());
      memcpy(buf, msg.data(), msg.size());
   }
   void Read(Msg &msg) { msg.assign(buf, size_.load()); }
   const uint32_t capacity_;
   std::atomic<State> state_;
   static_assert(sizeof(State) == sizeof(state_), "atomic should has no extra data.");
   std::atomic<Duration> timestamp_;
   std::atomic<int32_t> size_;
   char buf[4];
};
class PidLocker
{
public:
   typedef int locker_t;
   enum { eLockerBits = sizeof(locker_t) * 8 };
   static locker_t this_locker()
   {
      static locker_t val = getpid();
      return val;
   }
   static bool is_alive(locker_t locker) { return true; }
};
class RobustPidLocker
{
public:
   typedef int locker_t;
   enum { eLockerBits = sizeof(locker_t) * 8 };
   static locker_t this_locker()
   {
      static locker_t val = getpid();
      return val;
   }
   static bool is_alive(locker_t locker)
   {
      char buf[64] = {0};
      snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker);
      return access(buf, F_OK) == 0;
   }
};
class ExpiredLocker
{
public:
   typedef int64_t locker_t;
   enum { eLockerBits = 63 };
   static locker_t this_locker() { return Now(); }
   static bool is_alive(locker_t locker)
   {
      return Now() < locker + steady_clock::duration(10s).count();
   }
private:
   static locker_t Now() { return steady_clock::now().time_since_epoch().count(); }
};
template <class LockerT>
class CasMutex
{
   typedef typename LockerT::locker_t locker_t;
   static inline locker_t this_locker() { return LockerT::this_locker(); }
   static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); }
   static const uint64_t kLockerMask = MaskBits(LockerT::eLockerBits);
   static_assert(LockerT::eLockerBits < 64, "locker size must be smaller than 64 bit!");
   typedef uint64_t locker_t;
   static inline locker_t this_locker() { return pthread_self(); }
   static const uint64_t kLockerMask = MaskBits(63);
public:
   CasMutex() :
       meta_(0) {}
   int try_lock()
   {
      const auto t = steady_clock::now().time_since_epoch().count();
      auto old = meta_.load();
      int r = 0;
      if (!Locked(old)) {
         r = MetaCas(old, Meta(1, this_locker()));
      } else if (!is_alive(Locker(old))) {
         r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1;
         if (r) {
            printf("captured locker %ld -> %ld, locker = %d\n", int64_t(Locker(old)), int64_t(this_locker()), r);
         }
      }
      return r;
   }
@@ -200,7 +83,87 @@
   bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
};
typedef CasMutex<RobustPidLocker> Mutex;
class NullMutex
{
public:
   template <class... T>
   explicit NullMutex(T &&...t) {} // easy test.
   bool try_lock() { return true; }
   void lock() {}
   void unlock() {}
};
// flock + mutex
class FMutex
{
public:
   typedef uint64_t id_t;
   FMutex(id_t id) :
       id_(id), fd_(Open(id_))
   {
      if (fd_ == -1) { throw "error create mutex!"; }
   }
   ~FMutex() { Close(fd_); }
   bool try_lock();
   void lock();
   void unlock();
private:
   static std::string GetPath(id_t id)
   {
      const std::string dir("/tmp/.bhome_mtx");
      mkdir(dir.c_str(), 0777);
      return dir + "/fm_" + std::to_string(id);
   }
   static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDONLY, 0666); }
   static int Close(int fd) { return close(fd); }
   void FLock();
   void FUnlock();
   id_t id_;
   int fd_;
   std::mutex mtx_;
};
union semun {
   int val;               /* Value for SETVAL */
   struct semid_ds *buf;  /* Buffer for IPC_STAT, IPC_SET */
   unsigned short *array; /* Array for GETALL, SETALL */
   struct seminfo *__buf; /* Buffer for IPC_INFO
                                           (Linux-specific) */
};
class SemMutex
{
public:
   SemMutex(key_t key) :
       key_(key), sem_id_(semget(key, 1, 0666))
   {
      if (sem_id_ == -1) { throw "error create semaphore."; }
   }
   ~SemMutex() {}
   bool try_lock()
   {
      sembuf op = {0, -1, SEM_UNDO | IPC_NOWAIT};
      return semop(sem_id_, &op, 1) == 0;
   }
   void lock()
   {
      sembuf op = {0, -1, SEM_UNDO};
      semop(sem_id_, &op, 1);
   }
   void unlock()
   {
      sembuf op = {0, 1, SEM_UNDO};
      semop(sem_id_, &op, 1);
   }
private:
   key_t key_;
   int sem_id_;
};
template <class Lock>
class Guard
@@ -244,7 +207,6 @@
   bool push_back(const Data d)
   {
      Guard<Mutex> guard(mutex_);
      auto old = mtail();
      auto pos = Pos(old);
      auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0);
@@ -256,7 +218,6 @@
   }
   bool pop_front(Data &d)
   {
      Guard<Mutex> guard(mutex_);
      auto old = mhead();
      auto pos = Pos(old);
      if (!(pos == tail())) {
@@ -280,7 +241,6 @@
   meta_type mtail() const { return mtail_.load(); }
   // data
   const size_type capacity_;
   Mutex mutex_;
   std::atomic<meta_type> mhead_;
   std::atomic<meta_type> mtail_;
   Alloc al_;
@@ -306,7 +266,7 @@
   size_type tail() const { return tail_.load(); }
   bool like_empty() const { return head() == tail() && Empty(buf[head()]); }
   bool like_full() const { return head() == tail() && !Empty(buf[head()]); }
   bool push_back(const Data d, bool try_more = false)
   bool push(const Data d, bool try_more = false)
   {
      bool r = false;
      size_type i = 0;
@@ -319,7 +279,7 @@
      } while (try_more && !r && ++i < capacity);
      return r;
   }
   bool pop_front(Data &d, bool try_more = false)
   bool pop(Data &d, bool try_more = false)
   {
      bool r = false;
      Data cur;