From 330f78f3334bcdcdb4cc2ab2dbf66604e0224d71 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 21 五月 2021 16:21:45 +0800
Subject: [PATCH] Merge branch 'master' of http://192.168.5.5:10010/r/valib/bhshmq
---
src/sendq.h | 109 ++++++++++++++++++++++++++++++++++++++----------------
1 files changed, 77 insertions(+), 32 deletions(-)
diff --git a/src/sendq.h b/src/sendq.h
index 0699df7..ec63b05 100644
--- a/src/sendq.h
+++ b/src/sendq.h
@@ -21,7 +21,6 @@
#include "defs.h"
#include "msg.h"
#include "timed_queue.h"
-#include <boost/variant2/variant.hpp>
#include <deque>
#include <functional>
#include <list>
@@ -29,10 +28,7 @@
#include <string>
#include <unordered_map>
-namespace bhome_shm
-{
class ShmMsgQueue;
-} // namespace bhome_shm
class SendQ
{
@@ -40,61 +36,110 @@
typedef MQId Remote;
typedef bhome_msg::MsgI MsgI;
typedef std::string Content;
- typedef boost::variant2::variant<MsgI, Content> Data;
+ typedef int64_t Data;
typedef std::function<void(const Data &)> OnMsgEvent;
struct MsgInfo {
+ MQInfo mq_;
Data data_;
OnMsgEvent on_expire_;
};
typedef TimedData<MsgInfo> TimedMsg;
typedef TimedMsg::TimePoint TimePoint;
typedef TimedMsg::Duration Duration;
+ SendQ(SharedMemory &shm) :
+ shm_(shm) {}
- // template <class... Rest>
- // void Append(const MQId &id, Rest &&...rest)
- // {
- // Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...);
- // }
-
- void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire = OnMsgEvent())
+ bool Append(const MQInfo &mq, MsgI msg)
{
msg.AddRef();
- AppendData(addr, Data(msg), DefaultExpire(), onExpire);
+ auto onMsgExpire = [msg](const Data &d) mutable { msg.Release(); };
+ try {
+ AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
+ return true;
+ } catch (...) {
+ msg.Release();
+ return false;
+ }
}
- void Append(const Remote addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent())
+
+ bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire)
{
- AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire);
+ msg.AddRef();
+ auto onMsgExpire = [onExpire, msg](const Data &d) mutable {
+ onExpire(d);
+ msg.Release();
+ };
+ try {
+ AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
+ return true;
+ } catch (...) {
+ msg.Release();
+ return false;
+ }
}
- bool TrySend(bhome_shm::ShmMsgQueue &mq);
- // bool empty() const { return store_.empty(); }
+
+ bool Append(const MQInfo &mq, const Data command, OnMsgEvent onExpire = OnMsgEvent())
+ {
+ try {
+ AppendData(mq, command, DefaultExpire(), onExpire);
+ return true;
+ } catch (...) {
+ return false;
+ }
+ }
+ bool TrySend();
private:
static TimePoint Now() { return TimedMsg::Clock::now(); }
- static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); }
- void AppendData(const Remote addr, Data &&data, const TimePoint &expire, OnMsgEvent onExpire)
- {
- //TODO simple queue, organize later ?
+ static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(NodeTimeoutSec()); }
+ void AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire);
- TimedMsg tmp(expire, MsgInfo{std::move(data), std::move(onExpire)});
- std::unique_lock<std::mutex> lock(mutex_in_);
- auto &al = in_[addr];
- if (!al.empty()) {
- al.front().emplace_back(std::move(tmp));
- } else {
- al.insert(al.begin(), Array())->emplace_back(std::move(tmp));
- }
- }
typedef std::deque<TimedMsg> Array;
typedef std::list<Array> ArrayList;
typedef std::unordered_map<Remote, ArrayList> Store;
- int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, Array &arr);
- int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, ArrayList &arr);
+ int DoSend1Remote(const Remote remote, Array &arr);
+ int DoSend1Remote(const Remote remote, ArrayList &arr);
+ bool TooFast();
+
+ SharedMemory &shm_;
std::mutex mutex_in_;
std::mutex mutex_out_;
Store in_;
Store out_;
+
+ struct Counter {
+ std::atomic<int64_t> count_;
+ std::atomic<int64_t> count_1sec_;
+ std::atomic<int64_t> last_time_;
+ Counter() :
+ count_(0), count_1sec_(0), last_time_(0) {}
+ void Count1()
+ {
+ CheckTime();
+ ++count_1sec_;
+ ++count_;
+ }
+ void Count(int n)
+ {
+ CheckTime();
+ count_1sec_ += n;
+ count_ += n;
+ }
+ void CheckTime()
+ {
+ auto cur = NowSec();
+ if (cur > last_time_) {
+ count_1sec_ = 0;
+ last_time_ = cur;
+ }
+ }
+ int64_t GetCount() const { return count_.load(); }
+ int64_t LastSec() const { return count_1sec_.load(); }
+ };
+ Counter count_in_;
+ Counter count_out_;
};
#endif // end of include guard: SENDQ_IWKMSK7M
--
Gitblit v1.8.0