From c64c54d8e75b9354dc49a7b6b2d326e7dd59eb37 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 15 四月 2021 19:32:16 +0800
Subject: [PATCH] add api; fix send, socknode mem leak.

---
 src/socket.h |   18 ++++++++++++++----
 1 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/src/socket.h b/src/socket.h
index 9b47e42..96af6e7 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -36,9 +36,11 @@
 
 class ShmSocket : private boost::noncopyable
 {
-	bool SendImpl(const void *valid_remote, const MsgI &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent())
+	bool SendImpl(const void *valid_remote, MsgI const &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent())
 	{
-		send_buffer_->Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire);
+		// if (!mq().TrySend(*(MQId *) valid_remote, imsg)) {
+		send_buffer_.Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire);
+		// }
 		return true;
 	}
 
@@ -67,7 +69,11 @@
 	bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body)
 	{
 		MsgI msg;
-		return msg.Make(shm(), head, body) && SendImpl(valid_remote, msg);
+		if (msg.Make(shm(), head, body)) {
+			DEFER1(if (msg.IsCounted()) { msg.Release(shm()); });
+			return SendImpl(valid_remote, msg);
+		}
+		return false;
 	}
 
 	template <class Body>
@@ -76,6 +82,7 @@
 		//TODO send_buffer_ need flag, and remove callback on expire.
 		MsgI msg;
 		if (msg.Make(shm(), head, body)) {
+			DEFER1(if (msg.IsCounted()) { msg.Release(shm()); });
 			std::string msg_id(head.msg_id());
 			per_msg_cbs_->Add(msg_id, cb);
 			auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) {
@@ -83,6 +90,8 @@
 				per_msg_cbs_->Find(msg_id, cb_no_use);
 			};
 			return SendImpl(valid_remote, msg, onExpireRemoveCB);
+		} else {
+			printf("out of mem?, avail: %ld\n", shm().get_free_memory());
 		}
 		return false;
 	}
@@ -170,7 +179,8 @@
 	};
 
 	Synced<AsyncCBs> per_msg_cbs_;
-	Synced<SendQ> send_buffer_;
+	SendQ send_buffer_;
+	// Synced<SendQ> send_buffer_;
 };
 
 #endif // end of include guard: SOCKET_GWTJHBPO

--
Gitblit v1.8.0