From c6964d5af25d4ec7ed9dbe7674dc4e3896b36ead Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 16 四月 2021 16:10:02 +0800
Subject: [PATCH] node remove mq if never registered; refactor.

---
 src/socket.h |   39 +++++++++++++++++++++------------------
 1 files changed, 21 insertions(+), 18 deletions(-)

diff --git a/src/socket.h b/src/socket.h
index 9b47e42..dbb161c 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -36,9 +36,11 @@
 
 class ShmSocket : private boost::noncopyable
 {
-	bool SendImpl(const void *valid_remote, const MsgI &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent())
+	bool SendImpl(const void *valid_remote, MsgI const &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent())
 	{
-		send_buffer_->Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire);
+		// if (!mq().TrySend(*(MQId *) valid_remote, imsg)) {
+		send_buffer_.Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire);
+		// }
 		return true;
 	}
 
@@ -55,6 +57,7 @@
 	ShmSocket(Shm &shm, const int len = 12);
 	~ShmSocket();
 	static bool Remove(SharedMemory &shm, const MQId &id) { return Queue::Remove(shm, id); }
+	bool Remove() { return Remove(shm(), id()); }
 	const MQId &id() const { return mq().Id(); }
 	// start recv.
 	bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB());
@@ -64,25 +67,24 @@
 	size_t Pending() const { return mq().Pending(); }
 
 	template <class Body>
-	bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body)
+	bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb = RecvCB())
 	{
-		MsgI msg;
-		return msg.Make(shm(), head, body) && SendImpl(valid_remote, msg);
-	}
-
-	template <class Body>
-	bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb)
-	{
-		//TODO send_buffer_ need flag, and remove callback on expire.
 		MsgI msg;
 		if (msg.Make(shm(), head, body)) {
+			DEFER1(if (msg.IsCounted()) { msg.Release(shm()); });
 			std::string msg_id(head.msg_id());
-			per_msg_cbs_->Add(msg_id, cb);
-			auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) {
-				RecvCB cb_no_use;
-				per_msg_cbs_->Find(msg_id, cb_no_use);
-			};
-			return SendImpl(valid_remote, msg, onExpireRemoveCB);
+			if (!cb) {
+				return SendImpl(valid_remote, msg);
+			} else {
+				per_msg_cbs_->Add(msg_id, cb);
+				auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) {
+					RecvCB cb_no_use;
+					per_msg_cbs_->Find(msg_id, cb_no_use);
+				};
+				return SendImpl(valid_remote, msg, onExpireRemoveCB);
+			}
+		} else {
+			SetLastError(ENOMEM, "Out of mem");
 		}
 		return false;
 	}
@@ -170,7 +172,8 @@
 	};
 
 	Synced<AsyncCBs> per_msg_cbs_;
-	Synced<SendQ> send_buffer_;
+	SendQ send_buffer_;
+	// Synced<SendQ> send_buffer_;
 };
 
 #endif // end of include guard: SOCKET_GWTJHBPO

--
Gitblit v1.8.0