| | |
| | | typedef int64_t Data; |
| | | typedef std::function<void(const Data &)> OnMsgEvent; |
| | | struct MsgInfo { |
| | | MQInfo mq_; |
| | | Data data_; |
| | | OnMsgEvent on_expire_; |
| | | }; |
| | |
| | | typedef TimedMsg::TimePoint TimePoint; |
| | | typedef TimedMsg::Duration Duration; |
| | | |
| | | void Append(const Remote addr, const MsgI msg) |
| | | bool Append(const MQInfo &mq, MsgI msg) |
| | | { |
| | | msg.AddRef(); |
| | | auto onMsgExpire = [](const Data &d) { MsgI(d).Release(); }; |
| | | AppendData(addr, msg.Offset(), DefaultExpire(), onMsgExpire); |
| | | try { |
| | | AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); |
| | | return true; |
| | | } catch (...) { |
| | | msg.Release(); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire) |
| | | bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire) |
| | | { |
| | | msg.AddRef(); |
| | | auto onMsgExpire = [onExpire](const Data &d) { |
| | | onExpire(d); |
| | | MsgI(d).Release(); |
| | | }; |
| | | AppendData(addr, msg.Offset(), DefaultExpire(), onMsgExpire); |
| | | try { |
| | | AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); |
| | | return true; |
| | | } catch (...) { |
| | | msg.Release(); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | void Append(const Remote addr, const Data command, OnMsgEvent onExpire = OnMsgEvent()) |
| | | bool Append(const MQInfo &mq, const Data command, OnMsgEvent onExpire = OnMsgEvent()) |
| | | { |
| | | AppendData(addr, command, DefaultExpire(), onExpire); |
| | | try { |
| | | AppendData(mq, command, DefaultExpire(), onExpire); |
| | | return true; |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | | } |
| | | bool TrySend(ShmMsgQueue &mq); |
| | | |
| | | private: |
| | | static TimePoint Now() { return TimedMsg::Clock::now(); } |
| | | static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); } |
| | | void AppendData(const Remote addr, const Data data, const TimePoint &expire, OnMsgEvent onExpire) |
| | | { |
| | | //TODO simple queue, organize later ? |
| | | void AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire); |
| | | |
| | | TimedMsg tmp(expire, MsgInfo{data, std::move(onExpire)}); |
| | | std::unique_lock<std::mutex> lock(mutex_in_); |
| | | auto &al = in_[addr]; |
| | | if (!al.empty()) { |
| | | al.front().emplace_back(std::move(tmp)); |
| | | } else { |
| | | al.insert(al.begin(), Array())->emplace_back(std::move(tmp)); |
| | | } |
| | | } |
| | | typedef std::deque<TimedMsg> Array; |
| | | typedef std::list<Array> ArrayList; |
| | | typedef std::unordered_map<Remote, ArrayList> Store; |
| | |
| | | int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr); |
| | | int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr); |
| | | |
| | | bool TooFast(); |
| | | |
| | | std::mutex mutex_in_; |
| | | std::mutex mutex_out_; |
| | | Store in_; |
| | | Store out_; |
| | | |
| | | int64_t count_ = 0; |
| | | int64_t last_time_ = 0; |
| | | }; |
| | | |
| | | #endif // end of include guard: SENDQ_IWKMSK7M |