lichao
2021-05-19 34cd75f77d0ca94dbdba4e6cc9451fe4d33e78b3
src/robust.cpp
@@ -25,87 +25,40 @@
namespace
{
static_assert(sizeof(steady_clock::duration) == sizeof(int64_t));
static_assert(sizeof(RobustReqRep) == 24);
static_assert(sizeof(Mutex) == 8);
static_assert(sizeof(CircularBuffer<int>) == 48);
auto Now() { return steady_clock::now().time_since_epoch(); }
const steady_clock::duration kIoTimeout = 10ms;
const steady_clock::duration kIoExpire = 100ms;
void Yield() { std::this_thread::sleep_for(10us); }
} // namespace
void QuickSleep()
{
   Yield();
}
bool RobustReqRep::StateCas(State exp, State val)
{
   bool r = state_.compare_exchange_strong(exp, val);
   return r ? (timestamp_.store(Now()), true) : false;
}
void QuickSleep() { Yield(); }
int RobustReqRep::ClientReadReply(Msg &reply)
bool FMutex::try_lock()
{
   auto end_time = Now() + kIoTimeout;
   int done = false;
   do {
      if (StateCas(eServerWriteEnd, eClientReadBegin)) {
         Read(reply);
         done = StateCas(eClientReadBegin, eClientReadEnd);
         if (done) { break; }
      }
      Yield();
   } while (Now() < end_time);
   return done ? eSuccess : eTimeout;
}
int RobustReqRep::ClientWriteRequest(const Msg &request)
{
   if (request.size() > capacity_) {
      return eSizeError;
   }
   auto end_time = Now() + kIoTimeout;
   bool done = false;
   do {
      if (StateCas(eStateReady, eClientWriteBegin)) {
         Write(request);
         done = StateCas(eClientWriteBegin, eClientWriteEnd);
         if (done) { break; }
      }
      Yield();
   } while (Now() < end_time);
   return done ? eSuccess : eTimeout;
}
int RobustReqRep::ServerReadRequest(Msg &request)
{
   bool done = false;
   if (StateCas(eClientWriteEnd, eServerReadBegin)) {
      Read(request);
      done = StateCas(eServerReadBegin, eServerReadEnd);
   } else {
      auto old = state_.load();
      if (old != eStateReady && timestamp_.load() + kIoExpire < Now()) {
         StateCas(old, eStateReady);
   if (flock(fd_, LOCK_EX | LOCK_NB) == 0) {
      ++count_;
      if (mtx_.try_lock()) {
         return true;
      } else {
         if (--count_ == 0) {
            flock(fd_, LOCK_UN);
         }
      }
   }
   return done ? eSuccess : eTimeout;
   return false;
}
int RobustReqRep::ServerWriteReply(const Msg &reply)
void FMutex::lock()
{
   if (reply.size() > capacity_) {
      return eSizeError;
   flock(fd_, LOCK_EX);
   ++count_;
   mtx_.lock();
}
void FMutex::unlock()
{
   mtx_.unlock();
   if (--count_ == 0) {
      flock(fd_, LOCK_UN);
   }
   // no need to loop write, either success or timeout.
   bool done = false;
   if (StateCas(eServerReadEnd, eServerWriteBegin)) {
      Write(reply);
      done = StateCas(eServerWriteBegin, eServerWriteEnd);
   }
   return done ? eSuccess : eTimeout;
}
} // namespace robust