From 8cbb55f3066f71f5a4328193414d4555c87e96be Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 30 三月 2021 11:41:30 +0800
Subject: [PATCH] refactor.

---
 src/shm_queue.h |   89 +++++++++++++++++++++++++++++---------------
 1 files changed, 59 insertions(+), 30 deletions(-)

diff --git a/src/shm_queue.h b/src/shm_queue.h
index d0eb972..9d08016 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -31,14 +31,13 @@
 typedef boost::uuids::uuid MQId;
 
 template <class D>
-class SyncedQueue : private Circular<D>
+class SharedQueue : private Circular<D>
 {
     typedef Circular<D> Super;
     Mutex mutex_;
     Cond cond_read_;
     Cond cond_write_;
     Mutex & mutex() { return mutex_; }
-    const MQId id_;
 
     static boost::posix_time::ptime MSFromNow(const int ms)
     {
@@ -48,49 +47,79 @@
     }
 
 public:
-    // template <class...T> SyncedQueue(const MQId &id, T&&...t):Super(t...), id_(id) {}
-    SyncedQueue(const MQId &id, const uint32_t len, Allocator<D> const& alloc):Super(len, alloc), id_(id) {}
+    SharedQueue(const uint32_t len, Allocator<D> const& alloc):Super(len, alloc) {}
     using Super::size;
     using Super::capacity;
-    const MQId &Id() const { return id_; }
-    bool Write(D buf, const int timeout_ms) {
-        Guard lock(mutex());
-        if (cond_write_.timed_wait(lock, MSFromNow(timeout_ms), [&]() { return !this->full(); })) {
-            this->push_back(buf);
-            cond_read_.notify_one();
-            return true;
-        } else {
-            return false;
+    template <class Iter, class OnWrite>
+    int Write(Iter begin, Iter end, const int timeout_ms, const OnWrite &onWrite) {
+        int n = 0;
+        if (begin != end) {
+            auto endtime = MSFromNow(timeout_ms);
+            Guard lock(mutex());
+            while (cond_write_.timed_wait(lock, endtime, [&]() { return !this->full(); })) {
+                onWrite(*begin);
+                this->push_back(*begin);
+                ++n;
+                cond_read_.notify_one();
+                if (++begin == end) {
+                    break;
+                }
+            }
         }
+        return n;
     }
+    template <class OnWrite>
+    bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite) {
+        return Write(&buf, (&buf)+1, timeout_ms, onWrite);
+    }
+    bool Write(const D &buf, const int timeout_ms) { return Write(buf, timeout_ms, [](const D &buf){}); }
 
-    bool Read(D &buf, const int timeout_ms){
+    template <class OnData>
+    bool Read(const int timeout_ms, OnData onData){
+        int n = 0;
+        auto endtime = MSFromNow(timeout_ms);
         Guard lock(mutex());
-        if (cond_read_.timed_wait(lock, MSFromNow(timeout_ms), [&]() { return !this->empty(); })) {
-            buf = this->front();
+        while (cond_read_.timed_wait(lock, endtime, [&]() { return !this->empty(); })) {
+            const bool more = onData(this->front());
             this->pop_front();
             cond_write_.notify_one();
-            return true;
-        } else {
-            return false;
+            ++n;
+            if (!more) {
+                break;
+            }
         }
+        return n;
+    }
+    bool Read(D &buf, const int timeout_ms){
+        auto read1 = [&](D &d) { 
+            using std::swap;
+            swap(buf, d);
+            return false;
+        };
+        return Read(timeout_ms, read1) == 1;
     }
 };
 
-class ShmMsgQueue : private ShmObject<SyncedQueue<Msg> >
+using namespace bhome_msg;
+
+class ShmMsgQueue : private ShmObject<SharedQueue<MsgI> >
 {
-    typedef ShmObject<SyncedQueue<Msg> > SharedQueue;
-    typedef SharedQueue::Data Queue;
-    bool Write(const Msg &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); }
-    bool Read(Msg &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); }
+    typedef ShmObject<SharedQueue<MsgI> > Super;
+    typedef Super::Data Queue;
+    bool Write(const MsgI &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); }
+    bool Read(MsgI &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); }
+    MQId id_;
+protected:
+    ShmMsgQueue(const std::string &raw_name, ShmType &segment, const int len); // internal use.
 public:
-    ShmMsgQueue(const MQId &id, ShmType &segment, const uint32_t len);
-    ShmMsgQueue(ShmType &segment, const uint32_t len);
+    ShmMsgQueue(const MQId &id, ShmType &segment, const int len);
+    ShmMsgQueue(ShmType &segment, const int len);
     ~ShmMsgQueue();
-    bool Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms);
-    bool Recv(MQId &source_id, void *&data, size_t &size, const int timeout_ms);
-    const MQId &Id() const { return data()->Id(); }
-    bool Send(const MQId &remote_id, const Msg &msg, const int timeout_ms);
+    // bool Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms); // request
+    bool Recv(BHMsg &msg, const int timeout_ms);
+    bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms);
+    const MQId &Id() const { return id_; }
+    bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms);
 };
 
 } // namespace bhome_shm

--
Gitblit v1.8.0