lichao
2021-03-30 491d98b3ba32cafed5682552bd870ca0ef93275c
utest/utest.cpp
@@ -8,6 +8,7 @@
#include "pubsub.h"
#include "defs.h"
#include "util.h"
#include "socket.h"
template <class A, class B> struct IsSameType { static const bool value = false; };
template <class A> struct IsSameType<A,A> { static const bool value = true; };
@@ -73,45 +74,48 @@
    std::atomic<uint64_t> last_count(0);
    const uint64_t nmsg = 100;
    const int timeout = 1000;
    auto Sub = [&](int id, const std::vector<std::string> &topics) {
        ShmMsgQueue client(shm, 8);
        client.Send(kBHBusQueueId, MakeSub(client.Id(), topics), timeout);
        for (int i = 0; i < nmsg * topics.size(); ++i) {
            BHMsg msg;
            if (client.Recv(msg, 1000)) {
                if (msg.type() != kMsgTypePublish) {
                    BOOST_CHECK(false);
                }
                DataPub pub;
                if (!pub.ParseFromString(msg.body())) {
                    BOOST_CHECK(false);
                }
                ++count;
                auto cur = Now();
                if (last_time.exchange(cur) < cur) {
                    std::cout << "time: " << cur;
                    printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
                           count.load(), count - last_count.exchange(count), init_avail - Avail());
                }
                // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
            } else {
                printf("sub %2d recv timeout\n", id);
            }
        ShmSocket client(ShmSocket::eSockSubscribe, shm);
        bool r = client.Subscribe(topics, timeout);
        std::mutex mutex;
        std::condition_variable cv;
        }
        int i = 0;
        auto OnRecv = [&](BHMsg &msg) {
            if (msg.type() != kMsgTypePublish) {
                BOOST_CHECK(false);
            }
            DataPub pub;
            if (!pub.ParseFromString(msg.body())) {
                BOOST_CHECK(false);
            }
            ++count;
            auto cur = Now();
            if (last_time.exchange(cur) < cur) {
                std::cout << "time: " << cur;
                printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
                       count.load(), count - last_count.exchange(count), init_avail - Avail());
            }
            if (++i >= nmsg*topics.size()) {
                cv.notify_one();
            }
            // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
        };
        client.SetRecvCallback(OnRecv);
        std::unique_lock<std::mutex> lk(mutex);
        cv.wait(lk);
    };
    auto Pub = [&](const std::string &topic) {
        ShmMsgQueue provider(shm, 0);
        ShmSocket provider(ShmSocket::eSockPublish, shm);
        for (int i = 0; i < nmsg; ++i) {
            std::string data = topic + std::to_string(i) + std::string(1000, '-');
            MsgI msg;
            msg.MakeRC(shm, MakePub(topic, data.data(), data.size()));
            DEFER1(msg.Release(shm));
            bool r = provider.Send(kBHBusQueueId, msg, timeout);
            bool r = provider.Publish(topic, data.data(), data.size(), timeout);
            // bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout);
            if (!r) {
                printf("pub ret: %s\n", r ? "ok" : "fail");