From 6aa7e4c37a70709e7348bd16407c5983a563ed76 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期一, 29 三月 2021 21:11:34 +0800
Subject: [PATCH] test pub/sub msg; fix update last_time;

---
 src/msg.h       |    4 
 src/pubsub.h    |    6 ++
 src/pubsub.cpp  |   72 ++++++++++++++++++++++-
 utest/utest.cpp |   87 ++++++++++++++++++++++++++++-
 src/msg.cpp     |    8 +-
 5 files changed, 163 insertions(+), 14 deletions(-)

diff --git a/src/msg.cpp b/src/msg.cpp
index ed8adba..78834a8 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -37,7 +37,6 @@
     assert(data && size);
     BHMsg msg(InitMsg(kMsgTypeRequest));
     msg.set_body(data, size);
-    BHAddress addr;
     msg.add_route()->set_mq_id(&src_id, sizeof(src_id));
     return msg;
 }
@@ -50,10 +49,11 @@
     return msg;
 }
 
-BHMsg MakeSubUnsub(const std::vector<std::string> &topics, const MsgType sub_unsub)
+BHMsg MakeSubUnsub(const MQId &client, const std::vector<std::string> &topics, const MsgType sub_unsub)
 {
     assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe);
     BHMsg msg(InitMsg(sub_unsub));
+    msg.add_route()->set_mq_id(&client, sizeof(client));
     DataSub subs;
     for (auto &t : topics) {
         subs.add_topics(t);
@@ -62,8 +62,8 @@
     return msg;
 }
 
-BHMsg MakeSub(const std::vector<std::string> &topics) { return MakeSubUnsub(topics, kMsgTypeSubscribe); }
-BHMsg MakeUnsub(const std::vector<std::string> &topics) { return MakeSubUnsub(topics, kMsgTypeUnsubscribe); }
+BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics) { return MakeSubUnsub(client, topics, kMsgTypeSubscribe); }
+BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics) { return MakeSubUnsub(client, topics, kMsgTypeUnsubscribe); }
 
 BHMsg MakePub(const std::string &topic, const void *data, const size_t size)
 {
diff --git a/src/msg.h b/src/msg.h
index 5cb0ce4..f3fe726 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -47,8 +47,8 @@
 
 BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size);
 BHMsg MakeReply(const void *data, const size_t size);
