/*
 * =====================================================================================
 *
 *       Filename:  sendq.h
 *
 *    Description:  SendQ, a queue of pending outgoing messages/commands kept per remote.
 *
 *        Version:  1.0
 *        Created:  2021-04-14 09:22:59
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
|
#ifndef SENDQ_IWKMSK7M
|
#define SENDQ_IWKMSK7M
|
|
#include "defs.h"
#include "msg.h"
#include "timed_queue.h"
#include <atomic>
#include <chrono>
#include <cstdint>
#include <deque>
#include <functional>
#include <list>
#include <mutex>
#include <string>
#include <unordered_map>
|
|
class ShmMsgQueue;
|
|
class SendQ
|
{
|
public:
|
typedef MQId Remote;
|
typedef bhome_msg::MsgI MsgI;
|
typedef std::string Content;
|
typedef int64_t Data;
|
typedef std::function<void(const Data &)> OnMsgEvent;
|
struct MsgInfo {
|
MQInfo mq_;
|
Data data_;
|
OnMsgEvent on_expire_;
|
};
|
typedef TimedData<MsgInfo> TimedMsg;
|
typedef TimedMsg::TimePoint TimePoint;
|
typedef TimedMsg::Duration Duration;
|
SendQ(SharedMemory &shm) :
|
shm_(shm) {}
|
|
bool Append(const MQInfo &mq, MsgI msg)
|
{
|
msg.AddRef();
|
auto onMsgExpire = [msg](const Data &d) mutable { msg.Release(); };
|
try {
|
AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
|
return true;
|
} catch (...) {
|
msg.Release();
|
return false;
|
}
|
}
|
|
bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire)
|
{
|
msg.AddRef();
|
auto onMsgExpire = [onExpire, msg](const Data &d) mutable {
|
onExpire(d);
|
msg.Release();
|
};
|
try {
|
AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
|
return true;
|
} catch (...) {
|
msg.Release();
|
return false;
|
}
|
}
|
|
bool Append(const MQInfo &mq, const Data command, OnMsgEvent onExpire = OnMsgEvent())
|
{
|
try {
|
AppendData(mq, command, DefaultExpire(), onExpire);
|
return true;
|
} catch (...) {
|
return false;
|
}
|
}
|
bool TrySend();
|
|
private:
|
static TimePoint Now() { return TimedMsg::Clock::now(); }
|
static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(NodeTimeoutSec()); }
|
void AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire);
|
|
typedef std::deque<TimedMsg> Array;
|
typedef std::list<Array> ArrayList;
|
typedef std::unordered_map<Remote, ArrayList> Store;
|
|
int DoSend1Remote(const Remote remote, Array &arr);
|
int DoSend1Remote(const Remote remote, ArrayList &arr);
|
|
bool TooFast();
|
|
SharedMemory &shm_;
|
std::mutex mutex_in_;
|
std::mutex mutex_out_;
|
Store in_;
|
Store out_;
|
|
struct Counter {
|
std::atomic<int64_t> count_;
|
std::atomic<int64_t> count_1sec_;
|
std::atomic<int64_t> last_time_;
|
Counter() :
|
count_(0), count_1sec_(0), last_time_(0) {}
|
void Count1()
|
{
|
CheckTime();
|
++count_1sec_;
|
++count_;
|
}
|
void Count(int n)
|
{
|
CheckTime();
|
count_1sec_ += n;
|
count_ += n;
|
}
|
void CheckTime()
|
{
|
auto cur = NowSec();
|
if (cur > last_time_) {
|
count_1sec_ = 0;
|
last_time_ = cur;
|
}
|
}
|
int64_t GetCount() const { return count_.load(); }
|
int64_t LastSec() const { return count_1sec_.load(); }
|
};
|
Counter count_in_;
|
Counter count_out_;
|
};
|
|
#endif // end of include guard: SENDQ_IWKMSK7M
|