From 58d904a328c0d849769b483e901a0be9426b8209 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期二, 20 七月 2021 20:20:44 +0800 Subject: [PATCH] 调整Request C.BHFree的位置 --- src/sendq.h | 121 +++++++++++++++++++++++++++------------- 1 files changed, 81 insertions(+), 40 deletions(-) diff --git a/src/sendq.h b/src/sendq.h index aa8923d..ec63b05 100644 --- a/src/sendq.h +++ b/src/sendq.h @@ -28,77 +28,118 @@ #include <string> #include <unordered_map> -namespace bhome_shm -{ class ShmMsgQueue; -} // namespace bhome_shm class SendQ { public: - typedef std::string Remote; + typedef MQId Remote; typedef bhome_msg::MsgI MsgI; - typedef std::function<void(const MsgI &msg)> OnMsgEvent; + typedef std::string Content; + typedef int64_t Data; + typedef std::function<void(const Data &)> OnMsgEvent; struct MsgInfo { - MsgI msg_; + MQInfo mq_; + Data data_; OnMsgEvent on_expire_; - // OnMsgEvent on_send_; }; typedef TimedData<MsgInfo> TimedMsg; typedef TimedMsg::TimePoint TimePoint; typedef TimedMsg::Duration Duration; + SendQ(SharedMemory &shm) : + shm_(shm) {} - void Append(const MQId &id, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent()) + bool Append(const MQInfo &mq, MsgI msg) { - Append(std::string((const char *) &id, sizeof(id)), msg, onExpire); + msg.AddRef(); + auto onMsgExpire = [msg](const Data &d) mutable { msg.Release(); }; + try { + AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); + return true; + } catch (...) { + msg.Release(); + return false; + } } - void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent()) + + bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire) { - using namespace std::chrono_literals; - Append(addr, msg, Now() + 3s, onExpire); + msg.AddRef(); + auto onMsgExpire = [onExpire, msg](const Data &d) mutable { + onExpire(d); + msg.Release(); + }; + try { + AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); + return true; + } catch (...) { + msg.Release(); + return false; + } } - bool TrySend(bhome_shm::ShmMsgQueue &mq); - // bool empty() const { return store_.empty(); } + + bool Append(const MQInfo &mq, const Data command, OnMsgEvent onExpire = OnMsgEvent()) + { + try { + AppendData(mq, command, DefaultExpire(), onExpire); + return true; + } catch (...) { + return false; + } + } + bool TrySend(); private: static TimePoint Now() { return TimedMsg::Clock::now(); } - void Append(const Remote &addr, const MsgI &msg, const TimePoint &expire, OnMsgEvent onExpire) - { - //TODO simple queue, organize later ? + static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(NodeTimeoutSec()); } + void AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire); - msg.AddRef(); - TimedMsg tmp(expire, MsgInfo{msg, onExpire}); - std::unique_lock<std::mutex> lock(mutex_in_); - auto &al = in_[addr]; - if (!al.empty()) { - al.front().emplace_back(std::move(tmp)); - } else { - al.insert(al.begin(), Array())->emplace_back(std::move(tmp)); - } - } typedef std::deque<TimedMsg> Array; typedef std::list<Array> ArrayList; typedef std::unordered_map<Remote, ArrayList> Store; - int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr); - int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &arr); + int DoSend1Remote(const Remote remote, Array &arr); + int DoSend1Remote(const Remote remote, ArrayList &arr); - class MsgIter - { - Array::iterator iter_; + bool TooFast(); - public: - MsgIter(Array::iterator iter) : - iter_(iter) {} - MsgIter &operator++() { return ++iter_, *this; } - bool operator==(const MsgIter &a) { return iter_ == a.iter_; } - MsgI &operator*() { return iter_->data().msg_; } - }; - + SharedMemory &shm_; std::mutex mutex_in_; std::mutex mutex_out_; Store in_; Store out_; + + struct Counter { + std::atomic<int64_t> count_; + std::atomic<int64_t> count_1sec_; + std::atomic<int64_t> last_time_; + Counter() : + count_(0), count_1sec_(0), last_time_(0) {} + void Count1() + { + CheckTime(); + ++count_1sec_; + ++count_; + } + void Count(int n) + { + CheckTime(); + count_1sec_ += n; + count_ += n; + } + void CheckTime() + { + auto cur = NowSec(); + if (cur > last_time_) { + count_1sec_ = 0; + last_time_ = cur; + } + } + int64_t GetCount() const { return count_.load(); } + int64_t LastSec() const { return count_1sec_.load(); } + }; + Counter count_in_; + Counter count_out_; }; #endif // end of include guard: SENDQ_IWKMSK7M -- Gitblit v1.8.0