From c095be83aa9d795023a54ddaccfb6717258561c9 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期一, 29 三月 2021 11:37:20 +0800
Subject: [PATCH] read/write multiple msgs; move mqid.

---
 src/shm_queue.h |   75 ++++++++++++++++++++++++-------------
 1 files changed, 49 insertions(+), 26 deletions(-)

diff --git a/src/shm_queue.h b/src/shm_queue.h
index 140654c..1a4a57d 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -31,14 +31,13 @@
 typedef boost::uuids::uuid MQId;
 
 template <class D>
-class SyncedQueue : private Circular<D>
+class SharedQueue : private Circular<D>
 {
     typedef Circular<D> Super;
     Mutex mutex_;
     Cond cond_read_;
     Cond cond_write_;
     Mutex & mutex() { return mutex_; }
-    const MQId id_;
 
     static boost::posix_time::ptime MSFromNow(const int ms)
     {
@@ -48,52 +47,76 @@
     }
 
 public:
-    // template <class...T> SyncedQueue(const MQId &id, T&&...t):Super(t...), id_(id) {}
-    SyncedQueue(const MQId &id, const uint32_t len, Allocator<D> const& alloc):Super(len, alloc), id_(id) {}
+    SharedQueue(const uint32_t len, Allocator<D> const& alloc):Super(len, alloc) {}
     using Super::size;
     using Super::capacity;
-    const MQId &Id() const { return id_; }
+    template <class Iter, class OnWrite>
+    int Write(Iter begin, Iter end, const int timeout_ms, const OnWrite &onWrite) {
+        int n = 0;
+        if (begin != end) {
+            auto endtime = MSFromNow(timeout_ms);
+            Guard lock(mutex());
+            while (cond_write_.timed_wait(lock, endtime, [&]() { return !this->full(); })) {
+                onWrite(*begin);
+                this->push_back(*begin);
+                ++n;
+                cond_read_.notify_one();
+                if (++begin == end) {
+                    break;
+                }
+            }
+        }
+        return n;
+    }
     template <class OnWrite>
     bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite) {
-        Guard lock(mutex());
-        if (cond_write_.timed_wait(lock, MSFromNow(timeout_ms), [&]() { return !this->full(); })) {
-            onWrite();
-            this->push_back(buf);
-            cond_read_.notify_one();
-            return true;
-        } else {
-            return false;
-        }
+        return Write(&buf, (&buf)+1, timeout_ms, onWrite);
     }
-    bool Write(const D &buf, const int timeout_ms) { return Write(buf, timeout_ms, [](){}); }
+    bool Write(const D &buf, const int timeout_ms) { return Write(buf, timeout_ms, [](const D &buf){}); }
 
-    bool Read(D &buf, const int timeout_ms){
+    template <class OnData>
+    bool Read(const int timeout_ms, OnData onData){
+        int n = 0;
+        auto endtime = MSFromNow(timeout_ms);
         Guard lock(mutex());
-        if (cond_read_.timed_wait(lock, MSFromNow(timeout_ms), [&]() { return !this->empty(); })) {
-            using std::swap;
-            swap(buf, this->front());
+        while (cond_read_.timed_wait(lock, endtime, [&]() { return !this->empty(); })) {
+            const bool more = onData(this->front());
             this->pop_front();
             cond_write_.notify_one();
-            return true;
-        } else {
-            return false;
+            ++n;
+            if (!more) {
+                break;
+            }
         }
+        return n;
+    }
+    bool Read(D &buf, const int timeout_ms){
+        auto read1 = [&](D &d) { 
+            using std::swap;
+            swap(buf, d);
+            return false;
+        };
+        return Read(timeout_ms, read1) == 1;
     }
 };
 
-class ShmMsgQueue : private ShmObject<SyncedQueue<Msg> >
+
+class ShmMsgQueue : private ShmObject<SharedQueue<Msg> >
 {
-    typedef ShmObject<SyncedQueue<Msg> > SharedQueue;
-    typedef SharedQueue::Data Queue;
+    typedef ShmObject<SharedQueue<Msg> > Super;
+    typedef Super::Data Queue;
     bool Write(const Msg &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); }
     bool Read(Msg &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); }
+    MQId id_;
+protected:
+    ShmMsgQueue(const std::string &raw_name, ShmType &segment, const int len); // internal use.
 public:
     ShmMsgQueue(const MQId &id, ShmType &segment, const int len);
     ShmMsgQueue(ShmType &segment, const int len);
     ~ShmMsgQueue();
     bool Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms);
     bool Recv(MQId &source_id, void *&data, size_t &size, const int timeout_ms);
-    const MQId &Id() const { return data()->Id(); }
+    const MQId &Id() const { return id_; }
     bool Send(const MQId &remote_id, const Msg &msg, const int timeout_ms);
     bool Recv(Msg &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
 };

--
Gitblit v1.8.0