lichao
2021-05-14 34bc326eab06b9b1da2004a9e0d2182d63501d68
src/sendq.h
@@ -21,7 +21,6 @@
#include "defs.h"
#include "msg.h"
#include "timed_queue.h"
#include <boost/variant2/variant.hpp>
#include <deque>
#include <functional>
#include <list>
@@ -37,8 +36,7 @@
   typedef MQId Remote;
   typedef bhome_msg::MsgI MsgI;
   typedef std::string Content;
   typedef int64_t Command;
   typedef boost::variant2::variant<MsgI, Command> Data;
   typedef int64_t Data;
   typedef std::function<void(const Data &)> OnMsgEvent;
   struct MsgInfo {
      Data data_;
@@ -48,25 +46,37 @@
   typedef TimedMsg::TimePoint TimePoint;
   typedef TimedMsg::Duration Duration;
   void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire = OnMsgEvent())
   void Append(const Remote addr, const MsgI msg)
   {
      msg.AddRef();
      AppendData(addr, Data(msg), DefaultExpire(), onExpire);
      auto onMsgExpire = [](const Data &d) { MsgI(d).Release(); };
      AppendData(addr, msg.Offset(), DefaultExpire(), onMsgExpire);
   }
   void Append(const Remote addr, const Command command, OnMsgEvent onExpire = OnMsgEvent())
   void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire)
   {
      AppendData(addr, Data(command), DefaultExpire(), onExpire);
      msg.AddRef();
      auto onMsgExpire = [onExpire](const Data &d) {
         onExpire(d);
         MsgI(d).Release();
      };
      AppendData(addr, msg.Offset(), DefaultExpire(), onMsgExpire);
   }
   void Append(const Remote addr, const Data command, OnMsgEvent onExpire = OnMsgEvent())
   {
      AppendData(addr, command, DefaultExpire(), onExpire);
   }
   bool TrySend(ShmMsgQueue &mq);
private:
   static TimePoint Now() { return TimedMsg::Clock::now(); }
   static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); }
   void AppendData(const Remote addr, Data &&data, const TimePoint &expire, OnMsgEvent onExpire)
   void AppendData(const Remote addr, const Data data, const TimePoint &expire, OnMsgEvent onExpire)
   {
      //TODO simple queue, organize later ?
      TimedMsg tmp(expire, MsgInfo{std::move(data), std::move(onExpire)});
      TimedMsg tmp(expire, MsgInfo{data, std::move(onExpire)});
      std::unique_lock<std::mutex> lock(mutex_in_);
      auto &al = in_[addr];
      if (!al.empty()) {