From 491d98b3ba32cafed5682552bd870ca0ef93275c Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 30 三月 2021 18:29:09 +0800
Subject: [PATCH] add ShmSocket as shm interface, add sub/pub.

---
 src/shm.h         |    4 
 src/socket.h      |   74 ++++++++++++
 src/shm_queue.h   |    5 
 src/socket.cpp    |  134 ++++++++++++++++++++++
 src/center.h      |   26 ++++
 src/defs.h        |    6 +
 utest/utest.cpp   |   66 +++++-----
 src/shm_queue.cpp |   12 +
 src/center.cpp    |   29 ++++
 9 files changed, 320 insertions(+), 36 deletions(-)

diff --git a/src/center.cpp b/src/center.cpp
new file mode 100644
index 0000000..809b6d1
--- /dev/null
+++ b/src/center.cpp
@@ -0,0 +1,29 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  center.cpp
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�03鏈�30鏃� 16鏃�19鍒�37绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), 
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#include "center.h"
+#include "defs.h"
+#include "shm.h"
+
+using namespace bhome_shm;
+
+SharedMemory &BHomeShm()
+{
+	static SharedMemory shm("bhome_default_shm_v0", 1024*1024*64);
+	return shm;
+}
+
diff --git a/src/center.h b/src/center.h
new file mode 100644
index 0000000..fcb8005
--- /dev/null
+++ b/src/center.h
@@ -0,0 +1,26 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  center.h
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�03鏈�30鏃� 16鏃�22鍒�24绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), 
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#ifndef CENTER_TM9OUQTG
+#define CENTER_TM9OUQTG
+
+class BHCenter
+{
+
+};
+
+#endif // end of include guard: CENTER_TM9OUQTG
diff --git a/src/defs.h b/src/defs.h
index 56c6c9c..acfe09e 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -27,6 +27,12 @@
 const MQId kBHBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
 const int kBHCenterPort = 24287;
 const char kTopicSep = '.';
+namespace bhome_shm {
+class SharedMemory;
+}
+
+bhome_shm::SharedMemory &BHomeShm();
+
 //TODO center can check shm for previous crash.
 
 #endif // end of include guard: DEFS_KP8LKGD0
diff --git a/src/shm.h b/src/shm.h
index 5e2c8b9..0f68754 100644
--- a/src/shm.h
+++ b/src/shm.h
@@ -61,6 +61,7 @@
     template <class T, class ...Params> T * New(Params const&...params) { return construct<T>(anonymous_instance, std::nothrow)(params...); }
     template <class T> void Delete(T *p) { if (p) { destroy_ptr<T>(p); }; }
     template <class T> void Delete(offset_ptr<T> p) { Delete(p.get()); }
+    template <class T> T *Find(const std::string &name) { return find<T>(name.c_str()).first; }
 
 };
 
@@ -91,7 +92,8 @@
             throw("Error: Not enough memory, can not allocate \"" + name_ + "\"");
         }
     }
-    Data *find(const std::string &name) { return shm_.find<Data>(ObjName(name).c_str()).first; }
+    static Data *Find(SharedMemory &shm, const std::string &name) { return shm.Find<Data>(ObjName(name)); }
+    Data *Find(const std::string &name) { return Find(shm_, ObjName(name)); }
     virtual ~ShmObject() {}
     std::string name() const { return name_; }
     Data* data() { return pdata_; }
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index ffc7c21..421cebf 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -28,7 +28,7 @@
 
 namespace {
 std::string MsgQIdToName(const MQId& id) { return "shmq" + to_string(id); }
-MQId EmptyId() { return nil_uuid(); }
+// MQId EmptyId() { return nil_uuid(); }
 MQId NewId() { return random_generator()(); }
 const int AdjustMQLength(const int len) {
     const int kMaxLength = 10000; 
@@ -59,12 +59,18 @@
     Remove();
 }
 
-bool ShmMsgQueue::Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms)
+bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms)
 {
-    Queue *remote = find(MsgQIdToName(remote_id));
+    Queue *remote = Find(shm, MsgQIdToName(remote_id));
     return remote && remote->Write(msg, timeout_ms, [](const MsgI&msg){msg.AddRef();});
 }
 
+// bool ShmMsgQueue::Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms)
+// {
+//     Queue *remote = Find(MsgQIdToName(remote_id));
+//     return remote && remote->Write(msg, timeout_ms, [](const MsgI&msg){msg.AddRef();});
+// }
+
 bool ShmMsgQueue::Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms)
 {
     MsgI msg;
diff --git a/src/shm_queue.h b/src/shm_queue.h
index a536553..60b1862 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -120,7 +120,10 @@
     bool Recv(BHMsg &msg, const int timeout_ms);
     bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
     bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms);
