/*
 * =====================================================================================
 *
 *       Filename:  robust.cpp
 *
 *    Description:  RobustReqRep client/server request-reply state transitions.
 *
 *        Version:  1.0
 *        Created:  2021-04-27 10:04:19
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
|
#include "robust.h"
|
#include <chrono>
|
#include <thread>
|
|
namespace robust
|
{
|
|
namespace
|
{
|
static_assert(sizeof(steady_clock::duration) == sizeof(int64_t));
|
static_assert(sizeof(RobustReqRep) == 24);
|
static_assert(sizeof(Mutex) == 8);
|
static_assert(sizeof(CircularBuffer<int>) == 48);
|
|
auto Now() { return steady_clock::now().time_since_epoch(); }
|
const steady_clock::duration kIoTimeout = 10ms;
|
const steady_clock::duration kIoExpire = 100ms;
|
|
void Yield() { std::this_thread::sleep_for(10us); }
|
} // namespace
|
|
void QuickSleep()
|
{
|
Yield();
|
}
|
// Compare-and-swap on state_: replaces `exp` with `val` if state_ currently
// equals `exp`.
// Returns true when the swap happened, in which case timestamp_ is updated to
// the current time afterwards. Returns false otherwise, leaving timestamp_
// untouched.
// Note: the state change and the timestamp store are two separate operations.
bool RobustReqRep::StateCas(State exp, State val)
{
	if (!state_.compare_exchange_strong(exp, val)) {
		return false;
	}
	timestamp_.store(Now());
	return true;
}
|
|
// Client side: polls for the server's reply for up to kIoTimeout.
//
// Each attempt claims the slot (eServerWriteEnd -> eClientReadBegin), calls
// Read(reply), then commits (eClientReadBegin -> eClientReadEnd). A failed
// attempt is followed by a short Yield() before retrying.
//
// @param reply  passed to Read(); it may already have been written to even
//               when the call ends in eTimeout, because Read() runs before
//               the commit transition is attempted.
// @return eSuccess once a read has been committed, eTimeout otherwise.
int RobustReqRep::ClientReadReply(Msg &reply)
{
	const auto end_time = Now() + kIoTimeout;
	// StateCas() returns bool; keep the flag a bool, as ClientWriteRequest() does.
	bool done = false;
	do {
		if (StateCas(eServerWriteEnd, eClientReadBegin)) {
			Read(reply);
			done = StateCas(eClientReadBegin, eClientReadEnd);
			if (done) { break; }
		}
		Yield();
	} while (Now() < end_time);
	return done ? eSuccess : eTimeout;
}
|
|
int RobustReqRep::ClientWriteRequest(const Msg &request)
|
{
|
if (request.size() > capacity_) {
|
return eSizeError;
|
}
|
auto end_time = Now() + kIoTimeout;
|
bool done = false;
|
do {
|
if (StateCas(eStateReady, eClientWriteBegin)) {
|
Write(request);
|
done = StateCas(eClientWriteBegin, eClientWriteEnd);
|
if (done) { break; }
|
}
|
Yield();
|
} while (Now() < end_time);
|
return done ? eSuccess : eTimeout;
|
}
|
|
// Server side: makes a single, non-blocking attempt to take a pending request.
//
// If the state is eClientWriteEnd, the slot is claimed (-> eServerReadBegin),
// Read(request) is called and the read is committed (-> eServerReadEnd).
//
// Otherwise, if the state is anything other than eStateReady and the last
// recorded transition is older than kIoExpire, the state is reset to
// eStateReady.
//
// @return eSuccess if a request was read and committed, eTimeout in every
//         other case (including after a reset).
int RobustReqRep::ServerReadRequest(Msg &request)
{
	if (StateCas(eClientWriteEnd, eServerReadBegin)) {
		Read(request);
		return StateCas(eServerReadBegin, eServerReadEnd) ? eSuccess : eTimeout;
	}

	// No request could be claimed: check for a stale, non-ready state.
	const auto current = state_.load();
	if (current != eStateReady) {
		const bool expired = timestamp_.load() + kIoExpire < Now();
		if (expired) {
			// Result intentionally ignored: if the state changed meanwhile,
			// the CAS simply fails and nothing is reset.
			StateCas(current, eStateReady);
		}
	}
	return eTimeout;
}
|
|
int RobustReqRep::ServerWriteReply(const Msg &reply)
|
{
|
if (reply.size() > capacity_) {
|
return eSizeError;
|
}
|
// no need to loop write, either success or timeout.
|
bool done = false;
|
if (StateCas(eServerReadEnd, eServerWriteBegin)) {
|
Write(reply);
|
done = StateCas(eServerWriteBegin, eServerWriteEnd);
|
}
|
return done ? eSuccess : eTimeout;
|
}
|
|
} // namespace robust
|