From 6aa7e4c37a70709e7348bd16407c5983a563ed76 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期一, 29 三月 2021 21:11:34 +0800
Subject: [PATCH] test pub/sub msg; fix update last_time;

---
 utest/utest.cpp |   87 ++++++++++++++++++++++++++++++++++++++++++-
 1 files changed, 84 insertions(+), 3 deletions(-)

diff --git a/utest/utest.cpp b/utest/utest.cpp
index 6e41116..a074f41 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -15,6 +15,8 @@
 #include "bh_util.h"
 #include <sys/types.h>
 #include <sys/wait.h>
+#include "pubsub.h"
+#include "defs.h"
 
 using namespace std::chrono_literals;
 using namespace bhome_msg;
@@ -332,11 +334,10 @@
             } else {
                 ++count;
                 auto cur = Now();
-                if (last_time.exchange(cur) != cur) {
-                    std::cout << "time: " << Now();
+                if (last_time.exchange(cur) < cur) {
+                    std::cout << "time: " << cur;
                     printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n",
                            count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count());
-                    last_time = cur;
                 }
 
             }
@@ -382,6 +383,86 @@
     // BOOST_CHECK_THROW(reply.Count(), int);
 }
 
+BOOST_AUTO_TEST_CASE(PubSubTest)
+{
+    const std::string shm_name("ShmPubSub");
+    ShmRemover auto_remove(shm_name);
+    SharedMemory shm(shm_name, 1024*1024*50);
+    auto Avail = [&]() { return shm.get_free_memory(); };
+    auto init_avail = Avail();
+
+    BusManager bus(shm);
+    bus.Start(1);
+    std::this_thread::sleep_for(100ms);
+
+    std::atomic<uint64_t> count(0);
+    std::atomic<ptime> last_time(Now() - seconds(1));
+    std::atomic<uint64_t> last_count(0);
+
+    const uint64_t nmsg = 1000 * 100;
+
+    const int timeout = 1000;
+    auto Sub = [&](int id, const std::vector<std::string> &topics) {
+        ShmMsgQueue client(shm, 8);
+        client.Send(kBusQueueId, MakeSub(client.Id(), topics), timeout);
+        for (int i = 0; i < nmsg * topics.size(); ++i) {
+            BHMsg msg;
+            if (client.Recv(msg, 1000)) {
+                if (msg.type() != kMsgTypePublish) {
+                    BOOST_CHECK(false);
+                }
+                DataPub pub;
+                if (!pub.ParseFromString(msg.body())) {
+                    BOOST_CHECK(false);
+                }
+                ++count;
+                auto cur = Now();
+                if (last_time.exchange(cur) < cur) {
+                    std::cout << "time: " << cur;
+                    printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
+                           count.load(), count - last_count.exchange(count), init_avail - Avail());
+                }
+                // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
+
+            } else {
+                printf("sub %2d recv timeout\n", id);
+            }
+
+        }
+    };
+    auto Pub = [&](const std::string &topic) {
+        ShmMsgQueue provider(shm, 0);
+        for (int i = 0; i < nmsg; ++i) {
+            std::string data = topic + std::to_string(i);
+            bool r = provider.Send(kBusQueueId, MakePub(topic, data.data(), data.size()), timeout);
+            if (!r) {
+                printf("pub ret: %s\n", r ? "ok" : "fail");
+            }
+        }
+    };
+    ThreadManager threads;
+    typedef std::vector<std::string> Topics;
+    Topics topics;
+    topics.push_back("000-xxx");
+    topics.push_back("111-football");
+    topics.push_back("222-sport");
+    // topics.push_back("333-sport");
+    // topics.push_back("444-sport");
+    Topics part;
+    for (int i = 0; i < topics.size(); ++i) {
+        part.push_back(topics[i]);
+        threads.Launch(Sub, i, part);
+    }
+    std::this_thread::sleep_for(100ms);
+    for (auto &topic: topics) {
+        threads.Launch(Pub, topic);
+    }
+    threads.Launch(Pub, "some_else");
+
+    threads.WaitAll();
+
+    bus.Stop();
+}
 
 inline int MyMin(int a, int b) {
     printf("MyMin\n");

--
Gitblit v1.8.0