liuxiaolong
2021-07-20 58d904a328c0d849769b483e901a0be9426b8209
src/sendq.h
@@ -23,69 +23,123 @@
#include "timed_queue.h"
#include <deque>
#include <functional>
#include <list>
#include <mutex>
#include <string>
#include <unordered_map>
namespace bhome_shm
{
class ShmMsgQueue;
} // namespace bhome_shm
class SendQ
{
public:
   typedef std::string Remote;
   typedef MQId Remote;
   typedef bhome_msg::MsgI MsgI;
   typedef std::function<void(const MsgI &msg)> OnMsgEvent;
   typedef std::string Content;
   typedef int64_t Data;
   typedef std::function<void(const Data &)> OnMsgEvent;
   struct MsgInfo {
      MsgI msg_;
      MQInfo mq_;
      Data data_;
      OnMsgEvent on_expire_;
      // OnMsgEvent on_send_;
   };
   typedef TimedData<MsgInfo> TimedMsg;
   typedef TimedMsg::TimePoint TimePoint;
   typedef TimedMsg::Duration Duration;
   SendQ(SharedMemory &shm) :
       shm_(shm) {}
   void Append(const MQId &id, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent())
   bool Append(const MQInfo &mq, MsgI msg)
   {
      Append(std::string((const char *) &id, sizeof(id)), msg, onExpire);
      msg.AddRef();
      auto onMsgExpire = [msg](const Data &d) mutable { msg.Release(); };
      try {
         AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
         return true;
      } catch (...) {
         msg.Release();
         return false;
      }
   }
   void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent())
   bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire)
   {
      using namespace std::chrono_literals;
      Append(addr, msg, Now() + 60s, onExpire);
      msg.AddRef();
      auto onMsgExpire = [onExpire, msg](const Data &d) mutable {
         onExpire(d);
         msg.Release();
      };
      try {
         AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
         return true;
      } catch (...) {
         msg.Release();
         return false;
      }
   }
   bool TrySend(bhome_shm::ShmMsgQueue &mq);
   // bool empty() const { return store_.empty(); }
   bool Append(const MQInfo &mq, const Data command, OnMsgEvent onExpire = OnMsgEvent())
   {
      try {
         AppendData(mq, command, DefaultExpire(), onExpire);
         return true;
      } catch (...) {
         return false;
      }
   }
   bool TrySend();
private:
   static TimePoint Now() { return TimedMsg::Clock::now(); }
   void Append(const Remote &addr, const MsgI &msg, const TimePoint &expire, OnMsgEvent onExpire)
   {
      //TODO simple queue, organize later ?
   static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(NodeTimeoutSec()); }
   void AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire);
      msg.AddRef();
      TimedMsg tmp(expire, MsgInfo{msg, onExpire});
      std::unique_lock<std::mutex> lock(mutex_);
      in_[addr].emplace_back(std::move(tmp));
   }
   typedef std::deque<TimedMsg> MsgList;
   typedef std::unordered_map<Remote, MsgList> Store;
   class MsgIter
   {
      MsgList::iterator iter_;
   typedef std::deque<TimedMsg> Array;
   typedef std::list<Array> ArrayList;
   typedef std::unordered_map<Remote, ArrayList> Store;
   public:
      MsgIter(MsgList::iterator iter) :
          iter_(iter) {}
      MsgIter &operator++() { return ++iter_, *this; }
      bool operator==(const MsgIter &a) { return iter_ == a.iter_; }
      MsgI &operator*() { return iter_->data().msg_; }
   };
   std::mutex mutex_;
   int DoSend1Remote(const Remote remote, Array &arr);
   int DoSend1Remote(const Remote remote, ArrayList &arr);
   bool TooFast();
   SharedMemory &shm_;
   std::mutex mutex_in_;
   std::mutex mutex_out_;
   Store in_;
   Store out_;
   struct Counter {
      std::atomic<int64_t> count_;
      std::atomic<int64_t> count_1sec_;
      std::atomic<int64_t> last_time_;
      Counter() :
          count_(0), count_1sec_(0), last_time_(0) {}
      void Count1()
      {
         CheckTime();
         ++count_1sec_;
         ++count_;
      }
      void Count(int n)
      {
         CheckTime();
         count_1sec_ += n;
         count_ += n;
      }
      void CheckTime()
      {
         auto cur = NowSec();
         if (cur > last_time_) {
            count_1sec_ = 0;
            last_time_ = cur;
         }
      }
      int64_t GetCount() const { return count_.load(); }
      int64_t LastSec() const { return count_1sec_.load(); }
   };
   Counter count_in_;
   Counter count_out_;
};
#endif // end of include guard: SENDQ_IWKMSK7M