/* * ===================================================================================== * * Filename: sendq.h * * Description: * * Version: 1.0 * Created: 2021年04月14日 09时22分59秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), lichao@aiotlink.com * Organization: * * ===================================================================================== */ #ifndef SENDQ_IWKMSK7M #define SENDQ_IWKMSK7M #include "defs.h" #include "msg.h" #include "timed_queue.h" #include #include #include #include #include #include class ShmMsgQueue; class SendQ { public: typedef MQId Remote; typedef bhome_msg::MsgI MsgI; typedef std::string Content; typedef int64_t Data; typedef std::function OnMsgEvent; struct MsgInfo { MQInfo mq_; Data data_; OnMsgEvent on_expire_; }; typedef TimedData TimedMsg; typedef TimedMsg::TimePoint TimePoint; typedef TimedMsg::Duration Duration; SendQ(SharedMemory &shm) : shm_(shm) {} bool Append(const MQInfo &mq, MsgI msg) { msg.AddRef(); auto onMsgExpire = [msg](const Data &d) mutable { msg.Release(); }; try { AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); return true; } catch (...) { msg.Release(); return false; } } bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire) { msg.AddRef(); auto onMsgExpire = [onExpire, msg](const Data &d) mutable { onExpire(d); msg.Release(); }; try { AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); return true; } catch (...) { msg.Release(); return false; } } bool Append(const MQInfo &mq, const Data command, OnMsgEvent onExpire = OnMsgEvent()) { try { AppendData(mq, command, DefaultExpire(), onExpire); return true; } catch (...) { return false; } } bool TrySend(); private: static TimePoint Now() { return TimedMsg::Clock::now(); } static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(NodeTimeoutSec()); } void AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire); typedef std::deque Array; typedef std::list ArrayList; typedef std::unordered_map Store; int DoSend1Remote(const Remote remote, Array &arr); int DoSend1Remote(const Remote remote, ArrayList &arr); bool TooFast(); SharedMemory &shm_; std::mutex mutex_in_; std::mutex mutex_out_; Store in_; Store out_; struct Counter { std::atomic count_; std::atomic count_1sec_; std::atomic last_time_; Counter() : count_(0), count_1sec_(0), last_time_(0) {} void Count1() { CheckTime(); ++count_1sec_; ++count_; } void Count(int n) { CheckTime(); count_1sec_ += n; count_ += n; } void CheckTime() { auto cur = NowSec(); if (cur > last_time_) { count_1sec_ = 0; last_time_ = cur; } } int64_t GetCount() const { return count_.load(); } int64_t LastSec() const { return count_1sec_.load(); } }; Counter count_in_; Counter count_out_; }; #endif // end of include guard: SENDQ_IWKMSK7M