#include #include "../src/shm.h" #include "../src/bh_util.h" #include #include #include #include #include #include #include #include #include #include #include using namespace std::chrono_literals; using namespace bhome_shm; struct s1000 { char a[1000]; }; typedef std::function FuncVV; class ScopeCall : private boost::noncopyable { FuncVV f_; public: ScopeCall(FuncVV f):f_(f) { f_(); } ~ScopeCall() { f_(); } }; class ThreadManager { std::vector threads_; public: ~ThreadManager() { WaitAll(); } template void Launch(T t, P...p) { threads_.emplace_back(t, p...); } void WaitAll() { for (auto &t : threads_) { if (t.joinable()) { t.join(); } } } }; class ProcessManager { }; struct ShmRemover { std::string name_; ShmRemover(const std::string &name):name_(name) { SharedMemory::Remove(name_); } ~ShmRemover() { SharedMemory::Remove(name_); } }; BOOST_AUTO_TEST_CASE(ShmBasic) { const std::string shm_name("basic"); ShmRemover auto_remove(shm_name); SharedMemory shm(shm_name, 1024*1024*10); auto Avail = [&]() { return shm.get_free_memory(); }; auto init_avail = Avail(); auto BasicTest = [&](int tid, int nloop) { auto Code = [&](int id) { typedef ShmObject Int; std::string name = std::to_string(id); auto a0 = Avail(); Int i1(shm, name); auto a1 = Avail(); BOOST_CHECK_LT(a1, a0); printf("s1000 size: %ld\n", a0 - a1); i1->a[0] = 5; Int i2(shm, name); auto a2 = Avail(); BOOST_CHECK_EQUAL(a1, a2); BOOST_CHECK_EQUAL(i1.data(), i2.data()); int i = i1.Remove(); BOOST_CHECK_EQUAL(Avail(), a0); { auto old = Avail(); void *p = shm.allocate(1024); shm.deallocate(p); BOOST_CHECK_EQUAL(old, Avail()); } bool r = shared_memory_object::remove(shm_name.c_str()); BOOST_CHECK(r); }; boost::timer::auto_cpu_timer timer; for (int i = 0; i < nloop; ++i) { Code(i + tid*nloop); } }; boost::timer::auto_cpu_timer timer; DEFER1(printf("Basic Test:");); ThreadManager threads; int nthread = 1; int nloop = 1; for (int i = 0; i < nthread; ++i) { threads.Launch(BasicTest, i, nloop); } printf("end\n"); BOOST_CHECK_EQUAL(init_avail, Avail()); } BOOST_AUTO_TEST_CASE(TimedWait) { const std::string shm_name("shm_wait"); ShmRemover auto_remove(shm_name); SharedMemory shm(shm_name, 1024*1024); ShmMsgQueue q(shm, 64); for (int i = 0; i < 5; ++i) { int ms = i * 100; printf("Timeout Test %d: ", ms); boost::timer::auto_cpu_timer timer; MQId id; void *data; size_t size; bool r = q.Recv(id, data, size, ms); BOOST_CHECK(!r); } } BOOST_AUTO_TEST_CASE(MsgHeader) { MsgMetaV1 head; BOOST_CHECK_EQUAL(head.self_size_, sizeof(head)); BOOST_CHECK_EQUAL(head.type_, kMsgTypeNormal); BOOST_CHECK_EQUAL(head.tag_, kMsgMetaTag); BOOST_CHECK_EQUAL(head.data_size_, 0); BOOST_CHECK_EQUAL(head.src_id_[5], 0); head.data_size_ = 100; auto rand_id = boost::uuids::random_generator()(); memcpy(head.src_id_, &rand_id, sizeof(rand_id)); head.type_ = 123; BOOST_CHECK_EQUAL(sizeof(head.src_id_), sizeof(rand_id)); char buf[100] = {0}; head.Pack(buf); MsgMetaV1 result; result.Parse(buf); BOOST_CHECK_EQUAL(memcmp(&head, &result, sizeof(head)), 0); } BOOST_AUTO_TEST_CASE(RequestReply) { const std::string shm_name("ShmReqRep"); ShmRemover auto_remove(shm_name); SharedMemory shm(shm_name, 1024*1024*50); auto Avail = [&]() { return shm.get_free_memory(); }; auto init_avail = Avail(); // DEFER1(BOOST_CHECK_EQUAL(init_avail, Avail()); printf("Request Reply Test shm No Leak.\n");); auto f0 = init_avail; const int qlen = 64; ShmMsgQueue srv(shm, qlen); ShmMsgQueue cli(shm, qlen); auto f1= shm.get_free_memory(); const size_t msg_length = 1000; std::string msg_content(msg_length, 'a'); msg_content[20] = '\0'; std::atomic stop(false); std::atomic count(0); using namespace boost::posix_time; auto Now = []() { return second_clock::universal_time(); }; std::atomic last_time(Now()); std::atomic last_count(0); auto Client = [&](int tid, int nmsg){ for (int i = 0; i < nmsg; ++i) { if (!cli.Send(srv.Id(), msg_content.data(), msg_content.size(), 1000)) { printf("********** client send error.\n"); continue; } MQId id; void *data = 0; size_t size = 0; if (!cli.Recv(id, data, size, 1000)) { printf("********** client recv error.\n"); } else { DEFER1(free(data)); if(size != msg_length) { BOOST_CHECK(false); } ++count; auto cur = Now(); if (last_time.exchange(cur) != cur) { std::cout << "time: " << Now(); printf(", total msg:%10ld, speed:%8ld, used mem:%8ld\n", count.load(), count - last_count.exchange(count), init_avail - Avail()); last_time = cur; } } } }; auto Server = [&](){ void *data = 0; size_t size = 0; MQId src_id; while (!stop) { if (srv.Recv(src_id, data, size, 100)) { DEFER1(free(data)); if (srv.Send(src_id, data, size, 100)) { if (size != msg_content.size()) { BOOST_TEST(false, "server msg size error"); } } } } }; boost::timer::auto_cpu_timer timer; DEFER1(printf("Request Reply Test:");); ThreadManager clients, servers; for (int i = 0; i < qlen; ++i) { servers.Launch(Server); } int ncli = 100; uint64_t nmsg = 1000*10; printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg); for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); } clients.WaitAll(); printf("request ok: %ld\n", count.load()); stop = true; servers.WaitAll(); } int test_main(int argc, char *argv[]) { printf("test main\n"); int a = 0; int b = 0; BOOST_CHECK_EQUAL(a, b); return 0; }