#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>

#include <atomic>
#include <chrono>
#include <string>
#include <thread>
#include <vector>

#include <boost/noncopyable.hpp>
#include <boost/timer/timer.hpp>
#include <boost/test/unit_test.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/date_time/microsec_time_clock.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>

#include "shm_queue.h"
#include "bh_util.h"
|
|
using namespace std::chrono_literals;
|
using namespace bhome_shm;
|
|
using namespace boost::posix_time;
|
auto Now = []() { return second_clock::universal_time(); };
|
|
// Plain 1000-byte payload; BasicTest places it in shared memory through ShmObject<s1000>.
struct s1000 {
    char a[1000];
};
|
|
// Type-erased callable taking no arguments and returning nothing.
using FuncVV = std::function<void()>;
|
class ScopeCall : private boost::noncopyable {
|
FuncVV f_;
|
public:
|
ScopeCall(FuncVV f):f_(f) { f_(); }
|
~ScopeCall() { f_(); }
|
};
|
// Owns a group of std::thread workers and joins every one of them, at the
// latest, when the manager is destroyed.
class ThreadManager
{
    std::vector<std::thread> threads_;

public:
    ~ThreadManager() { WaitAll(); }

    // Starts a new thread that runs t(p...).
    template <class T, class... P>
    void Launch(T t, P... p)
    {
        threads_.emplace_back(t, p...);
    }

    // Joins every thread launched so far, then forgets them so that repeated
    // Launch()/WaitAll() cycles do not pile up finished thread objects
    // (same contract as ProcessManager::WaitAll, which clears its pid list).
    void WaitAll()
    {
        for (auto &t : threads_) {
            if (t.joinable()) {
                t.join();
            }
        }
        threads_.clear();
    }
};
|
// Runs callables in fork()ed child processes and reaps them, at the latest,
// when the manager is destroyed.
class ProcessManager
{
    std::vector<pid_t> procs_; // pids of children that have not been waited for yet

public:
    ~ProcessManager() { WaitAll(); }

    // Forks; the child runs t(p...) and then exits, the parent records the pid.
    // The child exits with status 0 on normal return and 1 if t throws.
    // A failed fork() is reported on stderr and no pid is recorded.
    template <class T, class... P>
    void Launch(T t, P... p)
    {
        // Flush pending stdio output first: otherwise the child inherits the
        // unflushed buffers and writes the same text again when it exits.
        fflush(nullptr);

        const pid_t pid = fork();
        if (pid == 0) {
            // child
            int code = 0;
            try {
                t(p...);
            } catch (...) {
                // Never let an exception unwind into the child's copy of the
                // caller's stack; the child must terminate here.
                code = 1;
            }
            exit(code);
        } else if (pid != -1) { // Ok
            procs_.push_back(pid);
        } else {
            perror("ProcessManager::Launch: fork");
        }
    }

    // Blocks until every launched child has terminated, then clears the list.
    void WaitAll()
    {
        for (const pid_t pid : procs_) {
            int status = 0;
            // Options are 0 so that only termination ends the wait (a stopped
            // or continued child must not be dropped un-reaped). Retry when
            // the call is merely interrupted by a signal.
            while (waitpid(pid, &status, 0) == -1 && errno == EINTR) {
            }
        }
        procs_.clear();
    }
};
|
struct ShmRemover {
|
std::string name_;
|
ShmRemover(const std::string &name):name_(name) { SharedMemory::Remove(name_); }
|
~ShmRemover() { SharedMemory::Remove(name_); }
|
};
|
|
// Compile-time type comparison: value is true only when both arguments are the same type.
template <class A, class B>
struct IsSameType {
    static const bool value = false;
};

