From 628c1c21ffb19d8c96ed9ce89531595f9870ab1a Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 23 四月 2021 18:41:02 +0800
Subject: [PATCH] add msg tag; recv all msgs before remove mq.

---
 src/msg.h         |    9 ++--
 src/shm_queue.h   |   26 ++----------
 src/shm_queue.cpp |   27 +++++--------
 src/msg.cpp       |    1 
 4 files changed, 20 insertions(+), 43 deletions(-)

diff --git a/src/msg.cpp b/src/msg.cpp
index 7ab0434..f180d67 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -20,6 +20,5 @@
 
 namespace bhome_msg
 {
-const uint32_t kMsgTag = 0xf1e2d3c4;
 
 } // namespace bhome_msg
diff --git a/src/msg.h b/src/msg.h
index 99b3a09..6ce4902 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -70,9 +70,11 @@
 		return pshm;
 	}
 
-	struct Meta {
+	static const uint32_t kMsgTag = 0xf1e2d3c4;
+	typedef struct {
 		RefCount count_;
-	};
+		const uint32_t tag_ = kMsgTag;
+	} Meta;
 	Offset offset_;
 	void *Alloc(const size_t size)
 	{
@@ -155,9 +157,8 @@
 	explicit ShmMsg(const size_t size) :
 	    ShmMsg(Alloc(size)) {}
 	void swap(ShmMsg &a) { std::swap(offset_, a.offset_); }
-	bool valid() const { return static_cast<bool>(offset_); }
+	bool valid() const { return static_cast<bool>(offset_) && meta()->tag_ == kMsgTag; }
 
-	// AddRef and Release works for both counted and not counted msg.
 	int AddRef() const { return valid() ? meta()->count_.Inc() : 1; }
 	int Release()
 	{
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index df9ce1f..215a8ac 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -61,31 +61,24 @@
 
 bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId &id)
 {
+	Queue *q = Find(shm, id);
+	if (q) {
+		MsgI msg;
+		while (q->TryRead(msg)) {
+			msg.Release();
+		}
+	}
 	return Super::Remove(shm, MsgQIdToName(id));
 }
 
-ShmMsgQueue::Queue *ShmMsgQueue::FindRemote(SharedMemory &shm, const MQId &remote_id)
+ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId &remote_id)
 {
-	return Find(shm, MsgQIdToName(remote_id));
-}
-bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend)
-{
-	Queue *remote = FindRemote(shm, remote_id);
-	if (remote) {
-		if (onsend) {
-			return remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
-		} else {
-			return remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); });
-		}
-	} else {
-		// SetLestError(eNotFound);
-		return false;
-	}
+	return Super::Find(shm, MsgQIdToName(remote_id));
 }
 
 bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend)
 {
-	Queue *remote = FindRemote(shm, remote_id);
+	Queue *remote = Find(shm, remote_id);
 	if (remote) {
 		if (onsend) {
 			return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 4f544c8..93d77df 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -110,22 +110,6 @@
 	    Super(len, alloc) {}
 	using Super::capacity;
 	using Super::size;
-	template <class Iter, class OnWrite>
-	int Write(Iter begin, Iter end, const int timeout_ms, const OnWrite &onWrite)
-	{
-		auto endtime = MSFromNow(timeout_ms);
-		auto timedWritePred = [this, endtime](Guard &lock) {
-			return (cond_write_.timed_wait(lock, endtime, [&]() { return !this->full(); }));
-		};
-		return WriteAllOnCond(begin, end, timedWritePred, onWrite);
-	}
-
-	template <class OnWrite>
-	bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite) { return Write(&buf, (&buf) + 1, timeout_ms, onWrite); }
-	bool Write(const D &buf, const int timeout_ms)
-	{
-		return Write(buf, timeout_ms, [](const D &buf) {});
-	}
 
 	template <class Iter, class OnWrite>
 	int TryWrite(Iter begin, Iter end, const OnWrite &onWrite)
@@ -174,13 +158,13 @@
 	bool TryRecv(MsgI &msg) { return data()->TryRead(msg); }
 	template <class OnData>
 	int TryRecvAll(OnData const &onData) { return data()->TryReadAll(onData); }
-	static Queue *FindRemote(SharedMemory &shm, const MQId &remote_id);
-	static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend = OnSend());
+	static Queue *Find(SharedMemory &shm, const MQId &remote_id);
+	// static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend = OnSend());
 	static bool TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend = OnSend());
 	template <class Iter>
 	static int TrySendAll(SharedMemory &shm, const MQId &remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend())
 	{
-		Queue *remote = FindRemote(shm, remote_id);
+		Queue *remote = Find(shm, remote_id);
 		if (remote) {
 			if (onsend) {
 				return remote->TryWrite(begin, end, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
@@ -193,8 +177,8 @@
 		}
 	}
 
-	template <class... Rest>
-	bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); }
+	// template <class... Rest>
+	// bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); }
 	template <class... Rest>
 	bool TrySend(const MQId &remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); }
 	template <class... Rest>

--
Gitblit v1.8.0