-    bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms);
+    static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms);
+    bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms) {
+        return Send(shm(), remote_id, msg, timeout_ms);
+    }
 };
 
 } // namespace bhome_shm
diff --git a/src/socket.cpp b/src/socket.cpp
new file mode 100644
index 0000000..1c28bfa
--- /dev/null
+++ b/src/socket.cpp
@@ -0,0 +1,134 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  socket.cpp
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�03鏈�30鏃� 15鏃�48鍒�58绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), 
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+
+#include "socket.h"
+#include <chrono>
+#include "msg.h"
+#include "defs.h"
+#include "bh_util.h"
+
+using namespace bhome_msg;
+using namespace bhome_shm;
+using namespace std::chrono_literals;
+
+namespace {
+
+int GetSocketDefaultLen(ShmSocket::Type type) 
+{
+    switch (type) {
+        case ShmSocket::eSockRequest : return 12;
+        case ShmSocket::eSockReply : return 64;
+        case ShmSocket::eSockPublish : return 0;
+        case ShmSocket::eSockSubscribe : return 64;
+        default: return 0;
+    }
+}
+
+
+}
+
+ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm)
+    : shm_(shm),
+      type_(type),
+      run_(false)
+{
+    int len = GetSocketDefaultLen(type);
+    if (len != 0) {
+        mq_.reset(new Queue(shm_, len));
+
+        auto RecvProc = [this](){
+            while (run_) {
+                try {
+                    std::unique_lock<std::mutex> lk(mutex_);
+                    if (cv_recv_cb_.wait_for(lk, 100ms, [this](){ return HasRecvCB(); })) {
+                        BHMsg msg;
+                        if (mq_->Recv(msg, 100)) {
+                            this->onRecv_(msg);
+                        }
+                    }
+                } catch (...) {}
+            }
+        };
+        run_.store(true);
+        workers_.emplace_back(RecvProc);
+    }
+}
+ShmSocket::ShmSocket(Type type)
+    : ShmSocket(type, BHomeShm())
+{
+}
+
+ShmSocket::~ShmSocket()
+{
+    Stop();
+}
+
+bool ShmSocket::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms)
+{
+    if (type_ != eSockPublish) {
+        return false;
+    }
+    assert(!mq_);
+    try {
+        MsgI imsg;
+        if (!imsg.MakeRC(shm_, MakePub(topic, data, size))) {
+            return false;
+        }
+        DEFER1(imsg.Release(shm_));
+        return Queue::Send(shm_, kBHBusQueueId, imsg, timeout_ms);
+        
+    } catch (...) {
+        return false;
+    }
+}
+
+bool ShmSocket::Subscribe(const std::vector<std::string> &topics, const int timeout_ms)
+{
+    if (type_ != eSockSubscribe) {
+        return false;
+    }
+    assert(mq_);
+    try {
+        return mq_->Send(kBHBusQueueId, MakeSub(mq_->Id(), topics), timeout_ms);
+    } catch (...) {
+        return false;
+    }
+}
+
+bool ShmSocket::SetRecvCallback(const RecvCB &onRecv)
+{
+    std::lock_guard<std::mutex> lock(mutex_);
+    onRecv_ = onRecv;
+    cv_recv_cb_.notify_one();
+    return true;
+}
+
+bool ShmSocket::HasRecvCB()
+{
+    return static_cast<bool>(onRecv_);
+}
+
+void ShmSocket::Stop()
+{
+    run_ = false;
+    for (auto &t : workers_) {
+        if (t.joinable()) {
+            t.join();
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/socket.h b/src/socket.h
new file mode 100644
index 0000000..e65ac83
--- /dev/null
+++ b/src/socket.h
@@ -0,0 +1,74 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  socket.h
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�03鏈�30鏃� 15鏃�49鍒�19绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), 
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+
+#ifndef SOCKET_GWTJHBPO
+#define SOCKET_GWTJHBPO
+
+#include "shm_queue.h"
+#include <vector>
+#include <thread>
+#include <memory>
+#include <functional>
+#include <mutex>
+#include <condition_variable>
+#include <atomic>
+
+class ShmSocket
+{
+    typedef bhome_shm::ShmMsgQueue Queue;
+public:
+    enum Type {
+        eSockRequest,
+        eSockReply,
+        eSockSubscribe,
+        eSockPublish,
+    };
+    typedef std::function<void (bhome_msg::BHMsg &msg)> RecvCB;
+
+    ShmSocket(Type type);
+    ShmSocket(Type type, bhome_shm::SharedMemory &shm);
+    ~ShmSocket();
+
+    // bool Request(const std::string &topic, const void *data, const size_t size, onReply);
+    bool RequestAndWait() { return false; } // call Request, and wait onReply notify cv
+
+    // bool HandleRequest(onData);
+    bool ReadRequest(); // exclude with HandleRequest
+    bool SendReply();   // exclude with HandleRequest
+
+    bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms);
+    bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
+    bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
+    bool SetRecvCallback(const RecvCB &onRecv);
+private:
+    bool HasRecvCB();
+    void Stop();
+
+    bhome_shm::SharedMemory &shm_;
+    Type type_;
+    std::vector<std::thread> workers_;
+    std::mutex mutex_;
+    std::condition_variable cv_recv_cb_;
+    std::atomic<bool> run_;
+    RecvCB onRecv_;
+
+    std::unique_ptr<Queue> mq_;
+};
+
+
+#endif // end of include guard: SOCKET_GWTJHBPO
diff --git a/utest/utest.cpp b/utest/utest.cpp
index f7571c8..bb5c14d 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -8,6 +8,7 @@
 #include "pubsub.h"
 #include "defs.h"
 #include "util.h"
+#include "socket.h"
 
 template <class A, class B> struct IsSameType { static const bool value = false; };
 template <class A> struct IsSameType<A,A> { static const bool value = true; };
@@ -73,45 +74,48 @@
     std::atomic<uint64_t> last_count(0);
 
     const uint64_t nmsg = 100;
-
     const int timeout = 1000;
     auto Sub = [&](int id, const std::vector<std::string> &topics) {
-        ShmMsgQueue client(shm, 8);
-        client.Send(kBHBusQueueId, MakeSub(client.Id(), topics), timeout);
-        for (int i = 0; i < nmsg * topics.size(); ++i) {
-            BHMsg msg;
-            if (client.Recv(msg, 1000)) {
-                if (msg.type() != kMsgTypePublish) {
-                    BOOST_CHECK(false);
-                }
-                DataPub pub;
-                if (!pub.ParseFromString(msg.body())) {
-                    BOOST_CHECK(false);
-                }
-                ++count;
-                auto cur = Now();
-                if (last_time.exchange(cur) < cur) {
-                    std::cout << "time: " << cur;
-                    printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
-                           count.load(), count - last_count.exchange(count), init_avail - Avail());
-                }
-                // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
-            } else {
-                printf("sub %2d recv timeout\n", id);
-            }
+        ShmSocket client(ShmSocket::eSockSubscribe, shm);
+        bool r = client.Subscribe(topics, timeout);
+        std::mutex mutex;
+        std::condition_variable cv;
 
-        }
+        int i = 0;
+        auto OnRecv = [&](BHMsg &msg) {
+            if (msg.type() != kMsgTypePublish) {
+                BOOST_CHECK(false);
+            }
+            DataPub pub;
+            if (!pub.ParseFromString(msg.body())) {
+                BOOST_CHECK(false);
+            }
+            ++count;
+
+            auto cur = Now();
+            if (last_time.exchange(cur) < cur) {
+                std::cout << "time: " << cur;
+                printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
+                       count.load(), count - last_count.exchange(count), init_avail - Avail());
+            }
+            if (++i >= nmsg*topics.size()) {
+                cv.notify_one();
+            }
+            // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
+        };
+        client.SetRecvCallback(OnRecv);
+
+        std::unique_lock<std::mutex> lk(mutex);
+        cv.wait(lk);
+
     };
+
     auto Pub = [&](const std::string &topic) {
-        ShmMsgQueue provider(shm, 0);
+        ShmSocket provider(ShmSocket::eSockPublish, shm);
         for (int i = 0; i < nmsg; ++i) {
             std::string data = topic + std::to_string(i) + std::string(1000, '-');
 
-            MsgI msg;
-            msg.MakeRC(shm, MakePub(topic, data.data(), data.size()));
-            DEFER1(msg.Release(shm));
-            bool r = provider.Send(kBHBusQueueId, msg, timeout);
-
+            bool r = provider.Publish(topic, data.data(), data.size(), timeout);
             // bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout);
             if (!r) {
                 printf("pub ret: %s\n", r ? "ok" : "fail");

--
Gitblit v1.8.0