From 491d98b3ba32cafed5682552bd870ca0ef93275c Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 30 三月 2021 18:29:09 +0800 Subject: [PATCH] add ShmSocket as shm interface, add sub/pub. --- utest/utest.cpp | 66 +++++++++++++++++--------------- 1 files changed, 35 insertions(+), 31 deletions(-) diff --git a/utest/utest.cpp b/utest/utest.cpp index f7571c8..bb5c14d 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -8,6 +8,7 @@ #include "pubsub.h" #include "defs.h" #include "util.h" +#include "socket.h" template <class A, class B> struct IsSameType { static const bool value = false; }; template <class A> struct IsSameType<A,A> { static const bool value = true; }; @@ -73,45 +74,48 @@ std::atomic<uint64_t> last_count(0); const uint64_t nmsg = 100; - const int timeout = 1000; auto Sub = [&](int id, const std::vector<std::string> &topics) { - ShmMsgQueue client(shm, 8); - client.Send(kBHBusQueueId, MakeSub(client.Id(), topics), timeout); - for (int i = 0; i < nmsg * topics.size(); ++i) { - BHMsg msg; - if (client.Recv(msg, 1000)) { - if (msg.type() != kMsgTypePublish) { - BOOST_CHECK(false); - } - DataPub pub; - if (!pub.ParseFromString(msg.body())) { - BOOST_CHECK(false); - } - ++count; - auto cur = Now(); - if (last_time.exchange(cur) < cur) { - std::cout << "time: " << cur; - printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", - count.load(), count - last_count.exchange(count), init_avail - Avail()); - } - // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str()); - } else { - printf("sub %2d recv timeout\n", id); - } + ShmSocket client(ShmSocket::eSockSubscribe, shm); + bool r = client.Subscribe(topics, timeout); + std::mutex mutex; + std::condition_variable cv; - } + int i = 0; + auto OnRecv = [&](BHMsg &msg) { + if (msg.type() != kMsgTypePublish) { + BOOST_CHECK(false); + } + DataPub pub; + if (!pub.ParseFromString(msg.body())) { + BOOST_CHECK(false); + } + ++count; + + auto cur = Now(); + if (last_time.exchange(cur) < cur) { + std::cout << "time: " << cur; + printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", + count.load(), count - last_count.exchange(count), init_avail - Avail()); + } + if (++i >= nmsg*topics.size()) { + cv.notify_one(); + } + // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str()); + }; + client.SetRecvCallback(OnRecv); + + std::unique_lock<std::mutex> lk(mutex); + cv.wait(lk); + }; + auto Pub = [&](const std::string &topic) { - ShmMsgQueue provider(shm, 0); + ShmSocket provider(ShmSocket::eSockPublish, shm); for (int i = 0; i < nmsg; ++i) { std::string data = topic + std::to_string(i) + std::string(1000, '-'); - MsgI msg; - msg.MakeRC(shm, MakePub(topic, data.data(), data.size())); - DEFER1(msg.Release(shm)); - bool r = provider.Send(kBHBusQueueId, msg, timeout); - + bool r = provider.Publish(topic, data.data(), data.size(), timeout); // bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout); if (!r) { printf("pub ret: %s\n", r ? "ok" : "fail"); -- Gitblit v1.8.0