#include "robust.h"
|
#include "util.h"
|
#include <boost/circular_buffer.hpp>
|
|
using namespace robust;
|
|
enum {
|
eLockerBits = 32,
|
eLockerMask = MaskBits(sizeof(int) * 8),
|
};
|
|
// Blocks the calling thread for (at least) two microseconds.
// Intended as an optional back-off for spin loops in the tests below.
void MySleep()
{
    const std::chrono::microseconds pause(2);
    std::this_thread::sleep_for(pause);
}
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
// Smoke test for AtomicReqRep: two client threads each issue five requests
// (values 0..4) while a single server thread answers every request it
// processes with `req + 100`. The outcome of each request is only printed;
// nothing is asserted on the replies.
BOOST_AUTO_TEST_CASE(InitTest)
{
    AtomicReqRep rr;

    auto client = [&]() {
        for (int i = 0; i < 5; ++i) {
            int64_t reply = 0;
            const bool ok = rr.ClientRequest(i, reply);
            // reply is int64_t: print it as long long with %lld. The former
            // %d specifier did not match the argument type.
            printf("init request %d, %s, reply %lld\n", i, (ok ? "ok" : "failed"),
                   static_cast<long long>(reply));
        }
    };

    // Cleared by this thread and polled by the server thread, so it must be
    // atomic; a plain bool here is a data race.
    std::atomic<bool> run(true);
    auto server = [&]() {
        auto onReq = [](int64_t req) { return req + 100; };
        while (run.load()) {
            rr.ServerProcess(onReq);
        }
    };

    ThreadManager clients, servers;
    servers.Launch(server);
    for (int i = 0; i < 2; ++i) {
        clients.Launch(client);
    }

    // Stop the server only after every client has finished its requests.
    clients.WaitAll();
    run = false;
    servers.WaitAll();
}
|
|
// Stress test for AtomicQ63 accessed through a NamedShmObject: `nthread`
// writer threads push the values 0 .. nmsg-1 (each exactly once) while
// `nthread` reader threads pop them and accumulate their sum. A separate
// status thread prints progress once per second. The test passes when the
// accumulated sum equals nmsg * (nmsg - 1) / 2.
BOOST_AUTO_TEST_CASE(QueueTest)
{
    const int nthread = 100;
    const uint64_t nmsg = 1000 * 1000 * 10;

    // printf has no portable length modifier for uint64_t without <cinttypes>;
    // convert explicitly and print with %llu.
    auto ull = [](uint64_t v) { return static_cast<unsigned long long>(v); };

    SharedMemory &shm = TestShm();
    shm.Remove();

    // Sanity checks on integer conversions: a char value survives a
    // shift-left / shift-right round trip through int64_t, and the low byte
    // of a uint64_t equals the int it was built from.
    int64_t i64 = 0;
    char c = 0;
    for (int i = 0; i < 256; ++i) {
        c = i;
        i64 = int64_t(c) << 1;
        BOOST_CHECK_EQUAL(c, (i64 >> 1));
        uint64_t u64 = i;
        BOOST_CHECK_EQUAL((u64 & 255), i);
    }

    // Sum of 0 .. nmsg-1, i.e. of every value the writers push.
    uint64_t correct_total = nmsg * (nmsg - 1) / 2;
    std::atomic<uint64_t> total(0);     // sum of all popped values
    std::atomic<uint64_t> nwrite(0);    // next value to be claimed by a writer
    std::atomic<uint64_t> writedone(0); // number of completed pushes

    using Rcb = AtomicQ63;

    // Single-threaded smoke check on a local queue instance.
    Rcb tmp;
    BOOST_CHECK(tmp.push(1));
    int64_t d = 0;
    BOOST_CHECK(tmp.pop(d));

    NamedShmObject<Rcb> rcb(shm, "test_rcb", eOpenOrCreate);
    bool try_more = true;

    auto Writer = [&]() {
        uint64_t n = 0;
        // Claim the next value; stop once all nmsg values have been claimed.
        while ((n = nwrite++) < nmsg) {
            while (!rcb->push(n, try_more)) {
                // Busy-spin until the push succeeds; MySleep() could be used
                // here as a back-off.
            }
            ++writedone;
        }
    };

    std::atomic<uint64_t> nread(0); // number of successful pops
    auto Reader = [&]() {
        while (nread.load() < nmsg) {
            int64_t value = 0;
            if (rcb->pop(value, try_more)) {
                ++nread;
                total += value;
            } else {
                // Nothing popped; busy-spin (MySleep() could be used here as
                // a back-off).
            }
        }
    };

    // Prints cumulative and per-interval write/read counts once per second
    // until every message has been read.
    auto status = [&]() {
        auto next = steady_clock::now();
        // Previous samples; kept 64-bit to match the counters they mirror.
        uint64_t lw = 0;
        uint64_t lr = 0;
        do {
            std::this_thread::sleep_until(next);
            next += 1s;
            const uint64_t w = writedone.load();
            const uint64_t r = nread.load();
            printf("write: %6llu, spd: %6llu, read: %6llu, spd: %6llu\n",
                   ull(w), ull(w - lw), ull(r), ull(r - lr));
            lw = w;
            lr = r;
        } while (nread.load() < nmsg);
    };

    std::thread st(status);
    {
        ThreadManager threads;
        // Reports CPU/wall time for this scope when it goes out of scope.
        boost::timer::auto_cpu_timer timer;
        // NOTE(review): the queue size is printed as a hard-coded 16
        // (the original had a commented-out variant using Rcb::capacity).
        printf("Testing Robust Buffer, msgs %llu, queue size: %d, threads: %d \n", ull(nmsg), 16, nthread);
        for (int i = 0; i < nthread; ++i) {
            threads.Launch(Reader);
            threads.Launch(Writer);
        }
        threads.WaitAll();
    }
    st.join();

    printf("total: %llu, expected: %llu\n", ull(total.load()), ull(correct_total));
    BOOST_CHECK_EQUAL(total.load(), correct_total);
}
|