/*
 * =====================================================================================
 *
 *       Filename:  speed_test.cpp
 *
 *    Description:  Throughput tests: message queue send/recv speed (SpeedTest)
 *                  and socket request/reply speed (SRTest).
 *
 *        Version:  1.0
 *        Created:  2021-03-30 11:35:21
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
|
#include "util.h"
|
#include <boost/date_time/posix_time/posix_time.hpp>
|
|
using namespace boost::posix_time;
|
|
// Throughput benchmark for ShmMsgQueue.
//
// For every (reader count, writer count) combination the writers push
// 10M / nwriter copies each of one pre-built message into a single queue
// (identified by `id`) while the readers drain it. Each combination is timed
// by a boost::timer::auto_cpu_timer, which prints when it goes out of scope.
// The whole matrix is run twice: first with a ProcessManager for both sides,
// then with a ThreadManager. A background thread prints shared-memory usage
// once per second for the duration of the test.
BOOST_AUTO_TEST_CASE(SpeedTest)
{
    const std::string shm_name("ShmSpeed");
    ShmRemover auto_remove(shm_name); // presumably removes the segment on scope exit -- confirm in util.h
    const int mem_size = 1024 * 1024 * 50;
    MQId id = boost::uuids::random_generator()(); // id of the queue shared by all readers
    const int timeout = 1000;                     // Recv timeout; unit presumably milliseconds -- confirm
    const uint32_t data_size = 4000;              // payload bytes per message
    const std::string proc_id = "demo_proc";

    // Builds one message and sends the same handle `n` times to queue `id`,
    // spinning on TrySend until each send is accepted. The message is
    // released once when the lambda exits.
    auto Writer = [&](int /*writer_id*/, uint64_t n) {
        SharedMemory shm(shm_name, mem_size);
        ShmMsgQueue mq(shm, 64);
        std::string str(data_size, 'a');
        MsgI msg;
        MsgRequestTopic body;
        body.set_topic("topic");
        body.set_data(str);
        auto head(InitMsgHead(GetType(body), proc_id));
        msg.Make(shm, head, body);
        assert(msg.valid());
        DEFER1(msg.Release(shm););

        for (uint64_t i = 0; i < n; ++i) {
            while (!mq.TrySend(id, msg)) {}
        }
    };

    // Receives from queue `id` and releases every message until `*run` is
    // cleared. A forked reader has its own copy of `*run`, so it cannot see
    // the parent clearing the flag; it exits the process on the first Recv
    // timeout instead.
    auto Reader = [&](int /*reader_id*/, std::atomic<bool> *run, bool isfork) {
        SharedMemory shm(shm_name, mem_size);
        ShmMsgQueue mq(id, shm, 1000);
        while (*run) {
            MsgI msg;
            if (mq.Recv(msg, timeout)) {
                DEFER1(msg.Release(shm));
                // ok
            } else if (isfork) {
                exit(0); // for forked quit after 1s.
            }
        }
    };

    // Prints the shared-memory usage once per second until `*run` is cleared.
    auto State = [&](std::atomic<bool> *run) {
        SharedMemory shm(shm_name, mem_size);
        auto init = shm.get_free_memory();
        // The exact integer type of get_free_memory() is not visible here, so
        // the values are cast explicitly to match the printf length modifier.
        printf("shm init : %lld\n", static_cast<long long>(init));
        while (*run) {
            auto cur = shm.get_free_memory();
            printf("shm used : %8lld/%lld\n", static_cast<long long>(init - cur), static_cast<long long>(init));
            std::this_thread::sleep_for(1s);
        }
    };

    int nwriters[] = {1, 2, 4};
    int nreaders[] = {1, 2};

    // Runs the full reader/writer matrix using manager `www` for writers and
    // `rrr` for readers. `isfork` tells the readers whether they run in a
    // forked process.
    auto Test = [&](auto &www, auto &rrr, bool isfork) {
        for (auto nreader : nreaders) {
            for (auto nwriter : nwriters) {
                const uint64_t nmsg = 1000 * 1000 * 10 / nwriter;
                const uint64_t total_msg = nmsg * nwriter;
                std::atomic<bool> run(true);
                std::this_thread::sleep_for(10ms);
                // Destroyed at the end of this iteration, i.e. right after the
                // "Write ..." line below, so the timing follows it on the same line.
                boost::timer::auto_cpu_timer timer;
                for (int i = 0; i < nreader; ++i) {
                    rrr.Launch(Reader, i, &run, isfork);
                }
                for (int i = 0; i < nwriter; ++i) {
                    www.Launch(Writer, i, nmsg);
                }
                www.WaitAll();
                printf("writer finished\n");
                run.store(false);
                rrr.WaitAll();
                printf("Write %llu msg R(%3d) W(%3d), : ",
                       static_cast<unsigned long long>(total_msg), nreader, nwriter);
            }
        }
    };

    std::atomic<bool> run(true);
    ThreadManager state;
    state.Launch(State, &run);
    // typedef ProcessManager Manager;
    // typedef ThreadManager Manager;
    // const bool isfork = IsSameType<Manager, ProcessManager>::value;
    ProcessManager pw, pr;
    printf("================ Testing process io: =======================================================\n");
    Test(pw, pr, true);
    ThreadManager tw, tr;
    printf("---------------- Testing thread io: -------------------------------------------------------\n");
    Test(tw, tr, false);
    run.store(false); // stop the usage-printing thread
}
|
|
// Send Recv Test
|
// Request/reply throughput test over two ShmSocket endpoints in one process.
//
// `ncli` client threads each send `nmsg` topic requests from `cli` to `srv`.
// Two server threads receive on `srv` and send a reply to the queue id found
// in the first route entry of the request head. Replies arrive in the `cli`
// receive callback, which counts them and prints a status line at most once
// per second. The test waits until all replies are counted, then stops the
// servers.
BOOST_AUTO_TEST_CASE(SRTest)
{
    const std::string shm_name("ShmSendRecv");
    ShmRemover auto_remove(shm_name); // presumably removes the segment on scope exit -- confirm in util.h
    const int qlen = 64;
    const size_t msg_length = 100;
    std::string msg_content(msg_length, 'a');
    msg_content[20] = '\0'; // embedded NUL inside the payload
    const std::string client_proc_id = "client_proc";
    const std::string server_proc_id = "server_proc";

    SharedMemory shm(shm_name, 1024 * 1024 * 512);
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
    ShmSocket srv(shm, qlen);
    ShmSocket cli(shm, qlen);

    int ncli = 1;
    uint64_t nmsg = 1000 * 1000 * 1;
    const uint64_t expected = static_cast<uint64_t>(ncli) * nmsg; // total replies to wait for
    std::atomic<uint64_t> count(0);                               // replies received so far

    std::atomic<int64_t> last_time(NowSec() - 1);
    std::atomic<uint64_t> last_count(0);

    // Prints total replies, replies since the previous status line, and the
    // shared memory in use. `count` is read exactly once so the total and the
    // speed on one line come from the same snapshot.
    auto PrintStatus = [&](int64_t cur) {
        const uint64_t total = count.load();
        const uint64_t prev = last_count.exchange(total);
        std::cout << "time: " << cur;
        // Differences are printed as signed; the type of get_free_memory()
        // is not visible here, hence the explicit casts.
        printf(", total msg:%10llu, speed:[%8lld/s], used mem:%8lld\n",
               static_cast<unsigned long long>(total),
               static_cast<long long>(total - prev),
               static_cast<long long>(init_avail - Avail()));
    };
    // Receive callback for `cli`: counts the reply and lets whichever caller
    // first observes a new second print the status line.
    auto onRecv = [&](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
        ++count;
        auto cur = NowSec();
        if (last_time.exchange(cur) < cur) {
            PrintStatus(cur);
        }
    };
    cli.Start(onRecv, 2);

    // Sends `nreq` requests from `cli` to `srv`. Each request carries the id
    // of `cli` as its first route entry so the server knows where to reply.
    // The result of Send is ignored, as before.
    auto Client = [&](int /*cli_id*/, uint64_t nreq) {
        auto Req = [&]() {
            MsgRequestTopic req_body;
            req_body.set_topic("topic");
            req_body.set_data(msg_content);
            auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
            req_head.add_route()->set_mq_id(&cli.id(), cli.id().size());
            return cli.Send(&srv.id(), req_head, req_body);
        };

        for (uint64_t i = 0; i < nreq; ++i) {
            Req();
        }
    };

    std::atomic<bool> stop(false);
    // Server loop: receive a request, validate it, and reply to the sender
    // named in its first route entry. Runs until `stop` is set.
    auto Server = [&]() {
        MsgI req;
        BHMsgHead req_head;

        while (!stop) {
            if (srv.SyncRecv(req, req_head, 10)) {
                DEFER1(req.Release(shm));

                if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic &&
                    req_head.route_size() > 0) {
                    auto &mqid = req_head.route()[0].mq_id();
                    MQId src_id;
                    // Do not read past the end of a short or empty id.
                    if (mqid.size() < sizeof(src_id)) {
                        continue;
                    }
                    memcpy(&src_id, mqid.data(), sizeof(src_id));
                    auto Reply = [&]() {
                        MsgRequestTopic reply_body;
                        reply_body.set_topic("topic");
                        reply_body.set_data(msg_content);
                        auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id()));
                        return srv.Send(&src_id, reply_head, reply_body);
                    };
                    Reply();
                }
            }
        }
    };

    boost::timer::auto_cpu_timer timer;
    // Runs at scope exit just before `timer` is destroyed, labelling its output.
    DEFER1(printf("Request Reply Test:"););

    ThreadManager clients, servers;
    for (int i = 0; i < 2; ++i) { servers.Launch(Server); }
    printf("client threads: %d, msgs : %llu, total msg: %llu\n", ncli,
           static_cast<unsigned long long>(nmsg), static_cast<unsigned long long>(expected));
    for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
    clients.WaitAll();
    printf("request ok: %llu\n", static_cast<unsigned long long>(count.load()));

    // Wait for the outstanding replies. A lost request or reply would keep
    // `count` below `expected` forever, so give up once no reply has arrived
    // for max_idle_rounds consecutive polls (about 5 seconds).
    const int max_idle_rounds = 50;
    int idle_rounds = 0;
    uint64_t seen = count.load();
    do {
        std::this_thread::sleep_for(100ms);
        const uint64_t now = count.load();
        idle_rounds = (now == seen) ? idle_rounds + 1 : 0;
        seen = now;
    } while (seen < expected && idle_rounds < max_idle_rounds);
    BOOST_CHECK_GE(seen, expected);

    PrintStatus(NowSec());
    stop = true;
    servers.WaitAll();
    // BOOST_CHECK_THROW(reply.Count(), int);
}
|