#include "robust.h"
|
#include "util.h"
|
#include <boost/circular_buffer.hpp>
|
|
using namespace robust;
|
|
enum {
|
eLockerBits = 32,
|
eLockerMask = MaskBits(sizeof(int) * 8),
|
};
|
|
void MySleep()
|
{
|
std::this_thread::sleep_for(2us);
|
}
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
BOOST_AUTO_TEST_CASE(QueueTest)
|
{
|
const int nthread = 100;
|
const uint64_t nmsg = 1000 * 1000 * 10;
|
|
SharedMemory &shm = TestShm();
|
shm.Remove();
|
// return; /////////////////////////////////////////////////
|
int64_t i64 = 0;
|
char c = 0;
|
for (int i = 0; i < 256; ++i) {
|
c = i;
|
i64 = int64_t(c) << 1;
|
BOOST_CHECK_EQUAL(c, (i64 >> 1));
|
uint64_t u64 = i;
|
BOOST_CHECK_EQUAL((u64 & 255), i);
|
}
|
|
uint64_t correct_total = nmsg * (nmsg - 1) / 2;
|
std::atomic<uint64_t> total(0);
|
std::atomic<uint64_t> nwrite(0);
|
std::atomic<uint64_t> writedone(0);
|
|
#if 1
|
const int kPower = 0;
|
typedef AtomicQueue<kPower> Rcb;
|
|
Rcb tmp;
|
// BOOST_CHECK(tmp.like_empty());
|
BOOST_CHECK(tmp.push(1));
|
if (kPower != 0) {
|
BOOST_CHECK(tmp.tail() == 1);
|
}
|
BOOST_CHECK(tmp.head() == 0);
|
int64_t d;
|
BOOST_CHECK(tmp.pop(d));
|
if (kPower != 0) {
|
// BOOST_CHECK(tmp.like_empty());
|
BOOST_CHECK(tmp.head() == 1);
|
BOOST_CHECK(tmp.tail() == 1);
|
}
|
|
ShmObject<Rcb> rcb(shm, "test_rcb");
|
bool try_more = true;
|
|
auto Writer = [&]() {
|
uint64_t n = 0;
|
while ((n = nwrite++) < nmsg) {
|
while (!rcb->push(n, try_more)) {
|
// MySleep();
|
}
|
++writedone;
|
}
|
};
|
std::atomic<uint64_t> nread(0);
|
auto Reader = [&]() {
|
while (nread.load() < nmsg) {
|
int64_t d;
|
if (rcb->pop(d, try_more)) {
|
++nread;
|
total += d;
|
} else {
|
// MySleep();
|
}
|
}
|
};
|
|
#else
|
typedef Circular<int64_t> Rcb;
|
ShmObject<Rcb> rcb(shm, "test_rcb", 16, shm.get_segment_manager());
|
|
typedef FMutex Mutex;
|
// typedef SemMutex Mutex;
|
Mutex mtx(123);
|
auto Writer = [&]() {
|
uint64_t n = 0;
|
while ((n = nwrite++) < nmsg) {
|
auto Write = [&]() {
|
robust::Guard<Mutex> lk(mtx);
|
if (rcb->full()) {
|
return false;
|
} else {
|
rcb->push_back(n);
|
return true;
|
}
|
// return rcb->push_back(n);
|
};
|
while (!Write()) {
|
// MySleep();
|
}
|
++writedone;
|
}
|
};
|
std::atomic<uint64_t> nread(0);
|
auto Reader = [&]() {
|
while (nread.load() < nmsg) {
|
int64_t d;
|
auto Read = [&]() {
|
robust::Guard<Mutex> lk(mtx);
|
if (rcb->empty()) {
|
return false;
|
} else {
|
d = rcb->front();
|
rcb->pop_front();
|
return true;
|
}
|
// return rcb->pop_front(d);
|
};
|
if (Read()) {
|
++nread;
|
total += d;
|
} else {
|
// MySleep();
|
}
|
}
|
};
|
|
#endif
|
|
auto status = [&]() {
|
auto next = steady_clock::now();
|
uint32_t lw = 0;
|
uint32_t lr = 0;
|
do {
|
std::this_thread::sleep_until(next);
|
next += 1s;
|
auto w = writedone.load();
|
auto r = nread.load();
|
printf("write: %6ld, spd: %6ld, read: %6ld, spd: %6ld\n",
|
w, w - lw, r, r - lr);
|
lw = w;
|
lr = r;
|
} while (nread.load() < nmsg);
|
};
|
|
std::thread st(status);
|
{
|
ThreadManager threads;
|
boost::timer::auto_cpu_timer timer;
|
// printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, Rcb::capacity, nthread);
|
printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, 16, nthread);
|
for (int i = 0; i < nthread; ++i) {
|
threads.Launch(Reader);
|
threads.Launch(Writer);
|
}
|
threads.WaitAll();
|
}
|
st.join();
|
printf("total: %ld, expected: %ld\n", total.load(), correct_total);
|
BOOST_CHECK_EQUAL(total.load(), correct_total);
|
}
|
|
BOOST_AUTO_TEST_CASE(MutexTest)
|
{
|
{
|
int sem_id = semget(100, 1, 0666 | IPC_CREAT);
|
auto P = [&]() {
|
sembuf op = {0, -1, SEM_UNDO};
|
semop(sem_id, &op, 1);
|
};
|
auto V = [&]() {
|
sembuf op = {0, 1, SEM_UNDO};
|
semop(sem_id, &op, 1);
|
};
|
for (int i = 0; i < 10; ++i) {
|
V();
|
}
|
Sleep(10s);
|
|
return;
|
}
|
|
// typedef robust::MFMutex RobustMutex;
|
typedef robust::SemMutex RobustMutex;
|
|
for (int i = 0; i < 20; ++i) {
|
int size = i;
|
int left = size & 7;
|
int rsize = size + ((8 - left) & 7);
|
printf("size: %3d, rsize: %3d\n", size, rsize);
|
}
|
SharedMemory &shm = TestShm();
|
// shm.Remove();
|
// return;
|
GlobalInit(shm);
|
|
const std::string mtx_name("test_mutex");
|
const std::string int_name("test_int");
|
// auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name, 12345);
|
RobustMutex rmtx(12345);
|
auto mtx = &rmtx;
|
auto pi = shm.FindOrCreate<int>(int_name, 100);
|
|
std::mutex m;
|
typedef std::chrono::steady_clock Clock;
|
|
if (pi) {
|
auto old = *pi;
|
printf("int : %d, add1: %d\n", old, ++*pi);
|
}
|
|
auto LockSpeed = [](auto &mutex, const std::string &name) {
|
const int ntimes = 1000 * 1;
|
auto Lock = [&]() {
|
for (int i = 0; i < ntimes; ++i) {
|
mutex.lock();
|
mutex.unlock();
|
}
|
};
|
printf("\nTesting %s lock/unlock %d times\n", name.c_str(), ntimes);
|
{
|
boost::timer::auto_cpu_timer timer;
|
printf("1 thread: ");
|
Lock();
|
}
|
auto InThread = [&](int nthread) {
|
boost::timer::auto_cpu_timer timer;
|
printf("%d threads: ", nthread);
|
std::vector<std::thread> vt;
|
for (int i = 0; i < nthread; ++i) {
|
vt.emplace_back(Lock);
|
}
|
for (auto &t : vt) {
|
t.join();
|
}
|
};
|
InThread(4);
|
InThread(16);
|
InThread(100);
|
InThread(1000);
|
};
|
NullMutex null_mtx;
|
std::mutex std_mtx;
|
CasMutex cas_mtx;
|
FMutex mfmtx(3);
|
boost::interprocess::interprocess_mutex ipc_mutex;
|
SemMutex sem_mtx(3);
|
LockSpeed(null_mtx, "null mutex");
|
LockSpeed(std_mtx, "std::mutex");
|
// LockSpeed(cas_mtx, "CAS mutex");
|
LockSpeed(ipc_mutex, "boost ipc mutex");
|
LockSpeed(mfmtx, "mutex+flock");
|
LockSpeed(sem_mtx, "sem mutex");
|
|
auto TryLock = [&]() {
|
if (mtx->try_lock()) {
|
printf("try_lock ok\n");
|
return true;
|
} else {
|
printf("try_lock failed\n");
|
return false;
|
}
|
};
|
auto Unlock = [&]() {
|
mtx->unlock();
|
printf("unlocked\n");
|
};
|
|
if (mtx) {
|
printf("mtx exists\n");
|
if (TryLock()) {
|
// Sleep(10s);
|
auto op = [&]() {
|
if (TryLock()) {
|
Unlock();
|
}
|
};
|
op();
|
std::thread t(op);
|
t.join();
|
// Unlock();
|
} else {
|
// mtx->unlock();
|
}
|
} else {
|
printf("mtx not exists\n");
|
}
|
}
|