#include "robust.h"
|
#include "util.h"
|
|
using namespace robust;
|
|
enum {
|
eLockerBits = 32,
|
eLockerMask = MaskBits(sizeof(int) * 8),
|
};
|
|
typedef CircularBuffer<int64_t, Allocator<int64_t>> Rcb;
|
Rcb *GetRCB(SharedMemory &shm, const int nelem)
|
{
|
int cap = nelem + 1;
|
typedef uint64_t Data;
|
auto size = sizeof(Rcb) + sizeof(Data) * cap;
|
void *p = shm.Alloc(size);
|
if (p) {
|
return new (p) Rcb(cap, shm.get_segment_manager());
|
}
|
return nullptr;
|
}
|
|
/// Blocks the calling thread for (at least) two microseconds.
/// Used as a short back-off when a queue operation could not make progress.
void MySleep()
{
    const std::chrono::microseconds pause(2);
    std::this_thread::sleep_for(pause);
}
|
|
// Stress test for the circular buffer returned by GetRCB().
//
// After a single-threaded push/pop sanity check, ten Writer and ten Reader
// callables are launched through ThreadManager. Writers push every value in
// [0, nmsg) exactly once; readers pop and accumulate what they receive. The
// test passes when the accumulated sum equals nmsg * (nmsg - 1) / 2.
BOOST_AUTO_TEST_CASE(QueueTest)
{
    SharedMemory &shm = TestShm();
    shm.Remove();

    pid_t pid = getpid();
    printf("pid : %d\n", pid);

    // Prints the result of access() on /proc/<pid>/stat for the given pid.
    auto Access = [](pid_t pid) {
        char buf[100] = {0};
        // Bounded formatting so the path can never overrun `buf`.
        snprintf(buf, sizeof(buf), "/proc/%d/stat", pid);
        int r = access(buf, F_OK);
        printf("access %d\n", r);
    };
    Access(pid);
    Access(pid + 1);
    // Sleep(10s);
    // return;

    int nelement = 640;
    auto rcb = GetRCB(shm, nelement);
    // REQUIRE, not CHECK: every statement below dereferences rcb, so the test
    // must stop here if the allocation failed.
    BOOST_REQUIRE(rcb != nullptr);
    BOOST_CHECK(rcb->empty());
    BOOST_CHECK(rcb->push_back(1));
    BOOST_CHECK(rcb->size() == 1);
    int64_t d = 0;
    BOOST_CHECK(rcb->pop_front(d));
    BOOST_CHECK(rcb->empty());

    const uint64_t nmsg = 1000 * 1000 * 1;
    // Sum of 0 .. nmsg-1.
    const uint64_t correct_total = nmsg * (nmsg - 1) / 2;
    std::atomic<uint64_t> total(0);
    std::atomic<uint64_t> nwrite(0);
    std::atomic<uint64_t> writedone(0);

    // Claims the next unsent value via the shared counter and retries the
    // push until the buffer accepts it.
    auto Writer = [&]() {
        uint64_t n = 0;
        while ((n = nwrite++) < nmsg) {
            while (!rcb->push_back(n)) {
                // MySleep();
            }
            ++writedone;
        }
    };

    std::atomic<uint64_t> nread(0);

    // Pops values until nmsg of them have been consumed across all readers,
    // backing off briefly whenever a pop fails.
    auto Reader = [&]() {
        while (nread.load() < nmsg) {
            int64_t d = 0;
            if (rcb->pop_front(d)) {
                ++nread;
                total += d;
            } else {
                MySleep();
            }
        }
    };

    // printf has no portable conversion for uint64_t without <cinttypes>;
    // widen explicitly so the "%llu" conversions below are always correct.
    auto ull = [](uint64_t v) { return static_cast<unsigned long long>(v); };

    // Once per second, prints cumulative and per-interval write/read counts
    // together with the current queue size, until all messages are read.
    auto status = [&]() {
        auto next = steady_clock::now();
        uint64_t lw = 0;
        uint64_t lr = 0;
        do {
            std::this_thread::sleep_until(next);
            next += 1s;
            auto w = writedone.load();
            auto r = nread.load();
            printf("write: %6llu, spd: %6llu, read: %6llu, spd: %6llu , queue size: %d\n",
                   ull(w), ull(w - lw), ull(r), ull(r - lr), static_cast<int>(rcb->size()));
            lw = w;
            lr = r;
        } while (nread.load() < nmsg);
    };

    ThreadManager threads;
    boost::timer::auto_cpu_timer timer;
    printf("Testing Robust Buffer, msgs %llu, queue size: %d \n", ull(nmsg), nelement);
    threads.Launch(status);
    for (int i = 0; i < 10; ++i) {
        threads.Launch(Reader);
        threads.Launch(Writer);
    }
    threads.WaitAll();

    printf("total: %llu, expected: %llu\n", ull(total.load()), ull(correct_total));
    BOOST_CHECK_EQUAL(total.load(), correct_total);
}
|
|
// Exercises robust::Mutex:
//   1. prints a small table of sizes rounded up to a multiple of 8;
//   2. looks up (or creates) a named mutex and a named int in shared memory;
//   3. times one million lock/unlock pairs on a local mutex from two threads;
//   4. runs a try_lock sequence on the shared mutex from this thread and from
//      a second thread.
BOOST_AUTO_TEST_CASE(MutexTest)
{
    typedef robust::Mutex RobustMutex;

    // For each size in [0, 20): show the size and the size rounded up to the
    // next multiple of 8.
    for (int i = 0; i < 20; ++i) {
        int size = i;
        int left = size & 7;
        int rsize = size + ((8 - left) & 7);
        printf("size: %3d, rsize: %3d\n", size, rsize);
    }

    SharedMemory &shm = TestShm();
    // shm.Remove();
    // return;
    GlobalInit(shm);

    const std::string mtx_name("test_mutex");
    const std::string int_name("test_int");
    auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name);
    // NOTE(review): 100 is presumably the value used when the int has to be
    // created -- confirm against SharedMemory::FindOrCreate.
    auto pi = shm.FindOrCreate<int>(int_name, 100);

    if (pi) {
        auto old = *pi;
        printf("int : %d, add1: %d\n", old, ++*pi);
    }

    {
        // The timer reports the CPU/wall time of this scope when it is destroyed.
        boost::timer::auto_cpu_timer timer;
        const int ntimes = 1000 * 1000;
        printf("test lock/unlock %d times: ", ntimes);
        RobustMutex mutex;
        auto Lock = [&]() {
            for (int i = 0; i < ntimes; ++i) {
                mutex.lock();
                mutex.unlock();
            }
        };
        std::thread t1(Lock), t2(Lock);
        t1.join();
        t2.join();
    }

    // Attempts to take the shared mutex without blocking and reports the outcome.
    auto TryLock = [&]() {
        if (mtx->try_lock()) {
            printf("try_lock ok\n");
            return true;
        } else {
            printf("try_lock failed\n");
            return false;
        }
    };
    // Releases the shared mutex and reports it.
    auto Unlock = [&]() {
        mtx->unlock();
        printf("unlocked\n");
    };

    if (mtx) {
        printf("mtx exists\n");
        if (TryLock()) {
            // Second acquisition attempt, released again only if it succeeded.
            auto op = [&]() {
                if (TryLock()) {
                    Unlock();
                }
            };
            op();              // from the thread that already holds the lock
            std::thread t(op); // from a different thread
            t.join();
            // NOTE(review): the outer lock taken above is never released (the
            // Unlock() call below is commented out). Presumably deliberate, so
            // the shared mutex stays held when the test ends -- confirm.
            // Unlock();
        } else {
            // mtx->unlock();
        }
    } else {
        printf("mtx not exists\n");
    }
}
|