From 34cd75f77d0ca94dbdba4e6cc9451fe4d33e78b3 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 19 五月 2021 19:14:13 +0800 Subject: [PATCH] add api BHQueryProcs. --- src/sendq.h | 83 ++++++++++++++++++++++++++++++----------- 1 files changed, 61 insertions(+), 22 deletions(-) diff --git a/src/sendq.h b/src/sendq.h index c6f270b..759e12a 100644 --- a/src/sendq.h +++ b/src/sendq.h @@ -23,52 +23,91 @@ #include "timed_queue.h" #include <deque> #include <functional> +#include <list> +#include <mutex> #include <string> #include <unordered_map> -namespace bhome_shm -{ class ShmMsgQueue; -} // namespace bhome_shm class SendQ { public: - typedef std::string Remote; + typedef MQId Remote; typedef bhome_msg::MsgI MsgI; - typedef std::function<void(const MsgI &msg)> OnMsgEvent; + typedef std::string Content; + typedef int64_t Data; + typedef std::function<void(const Data &)> OnMsgEvent; struct MsgInfo { - MsgI msg_; + MQInfo mq_; + Data data_; OnMsgEvent on_expire_; - // OnMsgEvent on_send_; }; typedef TimedData<MsgInfo> TimedMsg; typedef TimedMsg::TimePoint TimePoint; typedef TimedMsg::Duration Duration; - void Append(const MQId &id, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent()) + bool Append(const MQInfo &mq, MsgI msg) { - Append(std::string((const char *) &id, sizeof(id)), msg, onExpire); + msg.AddRef(); + auto onMsgExpire = [](const Data &d) { MsgI(d).Release(); }; + try { + AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); + return true; + } catch (...) { + msg.Release(); + return false; + } } - void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent()) + + bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire) { - using namespace std::chrono_literals; - Append(addr, msg, Now() + 60s, onExpire); + msg.AddRef(); + auto onMsgExpire = [onExpire](const Data &d) { + onExpire(d); + MsgI(d).Release(); + }; + try { + AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); + return true; + } catch (...) { + msg.Release(); + return false; + } } - bool TrySend(bhome_shm::ShmMsgQueue &mq); - // bool empty() const { return store_.empty(); } + + bool Append(const MQInfo &mq, const Data command, OnMsgEvent onExpire = OnMsgEvent()) + { + try { + AppendData(mq, command, DefaultExpire(), onExpire); + return true; + } catch (...) { + return false; + } + } + bool TrySend(ShmMsgQueue &mq); private: static TimePoint Now() { return TimedMsg::Clock::now(); } - void Append(const Remote &addr, const MsgI &msg, const TimePoint &expire, OnMsgEvent onExpire) - { - msg.AddRef(); - store_[addr].emplace_back(TimedMsg(expire, MsgInfo{msg, onExpire})); - } - typedef std::deque<TimedMsg> MsgList; - typedef std::unordered_map<Remote, MsgList> Store; + static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(NodeTimeoutSec()); } + void AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire); - Store store_; + typedef std::deque<TimedMsg> Array; + typedef std::list<Array> ArrayList; + typedef std::unordered_map<Remote, ArrayList> Store; + + int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr); + int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr); + + bool TooFast(); + + std::mutex mutex_in_; + std::mutex mutex_out_; + Store in_; + Store out_; + + int64_t count_ = 0; + int64_t last_time_ = 0; }; #endif // end of include guard: SENDQ_IWKMSK7M -- Gitblit v1.8.0