lichao
2021-04-28 a6f67b4249525089fb97eb9418c7014f66c2a000
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#include "robust.h"
#include "util.h"
 
using namespace robust;
 
typedef CircularBuffer<int64_t, Allocator<int64_t>> Rcb;
Rcb *GetRCBImpl(SharedMemory &shm, const int nelem)
{
    int cap = nelem + 1;
    typedef uint64_t Data;
    auto size = sizeof(Rcb) + sizeof(Data) * cap;
    void *p = shm.Alloc(size);
    if (p) {
        return new (p) Rcb(cap, shm.get_segment_manager());
    }
    return nullptr;
}
Rcb *GetRCB(SharedMemory &shm, const int nelem)
{
    void **pStore = shm.FindOrCreate<void *>("test_rcb_pointer", nullptr);
    if (pStore) {
        if (!*pStore) {
            *pStore = GetRCBImpl(shm, nelem);
        }
        return (Rcb *) *pStore;
    }
    return nullptr;
}
 
void MySleep()
{
    std::this_thread::sleep_for(2us);
}
 
BOOST_AUTO_TEST_CASE(RobustTest)
{
    SharedMemory &shm = TestShm();
    shm.Remove();
    pid_t pid = getpid();
    printf("pid : %d\n", pid);
    auto Access = [](pid_t pid) {
        char buf[100] = {0};
        sprintf(buf, "/proc/%d/stat", pid);
        int r = access(buf, F_OK);
        printf("access %d\n", r);
    };
    Access(pid);
    Access(pid + 1);
    // Sleep(10s);
    // return;
 
    int nelement = 640;
    auto rcb = GetRCB(shm, nelement);
    BOOST_CHECK(rcb != nullptr);
    BOOST_CHECK(rcb->empty());
    BOOST_CHECK(rcb->push_back(1));
    BOOST_CHECK(rcb->size() == 1);
    int64_t d;
    BOOST_CHECK(rcb->pop_front(d));
    BOOST_CHECK(rcb->empty());
 
    const uint64_t nmsg = 1000 * 1000 * 1;
    uint64_t correct_total = nmsg * (nmsg - 1) / 2;
    std::atomic<uint64_t> total(0);
    std::atomic<uint64_t> nwrite(0);
    std::atomic<uint64_t> writedone(0);
    auto Writer = [&]() {
        uint64_t n = 0;
        while ((n = nwrite++) < nmsg) {
            while (!rcb->push_back(n)) {
                // MySleep();
            }
            ++writedone;
        }
    };
    std::atomic<uint64_t> nread(0);
    auto Reader = [&]() {
        while (nread.load() < nmsg) {
            int64_t d;
            if (rcb->pop_front(d)) {
                ++nread;
                total += d;
            } else {
                MySleep();
            }
        }
    };
 
    auto status = [&]() {
        auto next = steady_clock::now();
        uint32_t lw = 0;
        uint32_t lr = 0;
        do {
            std::this_thread::sleep_until(next);
            next += 1s;
            auto w = writedone.load();
            auto r = nread.load();
            printf("write: %6ld, spd: %6ld,  read: %6ld, spd: %6ld , queue size: %d\n", w, w - lw, r, r - lr, rcb->size());
            lw = w;
            lr = r;
        } while (nread.load() < nmsg);
    };
 
    ThreadManager threads;
    boost::timer::auto_cpu_timer timer;
    printf("Testing Robust Buffer, msgs %ld, queue size: %d \n", nmsg, nelement);
    threads.Launch(status);
    for (int i = 0; i < 10; ++i) {
        threads.Launch(Reader);
        threads.Launch(Writer);
    }
    threads.WaitAll();
    printf("total: %ld, expected: %ld\n", total.load(), correct_total);
    BOOST_CHECK_EQUAL(total.load(), correct_total);
}