From 628c1c21ffb19d8c96ed9ce89531595f9870ab1a Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 23 四月 2021 18:41:02 +0800
Subject: [PATCH] add msg tag; recv all msgs before remove mq.
---
src/msg.h | 9 ++--
src/shm_queue.h | 26 ++----------
src/shm_queue.cpp | 27 +++++--------
src/msg.cpp | 1
4 files changed, 20 insertions(+), 43 deletions(-)
diff --git a/src/msg.cpp b/src/msg.cpp
index 7ab0434..f180d67 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -20,6 +20,5 @@
namespace bhome_msg
{
-const uint32_t kMsgTag = 0xf1e2d3c4;
} // namespace bhome_msg
diff --git a/src/msg.h b/src/msg.h
index 99b3a09..6ce4902 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -70,9 +70,11 @@
return pshm;
}
- struct Meta {
+ static const uint32_t kMsgTag = 0xf1e2d3c4;
+ typedef struct {
RefCount count_;
- };
+ const uint32_t tag_ = kMsgTag;
+ } Meta;
Offset offset_;
void *Alloc(const size_t size)
{
@@ -155,9 +157,8 @@
explicit ShmMsg(const size_t size) :
ShmMsg(Alloc(size)) {}
void swap(ShmMsg &a) { std::swap(offset_, a.offset_); }
- bool valid() const { return static_cast<bool>(offset_); }
+ bool valid() const { return static_cast<bool>(offset_) && meta()->tag_ == kMsgTag; }
- // AddRef and Release works for both counted and not counted msg.
int AddRef() const { return valid() ? meta()->count_.Inc() : 1; }
int Release()
{
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index df9ce1f..215a8ac 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -61,31 +61,24 @@
bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId &id)
{
+ Queue *q = Find(shm, id);
+ if (q) {
+ MsgI msg;
+ while (q->TryRead(msg)) {
+ msg.Release();
+ }
+ }
return Super::Remove(shm, MsgQIdToName(id));
}
-ShmMsgQueue::Queue *ShmMsgQueue::FindRemote(SharedMemory &shm, const MQId &remote_id)
+ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId &remote_id)
{
- return Find(shm, MsgQIdToName(remote_id));
-}
-bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend)
-{
- Queue *remote = FindRemote(shm, remote_id);
- if (remote) {
- if (onsend) {
- return remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
- } else {
- return remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); });
- }
- } else {
- // SetLestError(eNotFound);
- return false;
- }
+ return Super::Find(shm, MsgQIdToName(remote_id));
}
bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend)
{
- Queue *remote = FindRemote(shm, remote_id);
+ Queue *remote = Find(shm, remote_id);
if (remote) {
if (onsend) {
return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 4f544c8..93d77df 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -110,22 +110,6 @@
Super(len, alloc) {}
using Super::capacity;
using Super::size;
- template <class Iter, class OnWrite>
- int Write(Iter begin, Iter end, const int timeout_ms, const OnWrite &onWrite)
- {
- auto endtime = MSFromNow(timeout_ms);
- auto timedWritePred = [this, endtime](Guard &lock) {
- return (cond_write_.timed_wait(lock, endtime, [&]() { return !this->full(); }));
- };
- return WriteAllOnCond(begin, end, timedWritePred, onWrite);
- }
-
- template <class OnWrite>
- bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite) { return Write(&buf, (&buf) + 1, timeout_ms, onWrite); }
- bool Write(const D &buf, const int timeout_ms)
- {
- return Write(buf, timeout_ms, [](const D &buf) {});
- }
template <class Iter, class OnWrite>
int TryWrite(Iter begin, Iter end, const OnWrite &onWrite)
@@ -174,13 +158,13 @@
bool TryRecv(MsgI &msg) { return data()->TryRead(msg); }
template <class OnData>
int TryRecvAll(OnData const &onData) { return data()->TryReadAll(onData); }
- static Queue *FindRemote(SharedMemory &shm, const MQId &remote_id);
- static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend = OnSend());
+ static Queue *Find(SharedMemory &shm, const MQId &remote_id);
+ // static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend = OnSend());
static bool TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend = OnSend());
template <class Iter>
static int TrySendAll(SharedMemory &shm, const MQId &remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend())
{
- Queue *remote = FindRemote(shm, remote_id);
+ Queue *remote = Find(shm, remote_id);
if (remote) {
if (onsend) {
return remote->TryWrite(begin, end, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
@@ -193,8 +177,8 @@
}
}
- template <class... Rest>
- bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); }
+ // template <class... Rest>
+ // bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); }
template <class... Rest>
bool TrySend(const MQId &remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); }
template <class... Rest>
--
Gitblit v1.8.0