From 491d98b3ba32cafed5682552bd870ca0ef93275c Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 30 三月 2021 18:29:09 +0800
Subject: [PATCH] add ShmSocket as shm interface, add sub/pub.

---
 utest/utest.cpp |   66 +++++++++++++++++---------------
 1 files changed, 35 insertions(+), 31 deletions(-)

diff --git a/utest/utest.cpp b/utest/utest.cpp
index f7571c8..bb5c14d 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -8,6 +8,7 @@
 #include "pubsub.h"
 #include "defs.h"
 #include "util.h"
+#include "socket.h"
 
 template <class A, class B> struct IsSameType { static const bool value = false; };
 template <class A> struct IsSameType<A,A> { static const bool value = true; };
@@ -73,45 +74,48 @@
     std::atomic<uint64_t> last_count(0);
 
     const uint64_t nmsg = 100;
-
     const int timeout = 1000;
     auto Sub = [&](int id, const std::vector<std::string> &topics) {
-        ShmMsgQueue client(shm, 8);
-        client.Send(kBHBusQueueId, MakeSub(client.Id(), topics), timeout);
-        for (int i = 0; i < nmsg * topics.size(); ++i) {
-            BHMsg msg;
-            if (client.Recv(msg, 1000)) {
-                if (msg.type() != kMsgTypePublish) {
-                    BOOST_CHECK(false);
-                }
-                DataPub pub;
-                if (!pub.ParseFromString(msg.body())) {
-                    BOOST_CHECK(false);
-                }
-                ++count;
-                auto cur = Now();
-                if (last_time.exchange(cur) < cur) {
-                    std::cout << "time: " << cur;
-                    printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
-                           count.load(), count - last_count.exchange(count), init_avail - Avail());
-                }
-                // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
-            } else {
-                printf("sub %2d recv timeout\n", id);
-            }
+        ShmSocket client(ShmSocket::eSockSubscribe, shm);
+        bool r = client.Subscribe(topics, timeout);
+        std::mutex mutex;
+        std::condition_variable cv;
 
-        }
+        int i = 0;
+        auto OnRecv = [&](BHMsg &msg) {
+            if (msg.type() != kMsgTypePublish) {
+                BOOST_CHECK(false);
+            }
+            DataPub pub;
+            if (!pub.ParseFromString(msg.body())) {
+                BOOST_CHECK(false);
+            }
+            ++count;
+
+            auto cur = Now();
+            if (last_time.exchange(cur) < cur) {
+                std::cout << "time: " << cur;
+                printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
+                       count.load(), count - last_count.exchange(count), init_avail - Avail());
+            }
+            if (++i >= nmsg*topics.size()) {
+                cv.notify_one();
+            }
+            // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
+        };
+        client.SetRecvCallback(OnRecv);
+
+        std::unique_lock<std::mutex> lk(mutex);
+        cv.wait(lk);
+
     };
+
     auto Pub = [&](const std::string &topic) {
-        ShmMsgQueue provider(shm, 0);
+        ShmSocket provider(ShmSocket::eSockPublish, shm);
         for (int i = 0; i < nmsg; ++i) {
             std::string data = topic + std::to_string(i) + std::string(1000, '-');
 
-            MsgI msg;
-            msg.MakeRC(shm, MakePub(topic, data.data(), data.size()));
-            DEFER1(msg.Release(shm));
-            bool r = provider.Send(kBHBusQueueId, msg, timeout);
-
+            bool r = provider.Publish(topic, data.data(), data.size(), timeout);
             // bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout);
             if (!r) {
                 printf("pub ret: %s\n", r ? "ok" : "fail");

--
Gitblit v1.8.0