#include #include #include #include #include #include #include #include #include #include #include #include #include "shm_queue.h" #include "bh_util.h" using namespace std::chrono_literals; using namespace bhome_shm; struct s1000 { char a[1000]; }; typedef std::function FuncVV; class ScopeCall : private boost::noncopyable { FuncVV f_; public: ScopeCall(FuncVV f):f_(f) { f_(); } ~ScopeCall() { f_(); } }; class ThreadManager { std::vector threads_; public: ~ThreadManager() { WaitAll(); } template void Launch(T t, P...p) { threads_.emplace_back(t, p...); } void WaitAll() { for (auto &t : threads_) { if (t.joinable()) { t.join(); } } } }; class ProcessManager { }; struct ShmRemover { std::string name_; ShmRemover(const std::string &name):name_(name) { SharedMemory::Remove(name_); } ~ShmRemover() { SharedMemory::Remove(name_); } }; BOOST_AUTO_TEST_CASE(ShmBasicTest) { const std::string shm_name("basic"); ShmRemover auto_remove(shm_name); SharedMemory shm(shm_name, 1024*1024*10); offset_ptr p; BOOST_CHECK(!p); BOOST_CHECK(p.get() == 0); p = 0; BOOST_CHECK(!p); BOOST_CHECK(p.get() == 0); const char *str = "basic"; p = str; BOOST_CHECK(p); BOOST_CHECK(p.get() == str); p = 0; BOOST_CHECK(!p); BOOST_CHECK(p.get() == 0); auto Avail = [&]() { return shm.get_free_memory(); }; auto init_avail = Avail(); auto BasicTest = [&](int tid, int nloop) { auto Code = [&](int id) { typedef ShmObject Int; std::string name = std::to_string(id); auto a0 = Avail(); Int i1(shm, name); auto a1 = Avail(); BOOST_CHECK_LT(a1, a0); printf("s1000 size: %ld\n", a0 - a1); i1->a[0] = 5; Int i2(shm, name); auto a2 = Avail(); BOOST_CHECK_EQUAL(a1, a2); BOOST_CHECK_EQUAL(i1.data(), i2.data()); int i = i1.Remove(); BOOST_CHECK_EQUAL(Avail(), a0); { auto old = Avail(); void *p = shm.Alloc(1024); shm.Dealloc(p); BOOST_CHECK_EQUAL(old, Avail()); } bool r = shared_memory_object::remove(shm_name.c_str()); BOOST_CHECK(r); }; boost::timer::auto_cpu_timer timer; for (int i = 0; i < nloop; ++i) { Code(i + tid*nloop); } }; boost::timer::auto_cpu_timer timer; DEFER1(printf("Basic Test:");); ThreadManager threads; int nthread = 1; int nloop = 1; for (int i = 0; i < nthread; ++i) { threads.Launch(BasicTest, i, nloop); } printf("end\n"); BOOST_CHECK_EQUAL(init_avail, Avail()); } BOOST_AUTO_TEST_CASE(TimedWaitTest) { const std::string shm_name("shm_wait"); ShmRemover auto_remove(shm_name); SharedMemory shm(shm_name, 1024*1024); ShmMsgQueue q(shm, 64); for (int i = 0; i < 5; ++i) { int ms = i * 100; printf("Timeout Test %d: ", ms); boost::timer::auto_cpu_timer timer; MQId id; void *data; size_t size; bool r = q.Recv(id, data, size, ms); BOOST_CHECK(!r); } } BOOST_AUTO_TEST_CASE(RefCountTest) { const std::string shm_name("ShmRefCount"); ShmRemover auto_remove(shm_name); SharedMemory shm(shm_name, 1024*1024); Msg m0(shm.Alloc(1000), shm.New()); BOOST_CHECK(m0.IsCounted()); BOOST_CHECK_EQUAL(m0.Count(), 1); Msg m1 = m0; BOOST_CHECK(m1.IsCounted()); BOOST_CHECK_EQUAL(m1.AddRef(), 2); BOOST_CHECK_EQUAL(m0.AddRef(), 3); BOOST_CHECK_EQUAL(m0.RemoveRef(), 2); BOOST_CHECK_EQUAL(m0.RemoveRef(), 1); BOOST_CHECK_EQUAL(m1.RemoveRef(), 0); BOOST_CHECK_EQUAL(m1.Count(), 0); } BOOST_AUTO_TEST_CASE(MsgHeaderTest) { MsgMetaV1 head; BOOST_CHECK_EQUAL(head.self_size_, sizeof(head)); BOOST_CHECK_EQUAL(head.type_, kMsgTypeNormal); BOOST_CHECK_EQUAL(head.tag_, kMsgMetaTag); BOOST_CHECK_EQUAL(head.data_size_, 0); BOOST_CHECK(head.src_id_ == boost::uuids::nil_uuid()); head.data_size_ = 100; head.src_id_ = boost::uuids::random_generator()(); head.type_ = 123; char buf[100] = {0}; head.Pack(buf); MsgMetaV1 result; result.Parse(buf); BOOST_CHECK_EQUAL(memcmp(&head, &result, sizeof(head)), 0); } BOOST_AUTO_TEST_CASE(RequestReplyTest) { const std::string shm_name("ShmReqRep"); ShmRemover auto_remove(shm_name); SharedMemory shm(shm_name, 1024*1024*50); auto Avail = [&]() { return shm.get_free_memory(); }; auto init_avail = Avail(); // DEFER1(BOOST_CHECK_EQUAL(init_avail, Avail()); printf("Request Reply Test shm No Leak.\n");); auto f0 = init_avail; const int qlen = 64; ShmMsgQueue srv(shm, qlen); ShmMsgQueue cli(shm, qlen); auto f1= shm.get_free_memory(); const size_t msg_length = 1000; std::string msg_content(msg_length, 'a'); msg_content[20] = '\0'; Msg request; request.Build(shm, cli.Id(), msg_content.data(), msg_content.size(), true); Msg reply(request); std::atomic stop(false); std::atomic count(0); using namespace boost::posix_time; auto Now = []() { return second_clock::universal_time(); }; std::atomic last_time(Now() - seconds(1)); std::atomic last_count(0); auto Client = [&](int tid, int nmsg){ for (int i = 0; i < nmsg; ++i) { auto Send = [&]() { return cli.Send(srv.Id(), msg_content.data(), msg_content.size(), 1000); }; auto SendRefCounted = [&]() { return cli.Send(srv.Id(), request, 1000); }; if (!Send()) { printf("********** client send error.\n"); continue; } MQId id; void *data = 0; size_t size = 0; if (!cli.Recv(id, data, size, 1000)) { printf("********** client recv error.\n"); } else { DEFER1(free(data)); if(size != msg_length) { BOOST_CHECK(false); } ++count; auto cur = Now(); if (last_time.exchange(cur) != cur) { std::cout << "time: " << Now(); printf(", total msg:%10ld, speed:%8ld/s, used mem:%8ld, refcount:%d\n", count.load(), count - last_count.exchange(count), init_avail - Avail(), request.Count()); last_time = cur; } } } }; auto Server = [&](){ void *data = 0; size_t size = 0; MQId src_id; while (!stop) { if (srv.Recv(src_id, data, size, 100)) { DEFER1(free(data)); auto Send = [&](){ return srv.Send(src_id, data, size, 100); }; auto SendRefCounted = [&](){ return srv.Send(src_id, reply, 100); }; if (SendRefCounted()) { if (size != msg_content.size()) { BOOST_TEST(false, "server msg size error"); } } } } }; boost::timer::auto_cpu_timer timer; DEFER1(printf("Request Reply Test:");); ThreadManager clients, servers; for (int i = 0; i < qlen; ++i) { servers.Launch(Server); } int ncli = 100*1; uint64_t nmsg = 100*100; printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg); for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); } clients.WaitAll(); printf("request ok: %ld\n", count.load()); stop = true; servers.WaitAll(); BOOST_CHECK(request.IsCounted()); BOOST_CHECK_EQUAL(request.Count(), 1); BOOST_CHECK(reply.IsCounted()); BOOST_CHECK_EQUAL(reply.Count(), 1); if (request.RemoveRef() == 0) { BOOST_CHECK_EQUAL(reply.Count(), 0); request.FreeFrom(shm); } } int test_main(int argc, char *argv[]) { printf("test main\n"); int a = 0; int b = 0; BOOST_CHECK_EQUAL(a, b); return 0; }