| | |
| | | * ===================================================================================== |
| | | */ |
| | | #include "robust.h" |
| | | #include <chrono> |
| | | #include <thread> |
| | | |
| | | using namespace std::chrono; |
| | | using namespace std::chrono_literals; |
| | | |
| | | namespace |
| | | { |
| | | void yield() { std::this_thread::sleep_for(10us); } |
| | | } // namespace |
| | | |
| | | namespace robust |
| | | { |
| | | |
| | | namespace |
| | | bool AtomicReqRep::ClientRequest(const Data request, Data &reply) |
| | | { |
| | | static_assert(sizeof(steady_clock::duration) == sizeof(int64_t)); |
| | | static_assert(sizeof(RobustReqRep) == 24); |
| | | static_assert(sizeof(CasMutex<false>) == 8); |
| | | static_assert(sizeof(CircularBuffer<int>) == 48); |
| | | |
| | | auto Now() { return steady_clock::now().time_since_epoch(); } |
| | | const steady_clock::duration kIoTimeout = 10ms; |
| | | const steady_clock::duration kIoExpire = 100ms; |
| | | |
| | | void Yield() { std::this_thread::sleep_for(10us); } |
| | | } // namespace |
| | | |
| | | void QuickSleep() |
| | | { |
| | | Yield(); |
| | | } |
| | | bool RobustReqRep::StateCas(State exp, State val) |
| | | { |
| | | bool r = state_.compare_exchange_strong(exp, val); |
| | | return r ? (timestamp_.store(Now()), true) : false; |
| | | } |
| | | |
| | | int RobustReqRep::ClientReadReply(Msg &reply) |
| | | { |
| | | auto end_time = Now() + kIoTimeout; |
| | | int done = false; |
| | | auto end_time = now() + 3s; |
| | | do { |
| | | if (StateCas(eServerWriteEnd, eClientReadBegin)) { |
| | | Read(reply); |
| | | done = StateCas(eClientReadBegin, eClientReadEnd); |
| | | if (done) { break; } |
| | | Data cur = data_.load(); |
| | | if (GetState(cur) == eStateFree && |
| | | DataCas(cur, Encode(request, eStateRequest))) { |
| | | do { |
| | | yield(); |
| | | cur = data_.load(); |
| | | if (GetState(cur) == eStateReply) { |
| | | DataCas(cur, Encode(0, eStateFree)); |
| | | reply = Decode(cur); |
| | | return true; |
| | | } |
| | | } while (now() < end_time); |
| | | } |
| | | Yield(); |
| | | } while (Now() < end_time); |
| | | return done ? eSuccess : eTimeout; |
| | | yield(); |
| | | } while (now() < end_time); |
| | | return false; |
| | | } |
| | | |
| | | int RobustReqRep::ClientWriteRequest(const Msg &request) |
| | | bool AtomicReqRep::ServerProcess(Handler onReq) |
| | | { |
| | | if (request.size() > capacity_) { |
| | | return eSizeError; |
| | | } |
| | | auto end_time = Now() + kIoTimeout; |
| | | bool done = false; |
| | | do { |
| | | if (StateCas(eStateReady, eClientWriteBegin)) { |
| | | Write(request); |
| | | done = StateCas(eClientWriteBegin, eClientWriteEnd); |
| | | if (done) { break; } |
| | | Data cur = data_.load(); |
| | | switch (GetState(cur)) { |
| | | case eStateRequest: |
| | | if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) { |
| | | timestamp_ = now(); |
| | | return true; |
| | | } |
| | | Yield(); |
| | | } while (Now() < end_time); |
| | | return done ? eSuccess : eTimeout; |
| | | } |
| | | |
| | | int RobustReqRep::ServerReadRequest(Msg &request) |
| | | { |
| | | bool done = false; |
| | | if (StateCas(eClientWriteEnd, eServerReadBegin)) { |
| | | Read(request); |
| | | done = StateCas(eServerReadBegin, eServerReadEnd); |
| | | } else { |
| | | auto old = state_.load(); |
| | | if (old != eStateReady && timestamp_.load() + kIoExpire < Now()) { |
| | | StateCas(old, eStateReady); |
| | | break; |
| | | case eStateReply: |
| | | if (timestamp_.load() + 3s < now()) { |
| | | DataCas(cur, Encode(0, eStateFree)); |
| | | } |
| | | break; |
| | | case eStateFree: |
| | | default: break; |
| | | } |
| | | return done ? eSuccess : eTimeout; |
| | | return false; |
| | | } |
| | | |
| | | int RobustReqRep::ServerWriteReply(const Msg &reply) |
| | | { |
| | | if (reply.size() > capacity_) { |
| | | return eSizeError; |
| | | } |
| | | // no need to loop write, either success or timeout. |
| | | bool done = false; |
| | | if (StateCas(eServerReadEnd, eServerWriteBegin)) { |
| | | Write(reply); |
| | | done = StateCas(eServerWriteBegin, eServerWriteEnd); |
| | | } |
| | | return done ? eSuccess : eTimeout; |
| | | } |
| | | |
| | | } // namespace robust |