lichao
2021-04-16 708ff9e8af731e2799767ed8bfca7df3b74fc26a
src/sendq.h
@@ -21,6 +21,7 @@
#include "defs.h"
#include "msg.h"
#include "timed_queue.h"
#include <boost/variant2/variant.hpp>
#include <deque>
#include <functional>
#include <list>
@@ -38,36 +39,43 @@
public:
   typedef std::string Remote;
   typedef bhome_msg::MsgI MsgI;
   typedef std::function<void(const MsgI &msg)> OnMsgEvent;
   typedef std::string Content;
   typedef boost::variant2::variant<MsgI, Content> Data;
   typedef std::function<void(const Data &)> OnMsgEvent;
   struct MsgInfo {
      MsgI msg_;
      Data data_;
      OnMsgEvent on_expire_;
      // OnMsgEvent on_send_;
   };
   typedef TimedData<MsgInfo> TimedMsg;
   typedef TimedMsg::TimePoint TimePoint;
   typedef TimedMsg::Duration Duration;
   void Append(const MQId &id, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent())
   template <class... Rest>
   void Append(const MQId &id, Rest &&...rest)
   {
      Append(std::string((const char *) &id, sizeof(id)), msg, onExpire);
      Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...);
   }
   void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent())
   {
      using namespace std::chrono_literals;
      Append(addr, msg, Now() + 60s, onExpire);
      msg.AddRef();
      AppendData(addr, Data(msg), DefaultExpire(), onExpire);
   }
   void Append(const Remote &addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent())
   {
      AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire);
   }
   bool TrySend(bhome_shm::ShmMsgQueue &mq);
   // bool empty() const { return store_.empty(); }
private:
   static TimePoint Now() { return TimedMsg::Clock::now(); }
   void Append(const Remote &addr, const MsgI &msg, const TimePoint &expire, OnMsgEvent onExpire)
   static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); }
   void AppendData(const Remote &addr, Data &&data, const TimePoint &expire, OnMsgEvent onExpire)
   {
      //TODO simple queue, organize later ?
      msg.AddRef();
      TimedMsg tmp(expire, MsgInfo{msg, onExpire});
      TimedMsg tmp(expire, MsgInfo{std::move(data), std::move(onExpire)});
      std::unique_lock<std::mutex> lock(mutex_in_);
      auto &al = in_[addr];
      if (!al.empty()) {
@@ -82,18 +90,6 @@
   int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr);
   int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &arr);
   class MsgIter
   {
      Array::iterator iter_;
   public:
      MsgIter(Array::iterator iter) :
          iter_(iter) {}
      MsgIter &operator++() { return ++iter_, *this; }
      bool operator==(const MsgIter &a) { return iter_ == a.iter_; }
      MsgI &operator*() { return iter_->data().msg_; }
   };
   std::mutex mutex_in_;
   std::mutex mutex_out_;