From 77a6c3512a44dfe6540dde71946e6484fe4f173f Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期一, 10 五月 2021 16:05:28 +0800
Subject: [PATCH] test lock code.
---
utest/speed_test.cpp | 86 +++++++++++++++++++++++++++++--------------
1 files changed, 58 insertions(+), 28 deletions(-)
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index d145ab4..f8f54f5 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -15,69 +15,91 @@
*
* =====================================================================================
*/
+#include "robust.h"
#include "util.h"
-#include <boost/date_time/posix_time/posix_time.hpp>
-using namespace boost::posix_time;
+using namespace robust;
BOOST_AUTO_TEST_CASE(SpeedTest)
{
- const int mem_size = 1024 * 1024 * 50;
SharedMemory &shm = TestShm();
- MsgI::BindShm(shm);
+ GlobalInit(shm);
+ auto InitSem = [](auto id) {
+ auto sem_id = semget(id, 1, 0666 | IPC_CREAT);
+ union semun init_val;
+ init_val.val = 1;
+ semctl(sem_id, 0, SETVAL, init_val);
+ return;
+ };
MQId id = ShmMsgQueue::NewId();
- const int timeout = 1000;
- const uint32_t data_size = 4000;
- const std::string proc_id = "demo_proc";
+ InitSem(id);
+ const int timeout = 1000;
+ const uint32_t data_size = 1001;
+ const std::string proc_id = "demo_proc";
+ std::atomic<int64_t> nwrite(0);
+ std::atomic<int64_t> nread(0);
+
+ std::string str(data_size, 'a');
auto Writer = [&](int writer_id, uint64_t n) {
- ShmMsgQueue mq(shm, 64);
- std::string str(data_size, 'a');
+ MQId cli_id = ShmMsgQueue::NewId();
+ InitSem(cli_id);
+
+ ShmMsgQueue mq(cli_id, shm, 64);
MsgI msg;
MsgRequestTopic body;
body.set_topic("topic");
body.set_data(str);
- auto head(InitMsgHead(GetType(body), proc_id));
+ auto head(InitMsgHead(GetType(body), proc_id, mq.Id()));
msg.Make(head, body);
assert(msg.valid());
DEFER1(msg.Release(););
for (uint64_t i = 0; i < n; ++i) {
while (!mq.TrySend(id, msg)) {}
+ ++nwrite;
}
};
auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) {
ShmMsgQueue mq(id, shm, 1000);
+ auto now = []() { return steady_clock::now(); };
+ auto tm = now();
while (*run) {
MsgI msg;
BHMsgHead head;
- if (mq.Recv(msg, timeout)) {
+ if (mq.TryRecv(msg)) {
DEFER1(msg.Release());
- // ok
+ tm = now();
+ ++nread;
} else if (isfork) {
- exit(0); // for forked quit after 1s.
+ if (now() > tm + 1s) {
+ exit(0); // for forked quit after 1s.
+ }
}
}
};
auto State = [&](std::atomic<bool> *run) {
auto init = shm.get_free_memory();
printf("shm init : %ld\n", init);
+ uint64_t last_read = 0;
while (*run) {
auto cur = shm.get_free_memory();
- printf("shm used : %8ld/%ld\n", init - cur, init);
+ auto cur_read = nread.load();
+ printf("shm used : %8ld/%ld, write: %8ld, read: %8ld, speed: %8ld\n", init - cur, init, nwrite.load(), cur_read, cur_read - last_read);
+ last_read = cur_read;
std::this_thread::sleep_for(1s);
}
};
- int nwriters[] = {1, 2, 4};
- int nreaders[] = {1, 2};
+ int nwriters[] = {1, 10, 100};
+ int nreaders[] = {2};
+ const int64_t total_msg = 1000 * 100;
auto Test = [&](auto &www, auto &rrr, bool isfork) {
for (auto nreader : nreaders) {
for (auto nwriter : nwriters) {
- const uint64_t nmsg = 1000 * 1000 * 10 / nwriter;
- const uint64_t total_msg = nmsg * nwriter;
+ const uint64_t nmsg = total_msg / nwriter;
std::atomic<bool> run(true);
std::this_thread::sleep_for(10ms);
boost::timer::auto_cpu_timer timer;
@@ -99,16 +121,22 @@
std::atomic<bool> run(true);
ThreadManager state;
state.Launch(State, &run);
+ DEFER1(run.store(false););
+
// typedef ProcessManager Manager;
// typedef ThreadManager Manager;
// const bool isfork = IsSameType<Manager, ProcessManager>::value;
- ProcessManager pw, pr;
- printf("================ Testing process io: =======================================================\n");
- Test(pw, pr, true);
- ThreadManager tw, tr;
- printf("---------------- Testing thread io: -------------------------------------------------------\n");
- Test(tw, tr, false);
- run.store(false);
+
+ {
+ ThreadManager tw, tr;
+ printf("---------------- Testing thread io: -------------------------------------------------------\n");
+ Test(tw, tr, false);
+ }
+ {
+ ProcessManager pw, pr;
+ printf("================ Testing process io: =======================================================\n");
+ Test(pw, pr, true);
+ }
}
// Send Recv Test
@@ -122,7 +150,9 @@
const std::string server_proc_id = "server_proc";
SharedMemory &shm = TestShm();
- MsgI::BindShm(shm);
+ // shm.Remove();
+ // return;
+ GlobalInit(shm);
auto Avail = [&]() { return shm.get_free_memory(); };
auto init_avail = Avail();
@@ -156,7 +186,7 @@
MsgRequestTopic req_body;
req_body.set_topic("topic");
req_body.set_data(msg_content);
- auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
+ auto req_head(InitMsgHead(GetType(req_body), client_proc_id, cli.id()));
req_head.add_route()->set_mq_id(cli.id());
return cli.Send(srv.id(), req_head, req_body);
};
@@ -180,7 +210,7 @@
MsgRequestTopic reply_body;
reply_body.set_topic("topic");
reply_body.set_data(msg_content);
- auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id()));
+ auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), req_head.msg_id()));
return srv.Send(src_id, reply_head, reply_body);
};
Reply();
--
Gitblit v1.8.0