/*
 * =====================================================================================
 *
 *       Filename:  speed_test.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021-03-30 11:35:21
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "robust.h"
#include "util.h"

// SysV semaphore API used by InitSem below; these headers may already be
// pulled in transitively by util.h, but are included here to be explicit.
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/types.h>

using namespace robust;

BOOST_AUTO_TEST_CASE(SpeedTest)
{
    SharedMemory &shm = TestShm();
    GlobalInit(shm);

    // Create the SysV semaphore backing a message queue id and set its value to 1.
    auto InitSem = [](auto id) {
        auto sem_id = semget(id, 1, 0666 | IPC_CREAT);
        union semun init_val;
        init_val.val = 1;
        semctl(sem_id, 0, SETVAL, init_val);
    };
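
    // The initial value of 1 makes each queue's semaphore behave like a
    // mutex-style guard; senders and receivers are assumed to acquire it
    // before touching the queue's shared state.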

    MQId id = ShmMsgQueue::NewId();
    InitSem(id);

    const int timeout = 1000;
    const uint32_t data_size = 1001;
    const std::string proc_id = "demo_proc";
    std::atomic<int64_t> nwrite(0);
    std::atomic<int64_t> nread(0);

    std::string str(data_size, 'a');

    // Each writer gets its own client queue and repeatedly sends the same
    // prebuilt message to the shared server queue `id`.
    auto Writer = [&](int writer_id, uint64_t n) {
        MQId cli_id = ShmMsgQueue::NewId();
        InitSem(cli_id);

        ShmMsgQueue mq(cli_id, shm, 64);
        MsgI msg;
        MsgRequestTopic body;
        body.set_topic("topic");
        body.set_data(str);
        auto head(InitMsgHead(GetType(body), proc_id, mq.Id()));
        msg.Make(head, body);
        assert(msg.valid());
        DEFER1(msg.Release());

        for (uint64_t i = 0; i < n; ++i) {
            // Busy-spin until the message is queued; TrySend fails when the
            // target queue is full.
            while (!mq.TrySend(id, msg)) {}
            ++nwrite;
        }
    };
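
    // Note: a single MsgI instance is reused across all n sends; MsgI is
    // presumably reference-counted in shared memory, so Make/Release bracket
    // its lifetime while TrySend only enqueues a lightweight handle.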

    auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) {
        ShmMsgQueue mq(id, shm, 1000);
        auto now = []() { return steady_clock::now(); };
        auto tm = now();
        while (*run) {
            MsgI msg;
            BHMsgHead head;
            if (mq.TryRecv(msg)) {
                DEFER1(msg.Release());
                tm = now();
                ++nread;
            } else if (isfork) {
                // A forked reader cannot observe writes to *run made in the
                // parent process, so it exits on its own after 1s of idleness.
                if (now() > tm + 1s) {
                    exit(0);
                }
            }
        }
    };
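
    // Reader never parses BHMsgHead (the local `head` goes unused): this test
    // measures raw queue throughput, not protocol handling.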

    // Status reporter: prints shared memory usage and throughput once per
    // second; `speed` is the number of messages read during the last interval.
    auto State = [&](std::atomic<bool> *run) {
        auto init = shm.get_free_memory();
        printf("shm init : %zu\n", init);
        int64_t last_read = 0;
        while (*run) {
            auto cur = shm.get_free_memory();
            auto cur_read = nread.load();
            printf("shm used : %8zu/%zu, write: %8ld, read: %8ld, speed: %8ld\n",
                   init - cur, init, nwrite.load(), cur_read, cur_read - last_read);
            last_read = cur_read;
            std::this_thread::sleep_for(1s);
        }
    };
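
    // get_free_memory() reports the managed segment's free bytes, so
    // (init - cur) approximates the memory held by in-flight messages at any
    // instant.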

    int nwriters[] = {1, 10, 100, 1000};
    int nreaders[] = {2};
    const int64_t total_msg = 1000 * 1000;

    // Run every (nreader, nwriter) combination; the message total is fixed, so
    // each writer sends total_msg / nwriter.
    auto Test = [&](auto &www, auto &rrr, bool isfork) {
        for (auto nreader : nreaders) {
            for (auto nwriter : nwriters) {
                const uint64_t nmsg = total_msg / nwriter;
                std::atomic<bool> run(true);
                std::this_thread::sleep_for(10ms);
                // auto_cpu_timer prints elapsed time when it goes out of
                // scope, i.e. right after the summary printf below.
                boost::timer::auto_cpu_timer timer;
                for (int i = 0; i < nreader; ++i) {
                    rrr.Launch(Reader, i, &run, isfork);
                }
                for (int i = 0; i < nwriter; ++i) {
                    www.Launch(Writer, i, nmsg);
                }
                www.WaitAll();
                printf("writer finished\n");
                run.store(false);
                rrr.WaitAll();
                printf("Write %ld msg R(%3d) W(%3d): ", total_msg, nreader, nwriter);
            }
        }
    };
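
    // Caveat: run.store(false) only stops in-process readers; forked readers
    // rely on the 1s idle timeout above, which also adds roughly one second to
    // each timed iteration.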

    std::atomic<bool> run(true);
    ThreadManager state;
    state.Launch(State, &run);
    DEFER1(run.store(false));

    // typedef ProcessManager Manager;
    // typedef ThreadManager Manager;
    // const bool isfork = IsSameType<Manager, ProcessManager>::value;

    // Toggle the blocks below to benchmark thread-based vs. process-based io.
    if (0) {
        ThreadManager tw, tr;
        printf("---------------- Testing thread io: -------------------------------------------------------\n");
        Test(tw, tr, false);
    }

    if (1) {
        ProcessManager pw, pr;
        printf("================ Testing process io: =======================================================\n");
        Test(pw, pr, true);
    }
}

// Send Recv Test
BOOST_AUTO_TEST_CASE(SRTest)
{
    const int qlen = 64;
    const size_t msg_length = 100;
    std::string msg_content(msg_length, 'a');
    msg_content[20] = '\0'; // embedded NUL to exercise binary-safe payload handling.
    const std::string client_proc_id = "client_proc";
    const std::string server_proc_id = "server_proc";

    SharedMemory &shm = TestShm();
    // shm.Remove();
    // return;
    GlobalInit(shm);

    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
    ShmSocket srv(shm, qlen);
    ShmSocket cli(shm, qlen);

    int ncli = 1;
    uint64_t nmsg = 1000 * 1000 * 1;
    std::atomic<uint64_t> count(0);

    std::atomic<int64_t> last_time(NowSec() - 1);
    std::atomic<uint64_t> last_count(0);

    auto PrintStatus = [&](int64_t cur) {
        std::cout << "time: " << cur;
        printf(", total msg:%10lu, speed:[%8lu/s], used mem:%8zu\n",
               count.load(), count - last_count.exchange(count), init_avail - Avail());
    };
    // Count replies; print status at most once per wall-clock second.
    auto onRecv = [&](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
        ++count;
        auto cur = NowSec();
        if (last_time.exchange(cur) < cur) {
            PrintStatus(cur);
        }
    };
    cli.Start(onRecv, 2);
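
    // cli.Start(onRecv, 2) presumably spins up 2 worker threads that poll the
    // client queue, so replies are consumed asynchronously while Client below
    // keeps sending.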

    // Fire-and-forget requester: sends nmsg requests; replies are handled by
    // the onRecv callback above.
    auto Client = [&](int cli_id, uint64_t nmsg) {
        for (uint64_t i = 0; i < nmsg; ++i) {
            auto Req = [&]() {
                MsgRequestTopic req_body;
                req_body.set_topic("topic");
                req_body.set_data(msg_content);
                auto req_head(InitMsgHead(GetType(req_body), client_proc_id, cli.id()));
                req_head.add_route()->set_mq_id(cli.id());
                return cli.Send(srv.id(), req_head, req_body);
            };

            Req();
        }
    };

    std::atomic<bool> stop(false);
    // Echo server: pops requests and replies to the first hop recorded in the
    // request's route.
    auto Server = [&]() {
        MsgI req;
        BHMsgHead req_head;

        while (!stop) {
            if (srv.SyncRecv(req, req_head, 10)) {
                DEFER1(req.Release());

                if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) {
                    auto src_id = req_head.route()[0].mq_id();
                    auto Reply = [&]() {
                        MsgRequestTopic reply_body;
                        reply_body.set_topic("topic");
                        reply_body.set_data(msg_content);
                        auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), req_head.msg_id()));
                        return srv.Send(src_id, reply_head, reply_body);
                    };
                    Reply();
                }
            }
        }
    };
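
    // The 10 passed to SyncRecv is presumably a millisecond timeout; it keeps
    // the loop responsive to `stop` without busy-waiting.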

    boost::timer::auto_cpu_timer timer;
    DEFER1(printf("Request Reply Test:"));

    ThreadManager clients, servers;
    for (int i = 0; i < 2; ++i) { servers.Launch(Server); }
    printf("client threads: %d, msgs : %lu, total msg: %lu\n", ncli, nmsg, ncli * nmsg);
    for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
    clients.WaitAll();
    printf("request ok: %lu\n", count.load());
    // Wait until every reply has been received before stopping the servers.
    do {
        std::this_thread::sleep_for(100ms);
    } while (count.load() < ncli * nmsg);
    PrintStatus(NowSec());
    stop = true;
    servers.WaitAll();
    // BOOST_CHECK_THROW(reply.Count(), int);
}