From d17450bd7bc9fd5e98e8e2f00999caffe2e301a6 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 26 三月 2021 17:43:09 +0800
Subject: [PATCH] test thread/fork speed; reset msg after release.
---
src/shm.h | 2
src/msg.h | 2
utest/utest.cpp | 217 +++++++++++++++++++++++++++++++++++++++++++-----------
src/msg.cpp | 4
4 files changed, 177 insertions(+), 48 deletions(-)
diff --git a/src/msg.cpp b/src/msg.cpp
index 66eec4b..9883246 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -59,7 +59,7 @@
}
-int Msg::Release(SharedMemory &shm) const
+int Msg::Release(SharedMemory &shm)
{
if (IsCounted()) {
const int n = count_->Dec();
@@ -69,7 +69,9 @@
}
// free data
shm.Dealloc(ptr_);
+ ptr_ = 0;
shm.Delete(count_);
+ count_ = 0;
return 0;
}
diff --git a/src/msg.h b/src/msg.h
index 910efa5..2c2efac 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -73,7 +73,7 @@
// AddRef and Release works for both counted and not counted msg.
int AddRef() const { return IsCounted() ? count_->Inc() : 1; }
- int Release(SharedMemory &shm) const;
+ int Release(SharedMemory &shm);
int Count() const{ return IsCounted() ? count_->Get() : 1; }
bool IsCounted() const { return static_cast<bool>(count_); }
diff --git a/src/shm.h b/src/shm.h
index 808ed5d..7537067 100644
--- a/src/shm.h
+++ b/src/shm.h
@@ -88,7 +88,7 @@
{
pdata_ = shm_.find_or_construct<Data>(ObjName(name_).c_str(), std::nothrow)(t...);
if (!IsOk()) {
- throw("shm error: " + name_);
+ throw("Error: Not enough memory, can not allocate \"" + name_ + "\"");
}
}
Data *find(const std::string &name) { return shm_.find<Data>(ObjName(name).c_str()).first; }
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 2462ecc..89d1ea3 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -13,9 +13,14 @@
#include <boost/uuid/uuid_io.hpp>
#include "shm_queue.h"
#include "bh_util.h"
+#include <sys/types.h>
+#include <sys/wait.h>
using namespace std::chrono_literals;
using namespace bhome_shm;
+
+using namespace boost::posix_time;
+auto Now = []() { return second_clock::universal_time(); };
struct s1000 { char a[1000]; };
@@ -41,7 +46,28 @@
}
};
class ProcessManager {
-
+ std::vector<pid_t> procs_;
+public:
+ ~ProcessManager() { WaitAll(); }
+ template <class T, class ...P>
+ void Launch(T t, P...p) {
+ auto pid = fork();
+ if (pid == 0) {
+ // child
+ t(p...);
+ exit(0);
+ } else if (pid != -1) { // Ok
+ procs_.push_back(pid);
+ }
+ };
+ void WaitAll() {
+ for (auto &pid: procs_) {
+ int status = 0;
+ int options = WUNTRACED | WCONTINUED;
+ waitpid(pid, &status, options);
+ }
+ procs_.clear();
+ }
};
struct ShmRemover {
std::string name_;
@@ -49,11 +75,15 @@
~ShmRemover() { SharedMemory::Remove(name_); }
};
-BOOST_AUTO_TEST_CASE(ShmBasicTest)
+template <class A, class B> struct IsSameType { static const bool value = false; };
+template <class A> struct IsSameType<A,A> { static const bool value = true; };
+
+BOOST_AUTO_TEST_CASE(BasicTest)
{
const std::string shm_name("basic");
ShmRemover auto_remove(shm_name);
SharedMemory shm(shm_name, 1024*1024*10);
+ auto Avail = [&]() { return shm.get_free_memory(); };
offset_ptr<const void> p;
BOOST_CHECK(!p);
@@ -70,7 +100,6 @@
BOOST_CHECK(p.get() == 0);
- auto Avail = [&]() { return shm.get_free_memory(); };
auto init_avail = Avail();
auto BasicTest = [&](int tid, int nloop) {
@@ -101,15 +130,12 @@
bool r = shared_memory_object::remove(shm_name.c_str());
BOOST_CHECK(r);
};
- boost::timer::auto_cpu_timer timer;
- for (int i = 0; i < nloop; ++i)
- {
+ for (int i = 0; i < nloop; ++i) {
Code(i + tid*nloop);
}
};
- boost::timer::auto_cpu_timer timer;
- DEFER1(printf("Basic Test:"););
+ // boost::timer::auto_cpu_timer timer;
ThreadManager threads;
int nthread = 1;
int nloop = 1;
@@ -117,8 +143,24 @@
{
threads.Launch(BasicTest, i, nloop);
}
- printf("end\n");
BOOST_CHECK_EQUAL(init_avail, Avail());
+}
+
+BOOST_AUTO_TEST_CASE(ForkTest)
+{
+ ProcessManager procs;
+ const int nproc = 10;
+
+ printf("Testing fork:\n");
+
+ auto child = [&](int id) {
+ std::this_thread::sleep_for(100ms *id);
+ printf("child id: %3d/%d ends\r", id, nproc);
+ };
+
+ for (int i = 0; i < nproc; ++i) {
+ procs.Launch(child, i+1);
+ }
}
BOOST_AUTO_TEST_CASE(TimedWaitTest)
@@ -127,9 +169,9 @@
ShmRemover auto_remove(shm_name);
SharedMemory shm(shm_name, 1024*1024);
ShmMsgQueue q(shm, 64);
- for (int i = 0; i < 5; ++i) {
+ for (int i = 0; i < 2; ++i) {
int ms = i * 100;
- printf("Timeout Test %d: ", ms);
+ printf("Timeout Test %4d: ", ms);
boost::timer::auto_cpu_timer timer;
MQId id;
void *data;
@@ -155,7 +197,7 @@
BOOST_CHECK_EQUAL(m0.Release(shm), 2);
BOOST_CHECK_EQUAL(m0.Release(shm), 1);
BOOST_CHECK_EQUAL(m1.Release(shm), 0);
- BOOST_CHECK_THROW(m1.Count(), std::exception);
+ BOOST_CHECK(!m1.IsCounted());
}
BOOST_AUTO_TEST_CASE(MsgHeaderTest)
@@ -176,43 +218,129 @@
MsgMetaV1 result;
result.Parse(buf);
BOOST_CHECK_EQUAL(memcmp(&head, &result, sizeof(head)), 0);
-
}
-BOOST_AUTO_TEST_CASE(RequestReplyTest)
+
+BOOST_AUTO_TEST_CASE(SpeedTest)
+{
+ const std::string shm_name("ShmSpeed");
+ ShmRemover auto_remove(shm_name);
+ const int mem_size = 1024*1024*50;
+ MQId id = boost::uuids::random_generator()();
+ const int timeout = 100;
+ const uint32_t data_size = 4000;
+
+ auto Writer = [&](int writer_id, uint64_t n) {
+ SharedMemory shm(shm_name, mem_size);
+ ShmMsgQueue mq(shm, 64);
+ std::string str(data_size, 'a');
+ Msg msg;
+ DEFER1(msg.Release(shm););
+ msg.Build(shm, mq.Id(), str.data(), str.size(), true);
+ for (int i = 0; i < n; ++i) {
+ // mq.Send(id, str.data(), str.size(), timeout);
+ mq.Send(id, msg, timeout);
+ }
+ };
+ auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork){
+ SharedMemory shm(shm_name, mem_size);
+ ShmMsgQueue mq(id, shm, 1000);
+ while(*run) {
+ Msg msg;
+ if (mq.Recv(msg, timeout)) {
+ MsgMetaV1 header;
+ if (!header.Parse(msg.get())) {
+ BOOST_CHECK(false);
+ }
+ if (header.data_size_ != data_size) {
+ BOOST_CHECK(false);
+ }
+ msg.Release(shm);
+ } else if (isfork) {
+ exit(0); // for forked quit after 1s.
+ }
+ }
+ };
+ auto State = [&](std::atomic<bool> *run){
+ SharedMemory shm(shm_name, mem_size);
+ auto init = shm.get_free_memory();
+ printf("shm init : %ld\n", init);
+ while (*run) {
+ auto cur = shm.get_free_memory();
+ printf("shm used : %8ld/%ld\n", init - cur, init);
+ std::this_thread::sleep_for(1s);
+ }
+ };
+
+ int nwriters[] = {1,2,4};
+ int nreaders[] = {1,2};
+
+ auto Test = [&](auto &www, auto &rrr, bool isfork) {
+ for (auto nreader : nreaders) {
+ for (auto nwriter : nwriters) {
+ const uint64_t nmsg = 1000 * 1000 * 10 / nwriter;
+ const uint64_t total_msg = nmsg * nwriter;
+ std::atomic<bool> run(true);
+ std::this_thread::sleep_for(10ms);
+ boost::timer::auto_cpu_timer timer;
+ for (int i = 0; i < nreader; ++i) {
+ rrr.Launch(Reader, i, &run, isfork);
+ }
+ for (int i = 0; i < nwriter; ++i) {
+ www.Launch(Writer, i, nmsg);
+ }
+ www.WaitAll();
+ run.store(false);
+ rrr.WaitAll();
+ printf("%3d Write %ld msg R(%3d) W(%3d), : ", getpid(), total_msg, nreader, nwriter);
+ }
+ }
+ };
+
+ std::atomic<bool> run(true);
+ ThreadManager state;
+ state.Launch(State, &run);
+ // typedef ProcessManager Manager;
+ // typedef ThreadManager Manager;
+ // const bool isfork = IsSameType<Manager, ProcessManager>::value;
+ ProcessManager pw, pr;
+ printf("================ Testing process io: =======================================================\n");
+ Test(pw, pr, true);
+ ThreadManager tw, tr;
+ printf("---------------- Testing thread io: -------------------------------------------------------\n");
+ Test(tw, tr, false);
+ run.store(false);
+}
+
+// Request Reply Test
+BOOST_AUTO_TEST_CASE(RRTest)
{
const std::string shm_name("ShmReqRep");
ShmRemover auto_remove(shm_name);
- SharedMemory shm(shm_name, 1024*1024*50);
- auto Avail = [&]() { return shm.get_free_memory(); };
- auto init_avail = Avail();
- // DEFER1(BOOST_CHECK_EQUAL(init_avail, Avail()); printf("Request Reply Test shm No Leak.\n"););
-
- auto f0 = init_avail;
const int qlen = 64;
- ShmMsgQueue srv(shm, qlen);
- ShmMsgQueue cli(shm, qlen);
- auto f1= shm.get_free_memory();
-
const size_t msg_length = 1000;
std::string msg_content(msg_length, 'a');
msg_content[20] = '\0';
- Msg request;
- request.Build(shm, cli.Id(), msg_content.data(), msg_content.size(), true);
- Msg reply(request);
- std::atomic<bool> stop(false);
+ SharedMemory shm(shm_name, 1024*1024*50);
+ auto Avail = [&]() { return shm.get_free_memory(); };
+ auto init_avail = Avail();
+ ShmMsgQueue srv(shm, qlen);
+ ShmMsgQueue cli(shm, qlen);
+
+ Msg ref_counted_msg;
+ ref_counted_msg.Build(shm, cli.Id(), msg_content.data(), msg_content.size(), true);
+
std::atomic<uint64_t> count(0);
- using namespace boost::posix_time;
- auto Now = []() { return second_clock::universal_time(); };
+
std::atomic<ptime> last_time(Now() - seconds(1));
std::atomic<uint64_t> last_count(0);
- auto Client = [&](int tid, int nmsg){
+ auto Client = [&](int cli_id, int nmsg){
for (int i = 0; i < nmsg; ++i) {
auto Send = [&]() { return cli.Send(srv.Id(), msg_content.data(), msg_content.size(), 1000); };
- // auto SendRefCounted = [&]() { return cli.Send(srv.Id(), request, 1000); };
+ auto SendRefCounted = [&]() { return cli.Send(srv.Id(), ref_counted_msg, 1000); };
- if (!Send()) {
+ if (!SendRefCounted()) {
printf("********** client send error.\n");
continue;
}
@@ -230,8 +358,8 @@
auto cur = Now();
if (last_time.exchange(cur) != cur) {
std::cout << "time: " << Now();
- printf(", total msg:%10ld, speed:%8ld/s, used mem:%8ld, refcount:%d\n",
- count.load(), count - last_count.exchange(count), init_avail - Avail(), request.Count());
+ printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n",
+ count.load(), count - last_count.exchange(count), init_avail - Avail(), ref_counted_msg.Count());
last_time = cur;
}
@@ -239,6 +367,7 @@
}
};
+ std::atomic<bool> stop(false);
auto Server = [&](){
void *data = 0;
size_t size = 0;
@@ -247,9 +376,9 @@
if (srv.Recv(src_id, data, size, 100)) {
DEFER1(free(data));
auto Send = [&](){ return srv.Send(src_id, data, size, 100); };
- // auto SendRefCounted = [&](){ return srv.Send(src_id, reply, 100); };
+ auto SendRefCounted = [&](){ return srv.Send(src_id, ref_counted_msg, 100); };
- if (Send()) {
+ if (SendRefCounted()) {
if (size != msg_content.size()) {
BOOST_TEST(false, "server msg size error");
}
@@ -264,23 +393,21 @@
ThreadManager clients, servers;
for (int i = 0; i < qlen; ++i) { servers.Launch(Server); }
int ncli = 100*1;
- uint64_t nmsg = 100*100*10;
+ uint64_t nmsg = 100*100*2;
printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
clients.WaitAll();
printf("request ok: %ld\n", count.load());
stop = true;
servers.WaitAll();
- BOOST_CHECK(request.IsCounted());
- BOOST_CHECK_EQUAL(request.Count(), 1);
- BOOST_CHECK(reply.IsCounted());
- BOOST_CHECK_EQUAL(reply.Count(), 1);
- request.Release(shm);
- BOOST_CHECK_THROW(request.Count(), std::exception);
- BOOST_CHECK_THROW(reply.Count(), std::exception);
+ BOOST_CHECK(ref_counted_msg.IsCounted());
+ BOOST_CHECK_EQUAL(ref_counted_msg.Count(), 1);
+ ref_counted_msg.Release(shm);
+ BOOST_CHECK(!ref_counted_msg.IsCounted());
// BOOST_CHECK_THROW(reply.Count(), int);
}
+
inline int MyMin(int a, int b) {
printf("MyMin\n");
return a < b ? a : b;
--
Gitblit v1.8.0