| | |
| | | #include "defs.h" |
| | | #include "msg.h" |
| | | #include "timed_queue.h" |
| | | #include <boost/variant2/variant.hpp> |
| | | #include <deque> |
| | | #include <functional> |
| | | #include <list> |
| | |
| | | public: |
| | | typedef std::string Remote; |
| | | typedef bhome_msg::MsgI MsgI; |
| | | typedef std::function<void(const MsgI &msg)> OnMsgEvent; |
| | | typedef std::string Content; |
| | | typedef boost::variant2::variant<MsgI, Content> Data; |
| | | typedef std::function<void(const Data &)> OnMsgEvent; |
| | | struct MsgInfo { |
| | | MsgI msg_; |
| | | Data data_; |
| | | OnMsgEvent on_expire_; |
| | | // OnMsgEvent on_send_; |
| | | }; |
| | | typedef TimedData<MsgInfo> TimedMsg; |
| | | typedef TimedMsg::TimePoint TimePoint; |
| | | typedef TimedMsg::Duration Duration; |
| | | |
| | | void Append(const MQId &id, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent()) |
| | | template <class... Rest> |
| | | void Append(const MQId &id, Rest &&...rest) |
| | | { |
| | | Append(std::string((const char *) &id, sizeof(id)), msg, onExpire); |
| | | Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...); |
| | | } |
| | | |
| | | void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent()) |
| | | { |
| | | using namespace std::chrono_literals; |
| | | Append(addr, msg, Now() + 60s, onExpire); |
| | | msg.AddRef(); |
| | | AppendData(addr, Data(msg), DefaultExpire(), onExpire); |
| | | } |
| | | void Append(const Remote &addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent()) |
| | | { |
| | | AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire); |
| | | } |
| | | bool TrySend(bhome_shm::ShmMsgQueue &mq); |
| | | // bool empty() const { return store_.empty(); } |
| | | |
| | | private: |
| | | static TimePoint Now() { return TimedMsg::Clock::now(); } |
| | | void Append(const Remote &addr, const MsgI &msg, const TimePoint &expire, OnMsgEvent onExpire) |
| | | static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); } |
| | | void AppendData(const Remote &addr, Data &&data, const TimePoint &expire, OnMsgEvent onExpire) |
| | | { |
| | | //TODO simple queue, organize later ? |
| | | |
| | | msg.AddRef(); |
| | | TimedMsg tmp(expire, MsgInfo{msg, onExpire}); |
| | | TimedMsg tmp(expire, MsgInfo{std::move(data), std::move(onExpire)}); |
| | | std::unique_lock<std::mutex> lock(mutex_in_); |
| | | auto &al = in_[addr]; |
| | | if (!al.empty()) { |
| | |
| | | |
| | | int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr); |
| | | int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &arr); |
| | | |
| | | class MsgIter |
| | | { |
| | | Array::iterator iter_; |
| | | |
| | | public: |
| | | MsgIter(Array::iterator iter) : |
| | | iter_(iter) {} |
| | | MsgIter &operator++() { return ++iter_, *this; } |
| | | bool operator==(const MsgIter &a) { return iter_ == a.iter_; } |
| | | MsgI &operator*() { return iter_->data().msg_; } |
| | | }; |
| | | |
| | | std::mutex mutex_in_; |
| | | std::mutex mutex_out_; |