| | |
| | | namespace |
| | | { |
| | | static_assert(sizeof(steady_clock::duration) == sizeof(int64_t)); |
| | | static_assert(sizeof(RobustReqRep) == 24); |
| | | static_assert(sizeof(Mutex) == 8); |
| | | static_assert(sizeof(CircularBuffer<int>) == 48); |
| | | |
| | | auto Now() { return steady_clock::now().time_since_epoch(); } |
| | | const steady_clock::duration kIoTimeout = 10ms; |
| | | const steady_clock::duration kIoExpire = 100ms; |
| | | |
| | | void Yield() { std::this_thread::sleep_for(10us); } |
| | | |
| | | } // namespace |
| | | |
| | | void QuickSleep() |
| | | { |
| | | Yield(); |
| | | } |
| | | bool RobustReqRep::StateCas(State exp, State val) |
| | | { |
| | | bool r = state_.compare_exchange_strong(exp, val); |
| | | return r ? (timestamp_.store(Now()), true) : false; |
| | | } |
| | | void QuickSleep() { Yield(); } |
| | | |
| | | int RobustReqRep::ClientReadReply(Msg &reply) |
| | | bool FMutex::try_lock() |
| | | { |
| | | auto end_time = Now() + kIoTimeout; |
| | | int done = false; |
| | | do { |
| | | if (StateCas(eServerWriteEnd, eClientReadBegin)) { |
| | | Read(reply); |
| | | done = StateCas(eClientReadBegin, eClientReadEnd); |
| | | if (done) { break; } |
| | | } |
| | | Yield(); |
| | | } while (Now() < end_time); |
| | | return done ? eSuccess : eTimeout; |
| | | } |
| | | |
| | | int RobustReqRep::ClientWriteRequest(const Msg &request) |
| | | { |
| | | if (request.size() > capacity_) { |
| | | return eSizeError; |
| | | } |
| | | auto end_time = Now() + kIoTimeout; |
| | | bool done = false; |
| | | do { |
| | | if (StateCas(eStateReady, eClientWriteBegin)) { |
| | | Write(request); |
| | | done = StateCas(eClientWriteBegin, eClientWriteEnd); |
| | | if (done) { break; } |
| | | } |
| | | Yield(); |
| | | } while (Now() < end_time); |
| | | return done ? eSuccess : eTimeout; |
| | | } |
| | | |
| | | int RobustReqRep::ServerReadRequest(Msg &request) |
| | | { |
| | | bool done = false; |
| | | if (StateCas(eClientWriteEnd, eServerReadBegin)) { |
| | | Read(request); |
| | | done = StateCas(eServerReadBegin, eServerReadEnd); |
| | | } else { |
| | | auto old = state_.load(); |
| | | if (old != eStateReady && timestamp_.load() + kIoExpire < Now()) { |
| | | StateCas(old, eStateReady); |
| | | if (flock(fd_, LOCK_EX | LOCK_NB) == 0) { |
| | | if (mtx_.try_lock()) { |
| | | return true; |
| | | } else { |
| | | flock(fd_, LOCK_UN); |
| | | } |
| | | } |
| | | return done ? eSuccess : eTimeout; |
| | | return false; |
| | | } |
| | | |
| | | int RobustReqRep::ServerWriteReply(const Msg &reply) |
| | | void FMutex::lock() |
| | | { |
| | | if (reply.size() > capacity_) { |
| | | return eSizeError; |
| | | } |
| | | // no need to loop write, either success or timeout. |
| | | bool done = false; |
| | | if (StateCas(eServerReadEnd, eServerWriteBegin)) { |
| | | Write(reply); |
| | | done = StateCas(eServerWriteBegin, eServerWriteEnd); |
| | | } |
| | | return done ? eSuccess : eTimeout; |
| | | //Note: the lock order affects performance a lot, |
| | | // locking fd_ first is about 100 times faster than locking mtx_ first. |
| | | flock(fd_, LOCK_EX); |
| | | mtx_.lock(); |
| | | } |
| | | |
| | | void FMutex::unlock() |
| | | { |
| | | mtx_.unlock(); |
| | | flock(fd_, LOCK_UN); |
| | | } |
| | | } // namespace robust |