lichao
2021-05-17 cab831748a2a9cc18b7f18f3b5e14a4374b7ab68
src/sendq.h
@@ -39,6 +39,7 @@
   typedef int64_t Data;
   typedef std::function<void(const Data &)> OnMsgEvent;
   struct MsgInfo {
      MQInfo mq_;
      Data data_;
      OnMsgEvent on_expire_;
   };
@@ -46,45 +47,51 @@
   typedef TimedMsg::TimePoint TimePoint;
   typedef TimedMsg::Duration Duration;
   void Append(const Remote addr, const MsgI msg)
   bool Append(const MQInfo &mq, MsgI msg)
   {
      msg.AddRef();
      auto onMsgExpire = [](const Data &d) { MsgI(d).Release(); };
      AppendData(addr, msg.Offset(), DefaultExpire(), onMsgExpire);
      try {
         AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
         return true;
      } catch (...) {
         msg.Release();
         return false;
      }
   }
   void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire)
   bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire)
   {
      msg.AddRef();
      auto onMsgExpire = [onExpire](const Data &d) {
         onExpire(d);
         MsgI(d).Release();
      };
      AppendData(addr, msg.Offset(), DefaultExpire(), onMsgExpire);
      try {
         AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
         return true;
      } catch (...) {
         msg.Release();
         return false;
      }
   }
   void Append(const Remote addr, const Data command, OnMsgEvent onExpire = OnMsgEvent())
   bool Append(const MQInfo &mq, const Data command, OnMsgEvent onExpire = OnMsgEvent())
   {
      AppendData(addr, command, DefaultExpire(), onExpire);
      try {
         AppendData(mq, command, DefaultExpire(), onExpire);
         return true;
      } catch (...) {
         return false;
      }
   }
   bool TrySend(ShmMsgQueue &mq);
private:
   static TimePoint Now() { return TimedMsg::Clock::now(); }
   static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); }
   void AppendData(const Remote addr, const Data data, const TimePoint &expire, OnMsgEvent onExpire)
   {
      //TODO simple queue, organize later ?
   void AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire);
      TimedMsg tmp(expire, MsgInfo{data, std::move(onExpire)});
      std::unique_lock<std::mutex> lock(mutex_in_);
      auto &al = in_[addr];
      if (!al.empty()) {
         al.front().emplace_back(std::move(tmp));
      } else {
         al.insert(al.begin(), Array())->emplace_back(std::move(tmp));
      }
   }
   typedef std::deque<TimedMsg> Array;
   typedef std::list<Array> ArrayList;
   typedef std::unordered_map<Remote, ArrayList> Store;
@@ -92,10 +99,15 @@
   int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr);
   int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr);
   bool TooFast();
   std::mutex mutex_in_;
   std::mutex mutex_out_;
   Store in_;
   Store out_;
   int64_t count_ = 0;
   int64_t last_time_ = 0;
};
#endif // end of include guard: SENDQ_IWKMSK7M