lichao
2021-06-23 c1e39e20ca42b21eeac8b5068fa1f921bf9a070f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#include "robust.h"
#include "util.h"
#include <boost/circular_buffer.hpp>
 
using namespace robust;
 
enum {
    eLockerBits = 32,
    eLockerMask = MaskBits(sizeof(int) * 8),
};
 
void MySleep()
{
    std::this_thread::sleep_for(2us);
}
 
/////////////////////////////////////////////////////////////////////////////////////////
 
BOOST_AUTO_TEST_CASE(InitTest)
{
    AtomicReqRep rr;
    auto client = [&]() {
        for (int i = 0; i < 5; ++i) {
            int64_t reply = 0;
            bool r = rr.ClientRequest(i, reply);
            printf("init request %d, %s, reply %d\n", i, (r ? "ok" : "failed"), reply);
        }
    };
 
    bool run = true;
    auto server = [&]() {
        auto onReq = [](int64_t req) { return req + 100; };
        while (run) {
            rr.ServerProcess(onReq);
        }
    };
 
    ThreadManager clients, servers;
    servers.Launch(server);
    for (int i = 0; i < 2; ++i) {
        clients.Launch(client);
    }
    clients.WaitAll();
    run = false;
    servers.WaitAll();
}
 
BOOST_AUTO_TEST_CASE(QueueTest)
{
    const int nthread = 100;
    const uint64_t nmsg = 1000 * 1000 * 10;
 
    SharedMemory &shm = TestShm();
    shm.Remove();
    // return; /////////////////////////////////////////////////
    int64_t i64 = 0;
    char c = 0;
    for (int i = 0; i < 256; ++i) {
        c = i;
        i64 = int64_t(c) << 1;
        BOOST_CHECK_EQUAL(c, (i64 >> 1));
        uint64_t u64 = i;
        BOOST_CHECK_EQUAL((u64 & 255), i);
    }
 
    uint64_t correct_total = nmsg * (nmsg - 1) / 2;
    std::atomic<uint64_t> total(0);
    std::atomic<uint64_t> nwrite(0);
    std::atomic<uint64_t> writedone(0);
 
    typedef AtomicQ63 Rcb;
 
    Rcb tmp;
    BOOST_CHECK(tmp.push(1));
    int64_t d;
    BOOST_CHECK(tmp.pop(d));
 
    NamedShmObject<Rcb> rcb(shm, "test_rcb", eOpenOrCreate);
    bool try_more = true;
 
    auto Writer = [&]() {
        uint64_t n = 0;
        while ((n = nwrite++) < nmsg) {
            while (!rcb->push(n, try_more)) {
                // MySleep();
            }
            ++writedone;
        }
    };
    std::atomic<uint64_t> nread(0);
    auto Reader = [&]() {
        while (nread.load() < nmsg) {
            int64_t d;
            if (rcb->pop(d, try_more)) {
                ++nread;
                total += d;
            } else {
                // MySleep();
            }
        }
    };
 
    auto status = [&]() {
        auto next = steady_clock::now();
        uint32_t lw = 0;
        uint32_t lr = 0;
        do {
            std::this_thread::sleep_until(next);
            next += 1s;
            auto w = writedone.load();
            auto r = nread.load();
            printf("write: %6ld, spd: %6ld,  read: %6ld, spd: %6ld\n",
                   w, w - lw, r, r - lr);
            lw = w;
            lr = r;
        } while (nread.load() < nmsg);
    };
 
    std::thread st(status);
    {
        ThreadManager threads;
        boost::timer::auto_cpu_timer timer;
        // printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, Rcb::capacity, nthread);
        printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, 16, nthread);
        for (int i = 0; i < nthread; ++i) {
            threads.Launch(Reader);
            threads.Launch(Writer);
        }
        threads.WaitAll();
    }
    st.join();
    printf("total: %ld, expected: %ld\n", total.load(), correct_total);
    BOOST_CHECK_EQUAL(total.load(), correct_total);
}