From cab831748a2a9cc18b7f18f3b5e14a4374b7ab68 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期一, 17 五月 2021 18:34:26 +0800 Subject: [PATCH] socket send using abs addr, avoid shm find by id. --- src/sendq.h | 48 ++++++++++++++++++++++++++++++------------------ 1 files changed, 30 insertions(+), 18 deletions(-) diff --git a/src/sendq.h b/src/sendq.h index 9e2b5ca..d1ba30a 100644 --- a/src/sendq.h +++ b/src/sendq.h @@ -39,6 +39,7 @@ typedef int64_t Data; typedef std::function<void(const Data &)> OnMsgEvent; struct MsgInfo { + MQInfo mq_; Data data_; OnMsgEvent on_expire_; }; @@ -46,45 +47,51 @@ typedef TimedMsg::TimePoint TimePoint; typedef TimedMsg::Duration Duration; - void Append(const Remote addr, const MsgI msg) + bool Append(const MQInfo &mq, MsgI msg) { msg.AddRef(); auto onMsgExpire = [](const Data &d) { MsgI(d).Release(); }; - AppendData(addr, msg.Offset(), DefaultExpire(), onMsgExpire); + try { + AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); + return true; + } catch (...) { + msg.Release(); + return false; + } } - void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire) + bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire) { msg.AddRef(); auto onMsgExpire = [onExpire](const Data &d) { onExpire(d); MsgI(d).Release(); }; - AppendData(addr, msg.Offset(), DefaultExpire(), onMsgExpire); + try { + AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); + return true; + } catch (...) { + msg.Release(); + return false; + } } - void Append(const Remote addr, const Data command, OnMsgEvent onExpire = OnMsgEvent()) + bool Append(const MQInfo &mq, const Data command, OnMsgEvent onExpire = OnMsgEvent()) { - AppendData(addr, command, DefaultExpire(), onExpire); + try { + AppendData(mq, command, DefaultExpire(), onExpire); + return true; + } catch (...) { + return false; + } } bool TrySend(ShmMsgQueue &mq); private: static TimePoint Now() { return TimedMsg::Clock::now(); } static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); } - void AppendData(const Remote addr, const Data data, const TimePoint &expire, OnMsgEvent onExpire) - { - //TODO simple queue, organize later ? + void AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire); - TimedMsg tmp(expire, MsgInfo{data, std::move(onExpire)}); - std::unique_lock<std::mutex> lock(mutex_in_); - auto &al = in_[addr]; - if (!al.empty()) { - al.front().emplace_back(std::move(tmp)); - } else { - al.insert(al.begin(), Array())->emplace_back(std::move(tmp)); - } - } typedef std::deque<TimedMsg> Array; typedef std::list<Array> ArrayList; typedef std::unordered_map<Remote, ArrayList> Store; @@ -92,10 +99,15 @@ int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr); int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr); + bool TooFast(); + std::mutex mutex_in_; std::mutex mutex_out_; Store in_; Store out_; + + int64_t count_ = 0; + int64_t last_time_ = 0; }; #endif // end of include guard: SENDQ_IWKMSK7M -- Gitblit v1.8.0