From 1ff714838c03cba1a18884d5b48a20ee6c4275ac Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 21 五月 2021 15:00:53 +0800 Subject: [PATCH] class MsgI, ShmMsgQueue, no bind to shm. --- src/sendq.h | 48 ++++++++++++++++++++++++++++++++++++++++-------- 1 files changed, 40 insertions(+), 8 deletions(-) diff --git a/src/sendq.h b/src/sendq.h index 759e12a..ec63b05 100644 --- a/src/sendq.h +++ b/src/sendq.h @@ -46,11 +46,13 @@ typedef TimedData<MsgInfo> TimedMsg; typedef TimedMsg::TimePoint TimePoint; typedef TimedMsg::Duration Duration; + SendQ(SharedMemory &shm) : + shm_(shm) {} bool Append(const MQInfo &mq, MsgI msg) { msg.AddRef(); - auto onMsgExpire = [](const Data &d) { MsgI(d).Release(); }; + auto onMsgExpire = [msg](const Data &d) mutable { msg.Release(); }; try { AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); return true; @@ -63,9 +65,9 @@ bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire) { msg.AddRef(); - auto onMsgExpire = [onExpire](const Data &d) { + auto onMsgExpire = [onExpire, msg](const Data &d) mutable { onExpire(d); - MsgI(d).Release(); + msg.Release(); }; try { AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); @@ -85,7 +87,7 @@ return false; } } - bool TrySend(ShmMsgQueue &mq); + bool TrySend(); private: static TimePoint Now() { return TimedMsg::Clock::now(); } @@ -96,18 +98,48 @@ typedef std::list<Array> ArrayList; typedef std::unordered_map<Remote, ArrayList> Store; - int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr); - int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr); + int DoSend1Remote(const Remote remote, Array &arr); + int DoSend1Remote(const Remote remote, ArrayList &arr); bool TooFast(); + SharedMemory &shm_; std::mutex mutex_in_; std::mutex mutex_out_; Store in_; Store out_; - int64_t count_ = 0; - int64_t last_time_ = 0; + struct Counter { + std::atomic<int64_t> count_; + std::atomic<int64_t> count_1sec_; + std::atomic<int64_t> last_time_; + Counter() : + count_(0), count_1sec_(0), last_time_(0) {} + void Count1() + { + CheckTime(); + ++count_1sec_; + ++count_; + } + void Count(int n) + { + CheckTime(); + count_1sec_ += n; + count_ += n; + } + void CheckTime() + { + auto cur = NowSec(); + if (cur > last_time_) { + count_1sec_ = 0; + last_time_ = cur; + } + } + int64_t GetCount() const { return count_.load(); } + int64_t LastSec() const { return count_1sec_.load(); } + }; + Counter count_in_; + Counter count_out_; }; #endif // end of include guard: SENDQ_IWKMSK7M -- Gitblit v1.8.0