From 1ff714838c03cba1a18884d5b48a20ee6c4275ac Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 21 五月 2021 15:00:53 +0800 Subject: [PATCH] class MsgI, ShmMsgQueue, no bind to shm. --- src/sendq.cpp | 28 +++++++++------------------- 1 files changed, 9 insertions(+), 19 deletions(-) diff --git a/src/sendq.cpp b/src/sendq.cpp index f1e5918..2a772b0 100644 --- a/src/sendq.cpp +++ b/src/sendq.cpp @@ -33,13 +33,14 @@ } else { al.insert(al.begin(), Array())->emplace_back(std::move(tmp)); } + count_in_.Count1(); } catch (std::exception &e) { LOG_ERROR() << "sendq error: " << e.what(); throw e; } } -int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr) +int SendQ::DoSend1Remote(const Remote remote, Array &arr) { auto FirstNotExpired = [](Array &l) { auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; }; @@ -53,8 +54,8 @@ info.on_expire_(info.data_); } } - - while (pos != arr.end() && mq.TrySend(pos->data().mq_, pos->data().data_)) { + auto TrySend1 = [this](MsgInfo const &info) { return ShmMsgQueue::TrySend(shm_, info.mq_, info.data_); }; + while (pos != arr.end() && TrySend1(pos->data())) { ++pos; } @@ -63,27 +64,26 @@ return nprocessed; } -int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &al) +int SendQ::DoSend1Remote(const Remote remote, ArrayList &al) { int nsend = 0; auto AllSent = [&](Array &arr) { - nsend += DoSend1Remote(mq, remote, arr); + nsend += DoSend1Remote(remote, arr); return arr.empty(); }; for (auto it = al.begin(); it != al.end() && AllSent(*it); it = al.erase(it)) {} return nsend; } -bool SendQ::TrySend(ShmMsgQueue &mq) +bool SendQ::TrySend() { std::unique_lock<std::mutex> lock(mutex_out_); - // if (TooFast()) { return false; } size_t nsend = 0; if (!out_.empty()) { auto rec = out_.begin(); do { - nsend += DoSend1Remote(mq, rec->first, rec->second); + nsend += DoSend1Remote(rec->first, rec->second); if (rec->second.empty()) { rec = out_.erase(rec); } else { @@ -91,6 +91,7 @@ } } while (rec != out_.end()); } + count_out_.Count(nsend); auto Collect = [&]() { std::unique_lock<std::mutex> lock(mutex_in_); @@ -109,14 +110,3 @@ return !out_.empty(); } - -bool SendQ::TooFast() -{ - auto cur = NowSec(); - if (cur > last_time_) { - last_time_ = cur; - count_ = 0; - } - - return ++count_ > 1000 * 100; -} // not accurate in multi-thread. \ No newline at end of file -- Gitblit v1.8.0