From c095be83aa9d795023a54ddaccfb6717258561c9 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期一, 29 三月 2021 11:37:20 +0800
Subject: [PATCH] read/write multiple msgs; move mqid.
---
src/shm_queue.h | 75 ++++++++++++++++++++++++-------------
src/shm_queue.cpp | 5 +-
2 files changed, 52 insertions(+), 28 deletions(-)
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index de4505b..401f346 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -45,7 +45,8 @@
// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
ShmMsgQueue::ShmMsgQueue(const MQId &id, ShmType &segment, const int len):
-SharedQueue(segment, MsgQIdToName(id), id, AdjustMQLength(len), segment.get_segment_manager())
+Super(segment, MsgQIdToName(id), AdjustMQLength(len), segment.get_segment_manager()),
+id_(id)
{
}
@@ -60,7 +61,7 @@
bool ShmMsgQueue::Send(const MQId &remote_id, const Msg &msg, const int timeout_ms)
{
Queue *remote = find(MsgQIdToName(remote_id));
- return remote && remote->Write(msg, timeout_ms, [&](){msg.AddRef();});
+ return remote && remote->Write(msg, timeout_ms, [](const Msg&msg){msg.AddRef();});
}
bool ShmMsgQueue::Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms)
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 140654c..1a4a57d 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -31,14 +31,13 @@
typedef boost::uuids::uuid MQId;
template <class D>
-class SyncedQueue : private Circular<D>
+class SharedQueue : private Circular<D>
{
typedef Circular<D> Super;
Mutex mutex_;
Cond cond_read_;
Cond cond_write_;
Mutex & mutex() { return mutex_; }
- const MQId id_;
static boost::posix_time::ptime MSFromNow(const int ms)
{
@@ -48,52 +47,76 @@
}
public:
- // template <class...T> SyncedQueue(const MQId &id, T&&...t):Super(t...), id_(id) {}
- SyncedQueue(const MQId &id, const uint32_t len, Allocator<D> const& alloc):Super(len, alloc), id_(id) {}
+ SharedQueue(const uint32_t len, Allocator<D> const& alloc):Super(len, alloc) {}
using Super::size;
using Super::capacity;
- const MQId &Id() const { return id_; }
+ template <class Iter, class OnWrite>
+ int Write(Iter begin, Iter end, const int timeout_ms, const OnWrite &onWrite) {
+ int n = 0;
+ if (begin != end) {
+ auto endtime = MSFromNow(timeout_ms);
+ Guard lock(mutex());
+ while (cond_write_.timed_wait(lock, endtime, [&]() { return !this->full(); })) {
+ onWrite(*begin);
+ this->push_back(*begin);
+ ++n;
+ cond_read_.notify_one();
+ if (++begin == end) {
+ break;
+ }
+ }
+ }
+ return n;
+ }
template <class OnWrite>
bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite) {
- Guard lock(mutex());
- if (cond_write_.timed_wait(lock, MSFromNow(timeout_ms), [&]() { return !this->full(); })) {
- onWrite();
- this->push_back(buf);
- cond_read_.notify_one();
- return true;
- } else {
- return false;
- }
+ return Write(&buf, (&buf)+1, timeout_ms, onWrite);
}
- bool Write(const D &buf, const int timeout_ms) { return Write(buf, timeout_ms, [](){}); }
+ bool Write(const D &buf, const int timeout_ms) { return Write(buf, timeout_ms, [](const D &buf){}); }
- bool Read(D &buf, const int timeout_ms){
+ template <class OnData>
+ bool Read(const int timeout_ms, OnData onData){
+ int n = 0;
+ auto endtime = MSFromNow(timeout_ms);
Guard lock(mutex());
- if (cond_read_.timed_wait(lock, MSFromNow(timeout_ms), [&]() { return !this->empty(); })) {
- using std::swap;
- swap(buf, this->front());
+ while (cond_read_.timed_wait(lock, endtime, [&]() { return !this->empty(); })) {
+ const bool more = onData(this->front());
this->pop_front();
cond_write_.notify_one();
- return true;
- } else {
- return false;
+ ++n;
+ if (!more) {
+ break;
+ }
}
+ return n;
+ }
+ bool Read(D &buf, const int timeout_ms){
+ auto read1 = [&](D &d) {
+ using std::swap;
+ swap(buf, d);
+ return false;
+ };
+ return Read(timeout_ms, read1) == 1;
}
};
-class ShmMsgQueue : private ShmObject<SyncedQueue<Msg> >
+
+class ShmMsgQueue : private ShmObject<SharedQueue<Msg> >
{
- typedef ShmObject<SyncedQueue<Msg> > SharedQueue;
- typedef SharedQueue::Data Queue;
+ typedef ShmObject<SharedQueue<Msg> > Super;
+ typedef Super::Data Queue;
bool Write(const Msg &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); }
bool Read(Msg &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); }
+ MQId id_;
+protected:
+ ShmMsgQueue(const std::string &raw_name, ShmType &segment, const int len); // internal use.
public:
ShmMsgQueue(const MQId &id, ShmType &segment, const int len);
ShmMsgQueue(ShmType &segment, const int len);
~ShmMsgQueue();
bool Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms);
bool Recv(MQId &source_id, void *&data, size_t &size, const int timeout_ms);
- const MQId &Id() const { return data()->Id(); }
+ const MQId &Id() const { return id_; }
bool Send(const MQId &remote_id, const Msg &msg, const int timeout_ms);
bool Recv(Msg &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
};
--
Gitblit v1.8.0