/*
|
* =====================================================================================
|
*
|
* Filename: sendq.h
|
*
|
* Description:
|
*
|
* Version: 1.0
|
* Created: 2021年04月14日 09时22分59秒
|
* Revision: none
|
* Compiler: gcc
|
*
|
* Author: Li Chao (), lichao@aiotlink.com
|
* Organization:
|
*
|
* =====================================================================================
|
*/
|
#ifndef SENDQ_IWKMSK7M
|
#define SENDQ_IWKMSK7M
|
|
#include "defs.h"
|
#include "msg.h"
|
#include "timed_queue.h"
|
#include <boost/variant2/variant.hpp>
|
#include <deque>
|
#include <functional>
|
#include <list>
|
#include <mutex>
|
#include <string>
|
#include <unordered_map>
|
|
class ShmMsgQueue;
|
|
class SendQ
|
{
|
public:
|
typedef MQId Remote;
|
typedef bhome_msg::MsgI MsgI;
|
typedef std::string Content;
|
typedef boost::variant2::variant<MsgI, Content> Data;
|
typedef std::function<void(const Data &)> OnMsgEvent;
|
struct MsgInfo {
|
Data data_;
|
OnMsgEvent on_expire_;
|
};
|
typedef TimedData<MsgInfo> TimedMsg;
|
typedef TimedMsg::TimePoint TimePoint;
|
typedef TimedMsg::Duration Duration;
|
|
// template <class... Rest>
|
// void Append(const MQId &id, Rest &&...rest)
|
// {
|
// Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...);
|
// }
|
|
void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire = OnMsgEvent())
|
{
|
msg.AddRef();
|
AppendData(addr, Data(msg), DefaultExpire(), onExpire);
|
}
|
void Append(const Remote addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent())
|
{
|
AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire);
|
}
|
bool TrySend(ShmMsgQueue &mq);
|
// bool empty() const { return store_.empty(); }
|
|
private:
|
static TimePoint Now() { return TimedMsg::Clock::now(); }
|
static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); }
|
void AppendData(const Remote addr, Data &&data, const TimePoint &expire, OnMsgEvent onExpire)
|
{
|
//TODO simple queue, organize later ?
|
|
TimedMsg tmp(expire, MsgInfo{std::move(data), std::move(onExpire)});
|
std::unique_lock<std::mutex> lock(mutex_in_);
|
auto &al = in_[addr];
|
if (!al.empty()) {
|
al.front().emplace_back(std::move(tmp));
|
} else {
|
al.insert(al.begin(), Array())->emplace_back(std::move(tmp));
|
}
|
}
|
typedef std::deque<TimedMsg> Array;
|
typedef std::list<Array> ArrayList;
|
typedef std::unordered_map<Remote, ArrayList> Store;
|
|
int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr);
|
int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr);
|
|
std::mutex mutex_in_;
|
std::mutex mutex_out_;
|
Store in_;
|
Store out_;
|
};
|
|
#endif // end of include guard: SENDQ_IWKMSK7M
|