From 1ff714838c03cba1a18884d5b48a20ee6c4275ac Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 21 五月 2021 15:00:53 +0800
Subject: [PATCH] class MsgI, ShmMsgQueue, no bind to shm.

---
 src/sendq.h |   48 ++++++++++++++++++++++++++++++++++++++++--------
 1 files changed, 40 insertions(+), 8 deletions(-)

diff --git a/src/sendq.h b/src/sendq.h
index 759e12a..ec63b05 100644
--- a/src/sendq.h
+++ b/src/sendq.h
@@ -46,11 +46,13 @@
 	typedef TimedData<MsgInfo> TimedMsg;
 	typedef TimedMsg::TimePoint TimePoint;
 	typedef TimedMsg::Duration Duration;
+	SendQ(SharedMemory &shm) :
+	    shm_(shm) {}
 
 	bool Append(const MQInfo &mq, MsgI msg)
 	{
 		msg.AddRef();
-		auto onMsgExpire = [](const Data &d) { MsgI(d).Release(); };
+		auto onMsgExpire = [msg](const Data &d) mutable { msg.Release(); };
 		try {
 			AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
 			return true;
@@ -63,9 +65,9 @@
 	bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire)
 	{
 		msg.AddRef();
-		auto onMsgExpire = [onExpire](const Data &d) {
+		auto onMsgExpire = [onExpire, msg](const Data &d) mutable {
 			onExpire(d);
-			MsgI(d).Release();
+			msg.Release();
 		};
 		try {
 			AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
@@ -85,7 +87,7 @@
 			return false;
 		}
 	}
-	bool TrySend(ShmMsgQueue &mq);
+	bool TrySend();
 
 private:
 	static TimePoint Now() { return TimedMsg::Clock::now(); }
@@ -96,18 +98,48 @@
 	typedef std::list<Array> ArrayList;
 	typedef std::unordered_map<Remote, ArrayList> Store;
 
-	int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr);
-	int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr);
+	int DoSend1Remote(const Remote remote, Array &arr);
+	int DoSend1Remote(const Remote remote, ArrayList &arr);
 
 	bool TooFast();
 
+	SharedMemory &shm_;
 	std::mutex mutex_in_;
 	std::mutex mutex_out_;
 	Store in_;
 	Store out_;
 
-	int64_t count_ = 0;
-	int64_t last_time_ = 0;
+	struct Counter {
+		std::atomic<int64_t> count_;
+		std::atomic<int64_t> count_1sec_;
+		std::atomic<int64_t> last_time_;
+		Counter() :
+		    count_(0), count_1sec_(0), last_time_(0) {}
+		void Count1()
+		{
+			CheckTime();
+			++count_1sec_;
+			++count_;
+		}
+		void Count(int n)
+		{
+			CheckTime();
+			count_1sec_ += n;
+			count_ += n;
+		}
+		void CheckTime()
+		{
+			auto cur = NowSec();
+			if (cur > last_time_) {
+				count_1sec_ = 0;
+				last_time_ = cur;
+			}
+		}
+		int64_t GetCount() const { return count_.load(); }
+		int64_t LastSec() const { return count_1sec_.load(); }
+	};
+	Counter count_in_;
+	Counter count_out_;
 };
 
 #endif // end of include guard: SENDQ_IWKMSK7M

--
Gitblit v1.8.0