From 330f78f3334bcdcdb4cc2ab2dbf66604e0224d71 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 21 五月 2021 16:21:45 +0800
Subject: [PATCH] Merge branch 'master' of http://192.168.5.5:10010/r/valib/bhshmq
---
src/sendq.h | 50 +++++++++++++++++++++++++++++++++++++++++---------
1 files changed, 41 insertions(+), 9 deletions(-)
diff --git a/src/sendq.h b/src/sendq.h
index d1ba30a..ec63b05 100644
--- a/src/sendq.h
+++ b/src/sendq.h
@@ -46,11 +46,13 @@
typedef TimedData<MsgInfo> TimedMsg;
typedef TimedMsg::TimePoint TimePoint;
typedef TimedMsg::Duration Duration;
+ SendQ(SharedMemory &shm) :
+ shm_(shm) {}
bool Append(const MQInfo &mq, MsgI msg)
{
msg.AddRef();
- auto onMsgExpire = [](const Data &d) { MsgI(d).Release(); };
+ auto onMsgExpire = [msg](const Data &d) mutable { msg.Release(); };
try {
AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
return true;
@@ -63,9 +65,9 @@
bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire)
{
msg.AddRef();
- auto onMsgExpire = [onExpire](const Data &d) {
+ auto onMsgExpire = [onExpire, msg](const Data &d) mutable {
onExpire(d);
- MsgI(d).Release();
+ msg.Release();
};
try {
AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
@@ -85,29 +87,59 @@
return false;
}
}
- bool TrySend(ShmMsgQueue &mq);
+ bool TrySend();
private:
static TimePoint Now() { return TimedMsg::Clock::now(); }
- static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); }
+ static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(NodeTimeoutSec()); }
void AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire);
typedef std::deque<TimedMsg> Array;
typedef std::list<Array> ArrayList;
typedef std::unordered_map<Remote, ArrayList> Store;
- int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr);
- int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr);
+ int DoSend1Remote(const Remote remote, Array &arr);
+ int DoSend1Remote(const Remote remote, ArrayList &arr);
bool TooFast();
+ SharedMemory &shm_;
std::mutex mutex_in_;
std::mutex mutex_out_;
Store in_;
Store out_;
- int64_t count_ = 0;
- int64_t last_time_ = 0;
+ struct Counter {
+ std::atomic<int64_t> count_;
+ std::atomic<int64_t> count_1sec_;
+ std::atomic<int64_t> last_time_;
+ Counter() :
+ count_(0), count_1sec_(0), last_time_(0) {}
+ void Count1()
+ {
+ CheckTime();
+ ++count_1sec_;
+ ++count_;
+ }
+ void Count(int n)
+ {
+ CheckTime();
+ count_1sec_ += n;
+ count_ += n;
+ }
+ void CheckTime()
+ {
+ auto cur = NowSec();
+ if (cur > last_time_) {
+ count_1sec_ = 0;
+ last_time_ = cur;
+ }
+ }
+ int64_t GetCount() const { return count_.load(); }
+ int64_t LastSec() const { return count_1sec_.load(); }
+ };
+ Counter count_in_;
+ Counter count_out_;
};
#endif // end of include guard: SENDQ_IWKMSK7M
--
Gitblit v1.8.0