/* * ===================================================================================== * * Filename: sendq.h * * Description: * * Version: 1.0 * Created: 2021年04月14日 09时22分59秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), lichao@aiotlink.com * Organization: * * ===================================================================================== */ #ifndef SENDQ_IWKMSK7M #define SENDQ_IWKMSK7M #include "defs.h" #include "msg.h" #include "timed_queue.h" #include #include #include #include #include #include class ShmMsgQueue; class SendQ { public: typedef MQId Remote; typedef bhome_msg::MsgI MsgI; typedef std::string Content; typedef int64_t Data; typedef std::function OnMsgEvent; struct MsgInfo { MQInfo mq_; Data data_; OnMsgEvent on_expire_; }; typedef TimedData TimedMsg; typedef TimedMsg::TimePoint TimePoint; typedef TimedMsg::Duration Duration; bool Append(const MQInfo &mq, MsgI msg) { msg.AddRef(); auto onMsgExpire = [](const Data &d) { MsgI(d).Release(); }; try { AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); return true; } catch (...) { msg.Release(); return false; } } bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire) { msg.AddRef(); auto onMsgExpire = [onExpire](const Data &d) { onExpire(d); MsgI(d).Release(); }; try { AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); return true; } catch (...) { msg.Release(); return false; } } bool Append(const MQInfo &mq, const Data command, OnMsgEvent onExpire = OnMsgEvent()) { try { AppendData(mq, command, DefaultExpire(), onExpire); return true; } catch (...) { return false; } } bool TrySend(ShmMsgQueue &mq); private: static TimePoint Now() { return TimedMsg::Clock::now(); } static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(NodeTimeoutSec()); } void AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire); typedef std::deque Array; typedef std::list ArrayList; typedef std::unordered_map Store; int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr); int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr); bool TooFast(); std::mutex mutex_in_; std::mutex mutex_out_; Store in_; Store out_; int64_t count_ = 0; int64_t last_time_ = 0; }; #endif // end of include guard: SENDQ_IWKMSK7M