-BHMsg MakeSub(const std::vector<std::string> &topics);
-BHMsg MakeUnsub(const std::vector<std::string> &topics);
+BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics); 
+BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics); 
 BHMsg MakePub(const std::string &topic, const void *data, const size_t size);
 
 class MsgI {
diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index ee2614a..686c7d5 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -17,14 +17,17 @@
  */
 #include "pubsub.h"
 #include <chrono>
+#include "bh_util.h"
 
 namespace bhome_shm {
 
 using namespace std::chrono_literals;
 const MQId kBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
 const int kMaxWorker = 16;
+using namespace bhome_msg;
 
 BusManager::BusManager(SharedMemory &shm):
+shm_(shm),
 busq_(kBusQueueId, shm, 1000),
 run_(false)
 {
@@ -42,15 +45,12 @@
     // start
     auto Worker = [&](){
         while (this->run_) {
-            std::this_thread::sleep_for(100ms);
             BusManager &self = *this;
             BHMsg msg;
             const int timeout_ms = 100;
-            if (!self.busq_.Recv(msg, timeout_ms)) {
-                continue;
+            if (self.busq_.Recv(msg, timeout_ms)) {
+                self.OnMsg(msg);
             }
-            // handle msg;
-            // type: subscribe(topic), publish(topic, data)
         }
     };
 
@@ -81,5 +81,67 @@
     return false;
 }
 
+void BusManager::OnMsg(const BHMsg &msg)
+{
+    auto OnSubChange = [&](auto &&update) {
+        DataSub sub;
+        if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
+            assert(sizeof(MQId) == msg.route(0).mq_id().size());
+            MQId client;
+            memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
+
+            std::lock_guard<std::mutex> guard(mutex_);
+            auto &topics = sub.topics();
+            for (auto &topic : topics) {
+                try {
+                    update(topic, client);
+                } catch(...) {
+                    //TODO log error
+                }
+            }
+        }
+    };
+
+    auto Sub1 = [this](const std::string &topic, const MQId &id) {
+        records_[topic].insert(id);
+    };
+
+    auto Unsub1 = [this](const std::string &topic, const MQId &id) {
+        auto pos = records_.find(topic);
+        if (pos != records_.end()) {
+            if (pos->second.erase(id) && pos->second.empty()) {
+                records_.erase(pos);
+            }
+        }
+    };
+
+    auto OnPublish = [&]() {
+        DataPub pub;
+        MsgI pubmsg;
+        if (!pub.ParseFromString(msg.body()) || !pubmsg.MakeRC(shm_, msg)) {
+            return;
+        }
+        DEFER1(pubmsg.Release(shm_));
+
+        std::lock_guard<std::mutex> guard(mutex_);
+        auto pos = records_.find(pub.topic());
+        if (pos != records_.end() && !pos->second.empty()) {
+            auto &clients = pos->second;
+            for (auto &cli : clients) {
+                busq_.Send(cli, pubmsg, 100);
+            }
+        } else {
+            // printf("invalid topic: %s\n", pub.topic().c_str());
+        }
+    };
+
+    switch (msg.type()) {
+        case kMsgTypeSubscribe: OnSubChange(Sub1); break;
+        case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
+        case kMsgTypePublish : OnPublish(); break;
+        default: break;
+    }
+}
+
 } // namespace bhome_shm
 
diff --git a/src/pubsub.h b/src/pubsub.h
index 0628216..c1f98af 100644
--- a/src/pubsub.h
+++ b/src/pubsub.h
@@ -23,18 +23,24 @@
 #include <atomic>
 #include <mutex>
 #include <vector>
+#include <unordered_map>
+#include <set>
 
 namespace bhome_shm {
 
 // publish/subcribe manager.
 class BusManager
 {
+    SharedMemory &shm_;
     ShmMsgQueue busq_;
     std::atomic<bool> run_;
     std::vector<std::thread> workers_;
     std::mutex mutex_;
+    typedef std::set<MQId> Clients;
+    std::unordered_map<std::string, Clients> records_;
 
     bool StopNoLock();
+    void OnMsg(const BHMsg &msg);
 public:
     BusManager(SharedMemory &shm);
     ~BusManager();
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 6e41116..a074f41 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -15,6 +15,8 @@
 #include "bh_util.h"
 #include <sys/types.h>
 #include <sys/wait.h>
+#include "pubsub.h"
+#include "defs.h"
 
 using namespace std::chrono_literals;
 using namespace bhome_msg;
@@ -332,11 +334,10 @@
             } else {
                 ++count;
                 auto cur = Now();
-                if (last_time.exchange(cur) != cur) {
-                    std::cout << "time: " << Now();
+                if (last_time.exchange(cur) < cur) {
+                    std::cout << "time: " << cur;
                     printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n",
                            count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count());
-                    last_time = cur;
                 }
 
             }
@@ -382,6 +383,86 @@
     // BOOST_CHECK_THROW(reply.Count(), int);
 }
 
+BOOST_AUTO_TEST_CASE(PubSubTest)
+{
+    const std::string shm_name("ShmPubSub");
+    ShmRemover auto_remove(shm_name);
+    SharedMemory shm(shm_name, 1024*1024*50);
+    auto Avail = [&]() { return shm.get_free_memory(); };
+    auto init_avail = Avail();
+
+    BusManager bus(shm);
+    bus.Start(1);
+    std::this_thread::sleep_for(100ms);
+
+    std::atomic<uint64_t> count(0);
+    std::atomic<ptime> last_time(Now() - seconds(1));
+    std::atomic<uint64_t> last_count(0);
+
+    const uint64_t nmsg = 1000 * 100;
+
+    const int timeout = 1000;
+    auto Sub = [&](int id, const std::vector<std::string> &topics) {
+        ShmMsgQueue client(shm, 8);
+        client.Send(kBusQueueId, MakeSub(client.Id(), topics), timeout);
+        for (int i = 0; i < nmsg * topics.size(); ++i) {
+            BHMsg msg;
+            if (client.Recv(msg, 1000)) {
+                if (msg.type() != kMsgTypePublish) {
+                    BOOST_CHECK(false);
+                }
+                DataPub pub;
+                if (!pub.ParseFromString(msg.body())) {
+                    BOOST_CHECK(false);
+                }
+                ++count;
+                auto cur = Now();
+                if (last_time.exchange(cur) < cur) {
+                    std::cout << "time: " << cur;
+                    printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
+                           count.load(), count - last_count.exchange(count), init_avail - Avail());
+                }
+                // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
+
+            } else {
+                printf("sub %2d recv timeout\n", id);
+            }
+
+        }
+    };
+    auto Pub = [&](const std::string &topic) {
+        ShmMsgQueue provider(shm, 0);
+        for (int i = 0; i < nmsg; ++i) {
+            std::string data = topic + std::to_string(i);
+            bool r = provider.Send(kBusQueueId, MakePub(topic, data.data(), data.size()), timeout);
+            if (!r) {
+                printf("pub ret: %s\n", r ? "ok" : "fail");
+            }
+        }
+    };
+    ThreadManager threads;
+    typedef std::vector<std::string> Topics;
+    Topics topics;
+    topics.push_back("000-xxx");
+    topics.push_back("111-football");
+    topics.push_back("222-sport");
+    // topics.push_back("333-sport");
+    // topics.push_back("444-sport");
+    Topics part;
+    for (int i = 0; i < topics.size(); ++i) {
+        part.push_back(topics[i]);
+        threads.Launch(Sub, i, part);
+    }
+    std::this_thread::sleep_for(100ms);
+    for (auto &topic: topics) {
+        threads.Launch(Pub, topic);
+    }
+    threads.Launch(Pub, "some_else");
+
+    threads.WaitAll();
+
+    bus.Stop();
+}
 
 inline int MyMin(int a, int b) {
     printf("MyMin\n");

--
Gitblit v1.8.0