lichao
2021-05-08 28f06bc49a4d8d69f1ea2f767863b7921d12f155
src/robust.h
@@ -23,8 +23,12 @@
#include <atomic>
#include <chrono>
#include <memory>
#include <string.h>
#include <mutex>
#include <string>
#include <sys/file.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
@@ -37,143 +41,21 @@
void QuickSleep();
class RobustReqRep
{
   typedef uint32_t State;
   typedef std::string Msg;
   typedef std::chrono::steady_clock::duration Duration;
public:
   enum ErrorCode {
      eSuccess = 0,
      eTimeout = EAGAIN,
      eSizeError = EINVAL,
   };
   explicit RobustReqRep(const uint32_t max_len) :
       capacity_(max_len), state_(eStateInit), timestamp_(Duration(0)), size_(0) {}
   void PutReady() { state_.store(eStateReady); }
   bool Ready() const { return state_.load() == eStateReady; }
   uint32_t capacity() const { return capacity_; }
   int ClientRequest(const Msg &request, Msg &reply)
   {
      int r = ClientWriteRequest(request);
      if (r == eSuccess) {
         r = ClientReadReply(reply);
      }
      return r;
   }
   int ClientReadReply(Msg &reply);
   int ClientWriteRequest(const Msg &request);
   int ServerReadRequest(Msg &request);
   int ServerWriteReply(const Msg &reply);
private:
   RobustReqRep(const RobustReqRep &);
   RobustReqRep(RobustReqRep &&);
   RobustReqRep &operator=(const RobustReqRep &) = delete;
   RobustReqRep &operator=(RobustReqRep &&) = delete;
   enum {
      eStateInit = 0,
      eStateReady = 0x19833891,
      eClientWriteBegin,
      eClientWriteEnd,
      eServerReadBegin,
      eServerReadEnd,
      eServerWriteBegin,
      eServerWriteEnd,
      eClientReadBegin,
      eClientReadEnd = eStateReady,
   };
   bool StateCas(State exp, State val);
   void Write(const Msg &msg)
   {
      size_.store(msg.size());
      memcpy(buf, msg.data(), msg.size());
   }
   void Read(Msg &msg) { msg.assign(buf, size_.load()); }
   const uint32_t capacity_;
   std::atomic<State> state_;
   static_assert(sizeof(State) == sizeof(state_), "atomic should has no extra data.");
   std::atomic<Duration> timestamp_;
   std::atomic<int32_t> size_;
   char buf[4];
};
class PidLocker
{
public:
   typedef int locker_t;
   enum { eLockerBits = sizeof(locker_t) * 8 };
   static locker_t this_locker()
   {
      static locker_t val = getpid();
      return val;
   }
   static bool is_alive(locker_t locker) { return true; }
};
class RobustPidLocker
{
public:
   typedef int locker_t;
   enum { eLockerBits = sizeof(locker_t) * 8 };
   static locker_t this_locker()
   {
      static locker_t val = getpid();
      return val;
   }
   static bool is_alive(locker_t locker)
   {
      char buf[64] = {0};
      snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker);
      return access(buf, F_OK) == 0;
   }
};
class ExpiredLocker
{
public:
   typedef int64_t locker_t;
   enum { eLockerBits = 63 };
   static locker_t this_locker() { return Now(); }
   static bool is_alive(locker_t locker)
   {
      return Now() < locker + steady_clock::duration(10s).count();
   }
private:
   static locker_t Now() { return steady_clock::now().time_since_epoch().count(); }
};
template <class LockerT>
class CasMutex
{
   typedef typename LockerT::locker_t locker_t;
   static inline locker_t this_locker() { return LockerT::this_locker(); }
   static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); }
   static const uint64_t kLockerMask = MaskBits(LockerT::eLockerBits);
   static_assert(LockerT::eLockerBits < 64, "locker size must be smaller than 64 bit!");
   typedef uint64_t locker_t;
   static inline locker_t this_locker() { return pthread_self(); }
   static const uint64_t kLockerMask = MaskBits(63);
public:
   CasMutex() :
       meta_(0) {}
   int try_lock()
   {
      const auto t = steady_clock::now().time_since_epoch().count();
      auto old = meta_.load();
      int r = 0;
      if (!Locked(old)) {
         r = MetaCas(old, Meta(1, this_locker()));
      } else if (!is_alive(Locker(old))) {
         r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1;
         if (r) {
            LOG_DEBUG() << "captured locker " << int64_t(Locker(old)) << " -> " << int64_t(this_locker()) << ", locker = " << r;
         }
      }
      return r;
   }
@@ -201,7 +83,89 @@
   bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
};
typedef CasMutex<RobustPidLocker> Mutex;
class NullMutex
{
public:
   bool try_lock() { return true; }
   void lock() {}
   void unlock() {}
};
// flock + mutex
class FMutex
{
public:
   typedef uint64_t id_t;
   FMutex(id_t id) :
       id_(id), fd_(Open(id_))
   {
      if (fd_ == -1) { throw "error create mutex!"; }
   }
   ~FMutex() { Close(fd_); }
   bool try_lock();
   void lock();
   void unlock();
private:
   static std::string GetPath(id_t id)
   {
      const std::string dir("/tmp/.bhome_mtx");
      mkdir(dir.c_str(), 0777);
      return dir + "/fm_" + std::to_string(id);
   }
   static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDWR, 0666); }
   static int Close(int fd) { return close(fd); }
   id_t id_;
   int fd_;
   std::mutex mtx_;
};
union semun {
   int val;               /* Value for SETVAL */
   struct semid_ds *buf;  /* Buffer for IPC_STAT, IPC_SET */
   unsigned short *array; /* Array for GETALL, SETALL */
   struct seminfo *__buf; /* Buffer for IPC_INFO
                                           (Linux-specific) */
};
class SemMutex
{
public:
   SemMutex(key_t key) :
       key_(key), sem_id_(semget(key, 1, 0666 | IPC_CREAT))
   {
      if (sem_id_ == -1) { throw "error create semaphore."; }
      union semun init_val;
      init_val.val = 1;
      semctl(sem_id_, 0, SETVAL, init_val);
   }
   ~SemMutex()
   {
      // semctl(sem_id_, 0, IPC_RMID, semun{});
   }
   bool try_lock()
   {
      sembuf op = {0, -1, SEM_UNDO | IPC_NOWAIT};
      return semop(sem_id_, &op, 1) == 0;
   }
   void lock()
   {
      sembuf op = {0, -1, SEM_UNDO};
      semop(sem_id_, &op, 1);
   }
   void unlock()
   {
      sembuf op = {0, 1, SEM_UNDO};
      semop(sem_id_, &op, 1);
   }
private:
   key_t key_;
   int sem_id_;
};
template <class Lock>
class Guard
@@ -245,7 +209,6 @@
   bool push_back(const Data d)
   {
      Guard<Mutex> guard(mutex_);
      auto old = mtail();
      auto pos = Pos(old);
      auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0);
@@ -257,7 +220,6 @@
   }
   bool pop_front(Data &d)
   {
      Guard<Mutex> guard(mutex_);
      auto old = mhead();
      auto pos = Pos(old);
      if (!(pos == tail())) {
@@ -281,7 +243,6 @@
   meta_type mtail() const { return mtail_.load(); }
   // data
   const size_type capacity_;
   Mutex mutex_;
   std::atomic<meta_type> mhead_;
   std::atomic<meta_type> mtail_;
   Alloc al_;