/*
|
* =====================================================================================
|
*
|
* Filename: speed_test.cpp
|
*
|
* Description:
|
*
|
* Version: 1.0
|
* Created: 2021年03月30日 11时35分21秒
|
* Revision: none
|
* Compiler: gcc
|
*
|
* Author: Li Chao (),
|
* Organization:
|
*
|
* =====================================================================================
|
*/
|
#include "util.h"
|
#include <boost/date_time/posix_time/posix_time.hpp>
|
|
using namespace boost::posix_time;
|
|
BOOST_AUTO_TEST_CASE(SpeedTest)
|
{
|
const std::string shm_name("ShmSpeed");
|
ShmRemover auto_remove(shm_name);
|
const int mem_size = 1024 * 1024 * 50;
|
MQId id = boost::uuids::random_generator()();
|
const int timeout = 100;
|
const uint32_t data_size = 4000;
|
|
auto Writer = [&](int writer_id, uint64_t n) {
|
SharedMemory shm(shm_name, mem_size);
|
ShmMsgQueue mq(shm, 64);
|
std::string str(data_size, 'a');
|
MsgI msg;
|
DEFER1(msg.Release(shm););
|
msg.MakeRC(shm, MakeRequest(mq.Id(), str.data(), str.size()));
|
for (uint64_t i = 0; i < n; ++i) {
|
// mq.Send(id, str.data(), str.size(), timeout);
|
mq.Send(id, msg, timeout);
|
}
|
};
|
auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) {
|
SharedMemory shm(shm_name, mem_size);
|
ShmMsgQueue mq(id, shm, 1000);
|
while (*run) {
|
BHMsg msg;
|
if (mq.Recv(msg, timeout)) {
|
// ok
|
} else if (isfork) {
|
exit(0); // for forked quit after 1s.
|
}
|
}
|
};
|
auto State = [&](std::atomic<bool> *run) {
|
SharedMemory shm(shm_name, mem_size);
|
auto init = shm.get_free_memory();
|
printf("shm init : %ld\n", init);
|
while (*run) {
|
auto cur = shm.get_free_memory();
|
printf("shm used : %8ld/%ld\n", init - cur, init);
|
std::this_thread::sleep_for(1s);
|
}
|
};
|
|
int nwriters[] = {1, 2, 4};
|
int nreaders[] = {1, 2};
|
|
auto Test = [&](auto &www, auto &rrr, bool isfork) {
|
for (auto nreader : nreaders) {
|
for (auto nwriter : nwriters) {
|
const uint64_t nmsg = 1000 * 1000 * 10 / nwriter;
|
const uint64_t total_msg = nmsg * nwriter;
|
std::atomic<bool> run(true);
|
std::this_thread::sleep_for(10ms);
|
boost::timer::auto_cpu_timer timer;
|
for (int i = 0; i < nreader; ++i) {
|
rrr.Launch(Reader, i, &run, isfork);
|
}
|
for (int i = 0; i < nwriter; ++i) {
|
www.Launch(Writer, i, nmsg);
|
}
|
www.WaitAll();
|
run.store(false);
|
rrr.WaitAll();
|
printf("Write %ld msg R(%3d) W(%3d), : ", total_msg, nreader, nwriter);
|
}
|
}
|
};
|
|
std::atomic<bool> run(true);
|
ThreadManager state;
|
state.Launch(State, &run);
|
// typedef ProcessManager Manager;
|
// typedef ThreadManager Manager;
|
// const bool isfork = IsSameType<Manager, ProcessManager>::value;
|
ProcessManager pw, pr;
|
printf("================ Testing process io: =======================================================\n");
|
Test(pw, pr, true);
|
ThreadManager tw, tr;
|
printf("---------------- Testing thread io: -------------------------------------------------------\n");
|
Test(tw, tr, false);
|
run.store(false);
|
}
|
|
// Request Reply Test
|
BOOST_AUTO_TEST_CASE(RRTest)
|
{
|
const std::string shm_name("ShmReqRep");
|
ShmRemover auto_remove(shm_name);
|
const int qlen = 64;
|
const size_t msg_length = 1000;
|
std::string msg_content(msg_length, 'a');
|
msg_content[20] = '\0';
|
|
SharedMemory shm(shm_name, 1024 * 1024 * 50);
|
auto Avail = [&]() { return shm.get_free_memory(); };
|
auto init_avail = Avail();
|
ShmMsgQueue srv(shm, qlen);
|
ShmMsgQueue cli(shm, qlen);
|
|
MsgI request_rc;
|
request_rc.MakeRC(shm, MakeRequest(cli.Id(), msg_content.data(), msg_content.size()));
|
MsgI reply_rc;
|
reply_rc.MakeRC(shm, MakeReply(msg_content.data(), msg_content.size()));
|
|
std::atomic<uint64_t> count(0);
|
|
std::atomic<ptime> last_time(Now() - seconds(1));
|
std::atomic<uint64_t> last_count(0);
|
|
auto Client = [&](int cli_id, int nmsg) {
|
for (int i = 0; i < nmsg; ++i) {
|
auto Req = [&]() {
|
return cli.Send(srv.Id(), MakeRequest(cli.Id(), msg_content.data(), msg_content.size()), 100);
|
};
|
auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); };
|
|
if (!ReqRC()) {
|
printf("********** client send error.\n");
|
continue;
|
}
|
BHMsg msg;
|
if (!cli.Recv(msg, 1000)) {
|
printf("********** client recv error.\n");
|
} else {
|
++count;
|
auto cur = Now();
|
if (last_time.exchange(cur) < cur) {
|
std::cout << "time: " << cur;
|
printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n",
|
count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count());
|
}
|
}
|
}
|
};
|
|
std::atomic<bool> stop(false);
|
auto Server = [&]() {
|
BHMsg req;
|
while (!stop) {
|
if (srv.Recv(req, 100) && req.type() == kMsgTypeRequest) {
|
auto &mqid = req.route()[0].mq_id();
|
MQId src_id;
|
memcpy(&src_id, mqid.data(), sizeof(src_id));
|
auto Reply = [&]() {
|
return srv.Send(src_id, MakeReply(msg_content.data(), msg_content.size()), 100);
|
};
|
auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); };
|
|
if (ReplyRC()) {
|
}
|
}
|
}
|
};
|
|
boost::timer::auto_cpu_timer timer;
|
DEFER1(printf("Request Reply Test:"););
|
|
ThreadManager clients, servers;
|
for (int i = 0; i < qlen; ++i) { servers.Launch(Server); }
|
int ncli = 100 * 1;
|
uint64_t nmsg = 100 * 100 * 2;
|
printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
|
for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
|
clients.WaitAll();
|
printf("request ok: %ld\n", count.load());
|
stop = true;
|
servers.WaitAll();
|
BOOST_CHECK(request_rc.IsCounted());
|
BOOST_CHECK_EQUAL(request_rc.Count(), 1);
|
request_rc.Release(shm);
|
BOOST_CHECK(!request_rc.IsCounted());
|
// BOOST_CHECK_THROW(reply.Count(), int);
|
}
|