// Specialization chosen when both template arguments are identical.
template <class A>
struct IsSameType<A, A> {
    static const bool value = true;
};
|
|
// Checks offset_ptr null handling and that creating/removing a named
// ShmObject (and a raw Alloc/Dealloc pair) returns the segment's free
// memory to its previous value.
BOOST_AUTO_TEST_CASE(BasicTest)
{
    const std::string shm_name("basic");
    ShmRemover auto_remove(shm_name);
    SharedMemory shm(shm_name, 1024 * 1024 * 10);
    auto Avail = [&]() { return shm.get_free_memory(); };

    // offset_ptr: default/zero is null, assigning a real pointer round-trips.
    offset_ptr<const void> p;
    BOOST_CHECK(!p);
    BOOST_CHECK(p.get() == 0);
    p = 0;
    BOOST_CHECK(!p);
    BOOST_CHECK(p.get() == 0);
    const char *str = "basic";
    p = str;
    BOOST_CHECK(p);
    BOOST_CHECK(p.get() == str);
    p = 0;
    BOOST_CHECK(!p);
    BOOST_CHECK(p.get() == 0);

    auto init_avail = Avail();

    auto BasicTest = [&](int tid, int nloop) {
        auto Code = [&](int id) {
            typedef ShmObject<s1000> Int;
            std::string name = std::to_string(id);
            auto a0 = Avail();
            Int i1(shm, name);
            auto a1 = Avail();
            BOOST_CHECK_LT(a1, a0);
            // Cast so the argument really matches %ld whatever the size type is.
            printf("s1000 size: %ld\n", static_cast<long>(a0 - a1));
            i1->a[0] = 5;
            // A second object with the same name must map to the same storage.
            Int i2(shm, name);
            auto a2 = Avail();
            BOOST_CHECK_EQUAL(a1, a2);
            BOOST_CHECK_EQUAL(i1.data(), i2.data());
            i1.Remove();
            BOOST_CHECK_EQUAL(Avail(), a0);

            {
                auto old = Avail();
                void *p = shm.Alloc(1024);
                shm.Dealloc(p);
                BOOST_CHECK_EQUAL(old, Avail());
            }

            bool r = shared_memory_object::remove(shm_name.c_str());
            BOOST_CHECK(r);
        };
        for (int i = 0; i < nloop; ++i) {
            Code(i + tid * nloop);
        }
    };

    // boost::timer::auto_cpu_timer timer;
    ThreadManager threads;
    int nthread = 1;
    int nloop = 1;
    for (int i = 0; i < nthread; ++i) {
        threads.Launch(BasicTest, i, nloop);
    }
    // Join before the final check: otherwise it races with workers that may
    // still hold allocations (the destructor would join only afterwards).
    threads.WaitAll();
    BOOST_CHECK_EQUAL(init_avail, Avail());
}
|
|
// Launches nproc child processes; child k sleeps k*100ms, prints a progress
// line and returns. The ProcessManager destructor waits for all of them.
BOOST_AUTO_TEST_CASE(ForkTest)
{
    ProcessManager procs;
    const int nproc = 10;

    printf("Testing fork:\n");

    auto child = [&](int id) {
        const auto delay = 100ms * id;
        std::this_thread::sleep_for(delay);
        printf("child id: %3d/%d ends\r", id, nproc);
    };

    // Child ids are 1-based.
    for (int id = 1; id <= nproc; ++id) {
        procs.Launch(child, id);
    }
}
|
|
// Recv on an empty queue must fail for each tested timeout value (0 and 100);
// auto_cpu_timer prints how long each attempt took.
BOOST_AUTO_TEST_CASE(TimedWaitTest)
{
    const std::string shm_name("shm_wait");
    ShmRemover auto_remove(shm_name);
    SharedMemory shm(shm_name, 1024 * 1024);
    ShmMsgQueue q(shm, 64);
    for (int i = 0; i < 2; ++i) {
        int ms = i * 100;
        printf("Timeout Test %4d: ", ms);
        boost::timer::auto_cpu_timer timer;
        MQId id;
        // Initialise the out-parameters (as RRTest does) so they never hold
        // indeterminate values.
        void *data = 0;
        size_t size = 0;
        bool r = q.Recv(id, data, size, ms);
        BOOST_CHECK(!r);
        if (r) {
            // Unexpected success: release the buffer the same way RRTest
            // releases received data, so the failure does not also leak.
            free(data);
        }
    }
}
|
|
// Walks a ref-counted Msg through AddRef/Release and checks the count
// reported after every step, down to zero.
BOOST_AUTO_TEST_CASE(RefCountTest)
{
    const std::string shm_name("ShmRefCount");
    ShmRemover auto_remove(shm_name);
    SharedMemory shm(shm_name, 1024 * 1024);

    // A freshly built counted message starts at 1.
    Msg first(shm.Alloc(1000), shm.New<RefCount>());
    BOOST_CHECK(first.IsCounted());
    BOOST_CHECK_EQUAL(first.Count(), 1);

    // The copy reports the same counter: AddRef through either handle
    // continues the same sequence.
    Msg second = first;
    BOOST_CHECK(second.IsCounted());
    BOOST_CHECK_EQUAL(second.AddRef(), 2);
    BOOST_CHECK_EQUAL(first.AddRef(), 3);

    // Release back down to zero, then the handle is no longer counted.
    BOOST_CHECK_EQUAL(first.Release(shm), 2);
    BOOST_CHECK_EQUAL(first.Release(shm), 1);
    BOOST_CHECK_EQUAL(second.Release(shm), 0);
    BOOST_CHECK(!second.IsCounted());
}
|
|
// Checks MsgMetaV1's default field values and that Pack followed by Parse
// reproduces the header byte for byte.
BOOST_AUTO_TEST_CASE(MsgHeaderTest)
{
    MsgMetaV1 original;

    // Defaults of a freshly constructed header.
    BOOST_CHECK_EQUAL(original.self_size_, sizeof(original));
    BOOST_CHECK_EQUAL(original.type_, kMsgTypeNormal);
    BOOST_CHECK_EQUAL(original.tag_, kMsgMetaTag);
    BOOST_CHECK_EQUAL(original.data_size_, 0);
    BOOST_CHECK(original.src_id_ == boost::uuids::nil_uuid());

    // Change a few fields so the round trip is not trivially the default.
    original.data_size_ = 100;
    original.src_id_ = boost::uuids::random_generator()();
    original.type_ = 123;

    // NOTE(review): assumes the packed header fits in 100 bytes - confirm against MsgMetaV1.
    char packed[100] = {0};
    original.Pack(packed);

    MsgMetaV1 parsed;
    parsed.Parse(packed);
    const int diff = memcmp(&original, &parsed, sizeof(original));
    BOOST_CHECK_EQUAL(diff, 0);
}
|
|
// Throughput test: for each reader/writer count combination, writers push
// about 10M messages in total to one queue id while readers drain it.
// Runs once with forked processes and once with threads. A background
// thread prints the segment's used memory once per second.
BOOST_AUTO_TEST_CASE(SpeedTest)
{
    const std::string shm_name("ShmSpeed");
    ShmRemover auto_remove(shm_name);
    const int mem_size = 1024 * 1024 * 50;
    MQId id = boost::uuids::random_generator()();
    const int timeout = 100;
    const uint32_t data_size = 4000;

    // Builds one message and sends that same message n times to queue `id`.
    auto Writer = [&](int writer_id, uint64_t n) {
        SharedMemory shm(shm_name, mem_size);
        ShmMsgQueue mq(shm, 64);
        std::string str(data_size, 'a');
        Msg msg;
        DEFER1(msg.Release(shm););
        msg.Build(shm, mq.Id(), str.data(), str.size(), true);
        // Index has the same type as n to avoid a signed/unsigned comparison.
        for (uint64_t i = 0; i < n; ++i) {
            // mq.Send(id, str.data(), str.size(), timeout);
            mq.Send(id, msg, timeout);
        }
    };

    // Receives from queue `id` until *run becomes false, validating each header.
    auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) {
        SharedMemory shm(shm_name, mem_size);
        ShmMsgQueue mq(id, shm, 1000);
        while (*run) {
            Msg msg;
            if (mq.Recv(msg, timeout)) {
                MsgMetaV1 header;
                if (!header.Parse(msg.get())) {
                    BOOST_CHECK(false);
                }
                if (header.data_size_ != data_size) {
                    BOOST_CHECK(false);
                }
                msg.Release(shm);
            } else if (isfork) {
                // A forked reader has its own copy of *run and never sees the
                // parent clear it, so it quits on the first receive timeout.
                exit(0);
            }
        }
    };

    // Prints used shared memory once per second until *run becomes false.
    auto State = [&](std::atomic<bool> *run) {
        SharedMemory shm(shm_name, mem_size);
        auto init = shm.get_free_memory();
        // Casts make the arguments match %ld whatever the size type is.
        printf("shm init : %ld\n", static_cast<long>(init));
        while (*run) {
            auto cur = shm.get_free_memory();
            printf("shm used : %8ld/%ld\n", static_cast<long>(init - cur), static_cast<long>(init));
            std::this_thread::sleep_for(1s);
        }
    };

    int nwriters[] = {1, 2, 4};
    int nreaders[] = {1, 2};

    // Runs every reader/writer combination using the given managers.
    auto Test = [&](auto &www, auto &rrr, bool isfork) {
        for (auto nreader : nreaders) {
            for (auto nwriter : nwriters) {
                const uint64_t nmsg = 1000 * 1000 * 10 / nwriter;
                const uint64_t total_msg = nmsg * nwriter;
                std::atomic<bool> run(true);
                std::this_thread::sleep_for(10ms);
                boost::timer::auto_cpu_timer timer;
                for (int i = 0; i < nreader; ++i) {
                    rrr.Launch(Reader, i, &run, isfork);
                }
                for (int i = 0; i < nwriter; ++i) {
                    www.Launch(Writer, i, nmsg);
                }
                www.WaitAll();
                run.store(false);
                rrr.WaitAll();
                // No newline: the timer's destructor output completes this line.
                printf("%3d Write %ld msg R(%3d) W(%3d), : ", getpid(), static_cast<long>(total_msg), nreader, nwriter);
            }
        }
    };

    std::atomic<bool> run(true);
    ThreadManager state;
    state.Launch(State, &run);
    // typedef ProcessManager Manager;
    // typedef ThreadManager Manager;
    // const bool isfork = IsSameType<Manager, ProcessManager>::value;
    ProcessManager pw, pr;
    printf("================ Testing process io: =======================================================\n");
    Test(pw, pr, true);
    ThreadManager tw, tr;
    printf("---------------- Testing thread io: -------------------------------------------------------\n");
    Test(tw, tr, false);
    run.store(false);
}
|
|
// Request Reply Test
|
// Request/reply test: client threads send one shared message to the server
// queue and wait for a reply; server threads answer every request with the
// same shared message. A progress line is printed at most once per second,
// and at the end the shared message's reference count must be back to 1.
BOOST_AUTO_TEST_CASE(RRTest)
{
    const std::string shm_name("ShmReqRep");
    ShmRemover auto_remove(shm_name);
    const int qlen = 64;
    const size_t msg_length = 1000;
    std::string msg_content(msg_length, 'a');
    msg_content[20] = '\0';

    SharedMemory shm(shm_name, 1024 * 1024 * 50);
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
    ShmMsgQueue srv(shm, qlen);
    ShmMsgQueue cli(shm, qlen);

    Msg ref_counted_msg;
    ref_counted_msg.Build(shm, cli.Id(), msg_content.data(), msg_content.size(), true);

    std::atomic<uint64_t> count(0);

    std::atomic<ptime> last_time(Now() - seconds(1));
    std::atomic<uint64_t> last_count(0);

    auto Client = [&](int cli_id, int nmsg) {
        for (int i = 0; i < nmsg; ++i) {
            // Alternative send path (raw buffer); not used by default.
            auto Send = [&]() { return cli.Send(srv.Id(), msg_content.data(), msg_content.size(), 1000); };
            auto SendRefCounted = [&]() { return cli.Send(srv.Id(), ref_counted_msg, 1000); };

            if (!SendRefCounted()) {
                printf("********** client send error.\n");
                continue;
            }
            MQId id;
            void *data = 0;
            size_t size = 0;
            if (!cli.Recv(id, data, size, 1000)) {
                printf("********** client recv error.\n");
            } else {
                DEFER1(free(data));
                if (size != msg_length) {
                    BOOST_CHECK(false);
                }
                // Take one snapshot of the counter so "total" and "speed" in
                // the same line are computed from the same value.
                const uint64_t total = ++count;
                auto cur = Now();
                // exchange() already stores cur; only the thread that moves
                // the clock to a new second prints.
                if (last_time.exchange(cur) != cur) {
                    std::cout << "time: " << cur;
                    // Casts make the arguments match the %ld conversions.
                    printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n",
                           static_cast<long>(total),
                           static_cast<long>(total - last_count.exchange(total)),
                           static_cast<long>(init_avail - Avail()),
                           ref_counted_msg.Count());
                }
            }
        }
    };

    std::atomic<bool> stop(false);
    auto Server = [&]() {
        void *data = 0;
        size_t size = 0;
        MQId src_id;
        while (!stop) {
            if (srv.Recv(src_id, data, size, 100)) {
                DEFER1(free(data));
                // Alternative reply path (echo the raw buffer); not used by default.
                auto Send = [&]() { return srv.Send(src_id, data, size, 100); };
                auto SendRefCounted = [&]() { return srv.Send(src_id, ref_counted_msg, 100); };

                if (SendRefCounted()) {
                    if (size != msg_content.size()) {
                        BOOST_TEST(false, "server msg size error");
                    }
                }
            }
        }
    };

    boost::timer::auto_cpu_timer timer;
    DEFER1(printf("Request Reply Test:"););

    ThreadManager clients, servers;
    for (int i = 0; i < qlen; ++i) { servers.Launch(Server); }
    int ncli = 100 * 1;
    uint64_t nmsg = 100 * 100 * 2;
    printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, static_cast<long>(nmsg), static_cast<long>(ncli * nmsg));
    for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
    clients.WaitAll();
    printf("request ok: %ld\n", static_cast<long>(count.load()));
    stop = true;
    servers.WaitAll();
    BOOST_CHECK(ref_counted_msg.IsCounted());
    BOOST_CHECK_EQUAL(ref_counted_msg.Count(), 1);
    ref_counted_msg.Release(shm);
    BOOST_CHECK(!ref_counted_msg.IsCounted());
    // BOOST_CHECK_THROW(reply.Count(), int);
}
|
|
|
// Prints "MyMin" and returns the smaller of a and b (b when they are equal).
inline int MyMin(int a, int b)
{
    printf("MyMin\n");
    if (a < b) {
        return a;
    }
    return b;
}
|
int test_main(int argc, char *argv[])
|
{
|
printf("test main\n");
|
int a = 0;
|
int b = 0;
|
BOOST_CHECK_EQUAL(a, b);
|
int n = MyMin(4,6);
|
for (int i = 0; i < n; ++i) {
|
printf("i = %d\n", i);
|
}
|
|
return 0;
|
}
|