lichao
2021-06-02 993c556000a414011626770540678948f16eaa9e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
/*
 * =====================================================================================
 *
 *       Filename:  robust.h
 *
 *    Description:  
 *
 *        Version:  1.0
 *        Created:  2021年04月27日 10时04分29秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:  
 *
 * =====================================================================================
 */
 
#ifndef ROBUST_Q31RCWYU
#define ROBUST_Q31RCWYU
 
#include "bh_util.h"
#include "log.h"
#include <atomic>
#include <chrono>
#include <unistd.h>
 
namespace robust
{
 
// atomic queue, length is 1.
// lowest bit is used for data flag, 63 bit for data.
class AtomicQ63
{
public:
    typedef int64_t Data;
    AtomicQ63() { memset(this, 0, sizeof(*this)); }
    bool push(const Data d, bool try_more = false)
    {
        auto cur = buf.load();
        return Empty(cur) && buf.compare_exchange_strong(cur, Enc(d));
    }
    bool pop(Data &d, bool try_more = false)
    {
        Data cur = buf.load();
        bool r = !Empty(cur) && buf.compare_exchange_strong(cur, 0);
        if (r) { d = Dec(cur); }
        return r;
    }
 
private:
    static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
    static inline Data Enc(const Data d) { return (d << 1) | 1; }   // lowest bit 1 means data ok.
    static inline Data Dec(const Data d) { return d >> 1; }         // lowest bit 1 means data ok.
 
    typedef std::atomic<Data> AData;
    static_assert(sizeof(Data) == sizeof(AData));
 
    AData buf;
};
 
// atomic request-reply process, one cycle a time.
class AtomicReqRep
{
public:
    typedef int64_t Data;
    typedef std::function<Data(const Data)> Handler;
    bool ClientRequest(const Data request, Data &reply);
    bool ServerProcess(Handler onReq);
    AtomicReqRep() :
        data_(0), timestamp_(now()) {}
 
private:
    enum State {
        eStateFree,
        eStateRequest,
        eStateReply
    };
    static int GetState(Data d) { return d & MaskBits(3); }
    static Data Encode(Data d, State st) { return (d << 3) | st; }
    static Data Decode(Data d) { return d >> 3; }
    typedef std::chrono::steady_clock steady_clock;
    typedef steady_clock::duration Duration;
    static Duration now() { return steady_clock::now().time_since_epoch(); }
 
    bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); }
    std::atomic<Data> data_;
    std::atomic<Duration> timestamp_;
};
 
} // namespace robust
#endif // end of include guard: ROBUST_Q31RCWYU