lichao
2021-06-01 43d4e95770b0519341153202c9a535aaa8e164c5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
/*
 * =====================================================================================
 *
 *       Filename:  robust.h
 *
 *    Description:  
 *
 *        Version:  1.0
 *        Created:  2021年04月27日 10时04分29秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:  
 *
 * =====================================================================================
 */
 
#ifndef ROBUST_Q31RCWYU
#define ROBUST_Q31RCWYU
 
#include "bh_util.h"
#include "log.h"
#include <atomic>
#include <chrono>
#include <unistd.h>
 
namespace robust
{
/*
template <unsigned PowerSize = 4, class Int = int64_t>
class AtomicQueue
{
public:
    typedef uint32_t size_type;
    typedef Int Data;
    typedef std::atomic<Data> AData;
    static_assert(sizeof(Data) == sizeof(AData));
    enum {
        power = PowerSize,
        capacity = (1 << power),
        mask = capacity - 1,
    };
 
    AtomicQueue() { memset(this, 0, sizeof(*this)); }
    size_type head() const { return head_.load(); }
    size_type tail() const { return tail_.load(); }
    bool push(const Data d, bool try_more = false)
    {
        bool r = false;
        size_type i = 0;
        do {
            auto pos = tail();
            if (tail_.compare_exchange_strong(pos, Next(pos))) {
                auto cur = buf[pos].load();
                r = Empty(cur) && buf[pos].compare_exchange_strong(cur, Enc(d));
            }
        } while (try_more && !r && ++i < capacity);
        return r;
    }
    bool pop(Data &d, bool try_more = false)
    {
        bool r = false;
        Data cur;
        size_type i = 0;
        do {
            auto pos = head();
            if (head_.compare_exchange_strong(pos, Next(pos))) {
                cur = buf[pos].load();
                r = !Empty(cur) && buf[pos].compare_exchange_strong(cur, 0);
            }
        } while (try_more && !r && ++i < capacity);
        if (r) { d = Dec(cur); }
        return r;
    }
 
private:
    static_assert(std::is_integral<Data>::value, "Data must be integral type!");
    static_assert(std::is_signed<Data>::value, "Data must be signed type!");
    static_assert(PowerSize < 10, "RobustQ63 max size is 2^10!");
 
    static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
    static inline Data Enc(const Data d) { return (d << 1) | 1; }   // lowest bit 1 means data ok.
    static inline Data Dec(const Data d) { return d >> 1; }         // lowest bit 1 means data ok.
    static size_type Next(const size_type index) { return (index + 1) & mask; }
 
    std::atomic<size_type> head_;
    std::atomic<size_type> tail_;
    AData buf[capacity];
};
//*/
 
class AtomicQ63
{
public:
    typedef int64_t Data;
    AtomicQ63() { memset(this, 0, sizeof(*this)); }
    bool push(const Data d, bool try_more = false)
    {
        auto cur = buf.load();
        return Empty(cur) && buf.compare_exchange_strong(cur, Enc(d));
    }
    bool pop(Data &d, bool try_more = false)
    {
        Data cur = buf.load();
        bool r = !Empty(cur) && buf.compare_exchange_strong(cur, 0);
        if (r) { d = Dec(cur); }
        return r;
    }
 
private:
    static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
    static inline Data Enc(const Data d) { return (d << 1) | 1; }   // lowest bit 1 means data ok.
    static inline Data Dec(const Data d) { return d >> 1; }         // lowest bit 1 means data ok.
 
    typedef std::atomic<Data> AData;
    static_assert(sizeof(Data) == sizeof(AData));
 
    AData buf;
};
 
class AtomicReqRep
{
public:
    typedef int64_t Data;
    typedef std::function<Data(const Data)> Handler;
    bool ClientRequest(const Data request, Data &reply);
    bool ServerProcess(Handler onReq);
 
private:
    enum State {
        eStateFree,
        eStateRequest,
        eStateReply
    };
    static int GetState(Data d) { return d & MaskBits(3); }
    static Data Encode(Data d, State st) { return (d << 3) | st; }
    static Data Decode(Data d) { return d >> 3; }
    typedef std::chrono::steady_clock steady_clock;
    typedef steady_clock::duration Duration;
    Duration now() { return steady_clock::now().time_since_epoch(); }
 
    bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); }
    std::atomic<Data> data_;
    std::atomic<Duration> timestamp_;
};
 
} // namespace robust
#endif // end of include guard: ROBUST_Q31RCWYU