/*
|
* =====================================================================================
|
*
|
* Filename: sendq.cpp
|
*
|
* Description:
|
*
|
* Version: 1.0
|
* Created: 2021年04月14日 09时22分50秒
|
* Revision: none
|
* Compiler: gcc
|
*
|
* Author: Li Chao (), lichao@aiotlink.com
|
* Organization:
|
*
|
* =====================================================================================
|
*/
|
#include "sendq.h"
|
#include "shm_queue.h"
|
#include <chrono>
|
|
// Flush one message array that is queued for a single remote.
//
// Entries whose expire() lies before Now() are located with a binary search
// (which presumes arr is ordered by expiry time). For each of them the
// on_expire_ handler, if set, is called, and a payload held as variant
// alternative 0 is released. The remaining entries are then sent front to
// back until the first one that cannot be sent.
//
// mq     - queue used for TrySend() and for access to shm().
// remote - destination; the bytes behind remote.data() are read as an MQId.
// arr    - pending messages; every handled entry is erased from its front.
//
// Returns the number of entries removed from arr (expired plus sent).
int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr)
{
	auto ExpiresBefore = [](const TimedMsg &msg, const TimePoint &deadline) {
		return msg.expire() < deadline;
	};
	// First entry that has not expired yet; everything in front of it has.
	auto cursor = std::lower_bound(arr.begin(), arr.end(), Now(), ExpiresBefore);

	for (auto dead = arr.begin(); dead != cursor; ++dead) {
		auto &info = dead->data();
		if (info.on_expire_) {
			info.on_expire_(info.data_);
		}
		if (info.data_.index() == 0) {
			// Expired entries are dropped without being sent, so release here.
			boost::variant2::get<0>(info.data_).Release(mq.shm());
		}
	}

	// Tries to hand one payload to the remote; yields the TrySend() result,
	// or false when a message could not be built from the payload.
	auto Deliver = [&](Data &payload) {
		if (payload.index() == 0) {
			// Alternative 0 is passed to TrySend() as is.
			auto &msg = boost::variant2::get<0>(payload);
			const bool sent = mq.TrySend(*(MQId *) remote.data(), msg);
			if (sent && msg.IsCounted()) {
				msg.Release(mq.shm());
			}
			return sent;
		}

		// Alternative 1 has to be turned into a message first.
		auto &content = boost::variant2::get<1>(payload);
		MsgI msg;
		if (!msg.Make(mq.shm(), content)) {
			return false;
		}
		const bool sent = mq.TrySend(*(MQId *) remote.data(), msg);
		if (!sent || msg.IsCounted()) {
			// A failed send must not leave the freshly made message behind.
			msg.Release(mq.shm());
		}
		return sent;
	};

	// Stop at the first failure so later entries are not sent out of order.
	for (; cursor != arr.end(); ++cursor) {
		if (!Deliver(cursor->data().data_)) {
			break;
		}
	}

	const int handled = static_cast<int>(std::distance(arr.begin(), cursor));
	arr.erase(arr.begin(), cursor);
	return handled;
}
|
|
// Flush a list of message arrays queued for a single remote.
//
// The arrays are processed in list order through the Array overload. An array
// that ends up empty is erased from the list and processing moves on to the
// next one; the first array that still holds entries stops the walk.
//
// mq     - queue forwarded to the Array overload.
// remote - destination forwarded to the Array overload.
// al     - list of pending arrays; emptied arrays are removed from its front.
//
// Returns the sum of the per-array counts reported by the Array overload.
int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &al)
{
	int total = 0;
	auto cur = al.begin();
	while (cur != al.end()) {
		total += DoSend1Remote(mq, remote, *cur);
		if (!cur->empty()) {
			// Something is left in this array; keep it and everything after it.
			break;
		}
		cur = al.erase(cur);
	}
	return total;
}
|
|
// Run one send pass over the outgoing map and then pull in newly queued data.
//
// mutex_out_ is held for the whole call. Every record in out_ is passed to
// DoSend1Remote(); records whose list becomes empty are erased. Afterwards,
// with mutex_in_ held, in_ is either swapped into an empty out_ or, when this
// pass processed nothing at all, appended to the matching lists in out_.
//
// mq - queue handed to DoSend1Remote().
//
// Returns true while out_ still contains pending records.
bool SendQ::TrySend(bhome_shm::ShmMsgQueue &mq)
{
	std::unique_lock<std::mutex> out_guard(mutex_out_);

	size_t processed = 0;
	for (auto rec = out_.begin(); rec != out_.end();) {
		processed += DoSend1Remote(mq, rec->first, rec->second);
		if (rec->second.empty()) {
			rec = out_.erase(rec);
		} else {
			++rec;
		}
	}

	{
		// Collect what was queued in the meantime; in_ is only touched under its own lock.
		std::unique_lock<std::mutex> in_guard(mutex_in_);
		if (out_.empty()) {
			out_.swap(in_);
		} else if (processed == 0) { // remote blocked
			for (auto &kv : in_) {
				auto &pending = out_[kv.first];
				pending.splice(pending.end(), kv.second);
			}
			in_.clear();
		}
	}

	return !out_.empty();
}
|