lichao
2021-04-29 72bffb0807925a156b076b71f78c848a08d27b87
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
/*
 * =====================================================================================
 *
 *       Filename:  robust.cpp
 *
 *    Description:  
 *
 *        Version:  1.0
 *        Created:  2021年04月27日 10时04分19秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:  
 *
 * =====================================================================================
 */
#include "robust.h"
#include <chrono>
#include <thread>
 
namespace robust
{
 
namespace
{
static_assert(sizeof(steady_clock::duration) == sizeof(int64_t));
static_assert(sizeof(RobustReqRep) == 24);
static_assert(sizeof(Mutex) == 8);
static_assert(sizeof(CircularBuffer<int>) == 48);
 
auto Now() { return steady_clock::now().time_since_epoch(); }
const steady_clock::duration kIoTimeout = 10ms;
const steady_clock::duration kIoExpire = 100ms;
 
void Yield() { std::this_thread::sleep_for(10us); }
} // namespace
 
void QuickSleep()
{
    Yield();
}
bool RobustReqRep::StateCas(State exp, State val)
{
    bool r = state_.compare_exchange_strong(exp, val);
    return r ? (timestamp_.store(Now()), true) : false;
}
 
int RobustReqRep::ClientReadReply(Msg &reply)
{
    auto end_time = Now() + kIoTimeout;
    int done = false;
    do {
        if (StateCas(eServerWriteEnd, eClientReadBegin)) {
            Read(reply);
            done = StateCas(eClientReadBegin, eClientReadEnd);
            if (done) { break; }
        }
        Yield();
    } while (Now() < end_time);
    return done ? eSuccess : eTimeout;
}
 
int RobustReqRep::ClientWriteRequest(const Msg &request)
{
    if (request.size() > capacity_) {
        return eSizeError;
    }
    auto end_time = Now() + kIoTimeout;
    bool done = false;
    do {
        if (StateCas(eStateReady, eClientWriteBegin)) {
            Write(request);
            done = StateCas(eClientWriteBegin, eClientWriteEnd);
            if (done) { break; }
        }
        Yield();
    } while (Now() < end_time);
    return done ? eSuccess : eTimeout;
}
 
int RobustReqRep::ServerReadRequest(Msg &request)
{
    bool done = false;
    if (StateCas(eClientWriteEnd, eServerReadBegin)) {
        Read(request);
        done = StateCas(eServerReadBegin, eServerReadEnd);
    } else {
        auto old = state_.load();
        if (old != eStateReady && timestamp_.load() + kIoExpire < Now()) {
            StateCas(old, eStateReady);
        }
    }
    return done ? eSuccess : eTimeout;
}
 
int RobustReqRep::ServerWriteReply(const Msg &reply)
{
    if (reply.size() > capacity_) {
        return eSizeError;
    }
    // no need to loop write, either success or timeout.
    bool done = false;
    if (StateCas(eServerReadEnd, eServerWriteBegin)) {
        Write(reply);
        done = StateCas(eServerWriteBegin, eServerWriteEnd);
    }
    return done ? eSuccess : eTimeout;
}
 
} // namespace robust