/*
|
* =====================================================================================
|
*
|
* Filename: sendq.h
|
*
|
* Description:
|
*
|
* Version: 1.0
|
* Created: 2021年04月14日 09时22分59秒
|
* Revision: none
|
* Compiler: gcc
|
*
|
* Author: Li Chao (), lichao@aiotlink.com
|
* Organization:
|
*
|
* =====================================================================================
|
*/
|
#ifndef SENDQ_IWKMSK7M
|
#define SENDQ_IWKMSK7M
|
|
#include "defs.h"
|
#include "msg.h"
|
#include "timed_queue.h"
|
#include <deque>
|
#include <functional>
|
#include <list>
|
#include <mutex>
|
#include <string>
|
#include <unordered_map>
|
|
class ShmMsgQueue;
|
|
class SendQ
|
{
|
public:
|
typedef MQId Remote;
|
typedef bhome_msg::MsgI MsgI;
|
typedef std::string Content;
|
typedef int64_t Data;
|
typedef std::function<void(const Data &)> OnMsgEvent;
|
struct MsgInfo {
|
MQInfo mq_;
|
Data data_;
|
OnMsgEvent on_expire_;
|
};
|
typedef TimedData<MsgInfo> TimedMsg;
|
typedef TimedMsg::TimePoint TimePoint;
|
typedef TimedMsg::Duration Duration;
|
|
bool Append(const MQInfo &mq, MsgI msg)
|
{
|
msg.AddRef();
|
auto onMsgExpire = [](const Data &d) { MsgI(d).Release(); };
|
try {
|
AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
|
return true;
|
} catch (...) {
|
msg.Release();
|
return false;
|
}
|
}
|
|
bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire)
|
{
|
msg.AddRef();
|
auto onMsgExpire = [onExpire](const Data &d) {
|
onExpire(d);
|
MsgI(d).Release();
|
};
|
try {
|
AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
|
return true;
|
} catch (...) {
|
msg.Release();
|
return false;
|
}
|
}
|
|
bool Append(const MQInfo &mq, const Data command, OnMsgEvent onExpire = OnMsgEvent())
|
{
|
try {
|
AppendData(mq, command, DefaultExpire(), onExpire);
|
return true;
|
} catch (...) {
|
return false;
|
}
|
}
|
bool TrySend(ShmMsgQueue &mq);
|
|
private:
|
static TimePoint Now() { return TimedMsg::Clock::now(); }
|
static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); }
|
void AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire);
|
|
typedef std::deque<TimedMsg> Array;
|
typedef std::list<Array> ArrayList;
|
typedef std::unordered_map<Remote, ArrayList> Store;
|
|
int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr);
|
int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr);
|
|
bool TooFast();
|
|
std::mutex mutex_in_;
|
std::mutex mutex_out_;
|
Store in_;
|
Store out_;
|
|
int64_t count_ = 0;
|
int64_t last_time_ = 0;
|
};
|
|
#endif // end of include guard: SENDQ_IWKMSK7M
|