#include "robust.h"
|
#include "util.h"
|
|
using namespace robust;
|
|
enum {
|
eLockerBits = 32,
|
eLockerMask = MaskBits(sizeof(int) * 8),
|
};
|
|
void MySleep()
|
{
|
std::this_thread::sleep_for(2us);
|
}
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
BOOST_AUTO_TEST_CASE(QueueTest)
|
{
|
const int nthread = 100;
|
const uint64_t nmsg = 1000 * 1000 * 100;
|
|
SharedMemory &shm = TestShm();
|
shm.Remove();
|
// return; /////////////////////////////////////////////////
|
int64_t i64 = 0;
|
char c = 0;
|
for (int i = 0; i < 256; ++i) {
|
c = i;
|
i64 = int64_t(c) << 1;
|
BOOST_CHECK_EQUAL(c, (i64 >> 1));
|
uint64_t u64 = i;
|
BOOST_CHECK_EQUAL((u64 & 255), i);
|
}
|
|
#if 1
|
typedef AtomicQueue<4> Rcb;
|
|
Rcb tmp;
|
BOOST_CHECK(tmp.like_empty());
|
BOOST_CHECK(tmp.push_back(1));
|
BOOST_CHECK(tmp.tail() == 1);
|
BOOST_CHECK(tmp.head() == 0);
|
int64_t d;
|
BOOST_CHECK(tmp.pop_front(d));
|
BOOST_CHECK(tmp.like_empty());
|
BOOST_CHECK(tmp.head() == 1);
|
BOOST_CHECK(tmp.tail() == 1);
|
|
ShmObject<Rcb> rcb(shm, "test_rcb");
|
#else
|
typedef Circular<int64_t> Rcb;
|
ShmObject<Rcb> rcb(shm, "test_rcb", 64, shm.get_segment_manager());
|
#endif
|
|
const int nsize = sizeof(Rcb);
|
|
bool try_more = false;
|
uint64_t correct_total = nmsg * (nmsg - 1) / 2;
|
std::atomic<uint64_t> total(0);
|
std::atomic<uint64_t> nwrite(0);
|
std::atomic<uint64_t> writedone(0);
|
auto Writer = [&]() {
|
uint64_t n = 0;
|
while ((n = nwrite++) < nmsg) {
|
while (!rcb->push_back(n, try_more)) {
|
// MySleep();
|
}
|
++writedone;
|
}
|
};
|
std::atomic<uint64_t> nread(0);
|
auto Reader = [&]() {
|
while (nread.load() < nmsg) {
|
int64_t d;
|
if (rcb->pop_front(d, try_more)) {
|
++nread;
|
total += d;
|
} else {
|
// MySleep();
|
}
|
}
|
};
|
|
auto status = [&]() {
|
auto next = steady_clock::now();
|
uint32_t lw = 0;
|
uint32_t lr = 0;
|
do {
|
std::this_thread::sleep_until(next);
|
next += 1s;
|
auto w = writedone.load();
|
auto r = nread.load();
|
printf("write: %6ld, spd: %6ld, read: %6ld, spd: %6ld\n",
|
w, w - lw, r, r - lr);
|
lw = w;
|
lr = r;
|
} while (nread.load() < nmsg);
|
};
|
|
std::thread st(status);
|
{
|
ThreadManager threads;
|
boost::timer::auto_cpu_timer timer;
|
printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, Rcb::capacity, nthread);
|
for (int i = 0; i < nthread; ++i) {
|
threads.Launch(Reader);
|
threads.Launch(Writer);
|
}
|
threads.WaitAll();
|
}
|
st.join();
|
printf("total: %ld, expected: %ld\n", total.load(), correct_total);
|
BOOST_CHECK_EQUAL(total.load(), correct_total);
|
}
|
|
BOOST_AUTO_TEST_CASE(MutexTest)
|
{
|
typedef robust::Mutex RobustMutex;
|
|
for (int i = 0; i < 20; ++i) {
|
int size = i;
|
int left = size & 7;
|
int rsize = size + ((8 - left) & 7);
|
printf("size: %3d, rsize: %3d\n", size, rsize);
|
}
|
SharedMemory &shm = TestShm();
|
// shm.Remove();
|
// return;
|
GlobalInit(shm);
|
|
const std::string mtx_name("test_mutex");
|
const std::string int_name("test_int");
|
auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name);
|
auto pi = shm.FindOrCreate<int>(int_name, 100);
|
|
std::mutex m;
|
typedef std::chrono::steady_clock Clock;
|
auto Now = []() { return Clock::now().time_since_epoch(); };
|
if (pi) {
|
auto old = *pi;
|
printf("int : %d, add1: %d\n", old, ++*pi);
|
}
|
|
{
|
const int ntimes = 1000 * 1000;
|
RobustMutex mutex;
|
auto Lock = [&]() {
|
for (int i = 0; i < ntimes; ++i) {
|
mutex.lock();
|
mutex.unlock();
|
}
|
};
|
|
{
|
boost::timer::auto_cpu_timer timer;
|
printf("test lock/unlock %d times: ", ntimes);
|
Lock();
|
}
|
{
|
boost::timer::auto_cpu_timer timer;
|
printf("test lock/unlock %d times, 2 thread: ", ntimes);
|
std::thread t1(Lock), t2(Lock);
|
t1.join();
|
t2.join();
|
}
|
}
|
|
auto TryLock = [&]() {
|
if (mtx->try_lock()) {
|
printf("try_lock ok\n");
|
return true;
|
} else {
|
printf("try_lock failed\n");
|
return false;
|
}
|
};
|
auto Unlock = [&]() {
|
mtx->unlock();
|
printf("unlocked\n");
|
};
|
|
if (mtx) {
|
printf("mtx exists\n");
|
if (TryLock()) {
|
auto op = [&]() {
|
if (TryLock()) {
|
Unlock();
|
}
|
};
|
op();
|
std::thread t(op);
|
t.join();
|
// Unlock();
|
} else {
|
// mtx->unlock();
|
}
|
} else {
|
printf("mtx not exists\n");
|
}
|
}
|