/*
|
* =====================================================================================
|
*
|
* Filename: sendq.cpp
|
*
|
* Description:
|
*
|
* Version: 1.0
|
* Created: 2021-04-14 09:22:50
|
* Revision: none
|
* Compiler: gcc
|
*
|
* Author: Li Chao (), lichao@aiotlink.com
|
* Organization:
|
*
|
* =====================================================================================
|
*/
|
#include "sendq.h"
|
#include "shm_msg_queue.h"
|
#include <chrono>
|
|
using namespace bhome_shm;
|
|
void SendQ::AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire)
|
{
|
TimedMsg tmp(expire, MsgInfo{mq, data, std::move(onExpire)});
|
std::unique_lock<std::mutex> lock(mutex_in_);
|
|
try {
|
auto &al = in_[mq.id_];
|
if (!al.empty()) {
|
al.front().emplace_back(std::move(tmp));
|
} else {
|
al.insert(al.begin(), Array())->emplace_back(std::move(tmp));
|
}
|
count_in_.Count1();
|
} catch (std::exception &e) {
|
LOG_ERROR() << "sendq error: " << e.what();
|
throw e;
|
}
|
}
|
|
int SendQ::DoSend1Remote(const Remote remote, Array &arr)
|
{
|
auto FirstNotExpired = [](Array &l) {
|
auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; };
|
return std::lower_bound(l.begin(), l.end(), Now(), Less);
|
};
|
|
auto pos = FirstNotExpired(arr);
|
for (auto it = arr.begin(); it != pos; ++it) {
|
auto &info = it->data();
|
if (info.on_expire_) {
|
info.on_expire_(info.data_);
|
}
|
}
|
auto TrySend1 = [this](MsgInfo const &info) { return ShmMsgQueue::TrySend(shm_, info.mq_, info.data_); };
|
while (pos != arr.end() && TrySend1(pos->data())) {
|
++pos;
|
}
|
|
int nprocessed = std::distance(arr.begin(), pos);
|
arr.erase(arr.begin(), pos);
|
return nprocessed;
|
}
|
|
int SendQ::DoSend1Remote(const Remote remote, ArrayList &al)
|
{
|
int nsend = 0;
|
auto AllSent = [&](Array &arr) {
|
nsend += DoSend1Remote(remote, arr);
|
return arr.empty();
|
};
|
for (auto it = al.begin(); it != al.end() && AllSent(*it); it = al.erase(it)) {}
|
return nsend;
|
}
|
|
bool SendQ::TrySend()
|
{
|
std::unique_lock<std::mutex> lock(mutex_out_);
|
|
size_t nsend = 0;
|
if (!out_.empty()) {
|
auto rec = out_.begin();
|
do {
|
nsend += DoSend1Remote(rec->first, rec->second);
|
if (rec->second.empty()) {
|
rec = out_.erase(rec);
|
} else {
|
++rec;
|
}
|
} while (rec != out_.end());
|
}
|
count_out_.Count(nsend);
|
|
auto Collect = [&]() {
|
std::unique_lock<std::mutex> lock(mutex_in_);
|
if (out_.empty()) {
|
out_.swap(in_);
|
} else if (nsend == 0) { // remote blocked
|
for (auto &kv : in_) {
|
auto &to = out_[kv.first];
|
to.splice(to.end(), kv.second);
|
}
|
in_.clear();
|
}
|
};
|
|
Collect();
|
|
return !out_.empty();
|
}
|