/*
|
* =====================================================================================
|
*
|
* Filename: sendq.cpp
|
*
|
* Description:
|
*
|
* Version: 1.0
|
* Created: 2021年04月14日 09时22分50秒
|
* Revision: none
|
* Compiler: gcc
|
*
|
* Author: Li Chao (), lichao@aiotlink.com
|
* Organization:
|
*
|
* =====================================================================================
|
*/
|
#include "sendq.h"
|
#include "shm_queue.h"
|
#include <chrono>
|
|
// Makes one non-blocking pass over every remote in store_:
//   1. messages whose expire time is already before Now() get their on_expire_
//      callback (when one is set) and are released;
//   2. the remaining messages are handed to mq.TrySend() in order until one is refused;
//   3. everything expired or sent is removed, and remotes left with no messages
//      are dropped from store_.
// Returns true when store_ still holds unsent messages after the pass.
bool SendQ::TrySend(bhome_shm::ShmMsgQueue &mq)
{
	auto FlushRemote = [&](const Remote &remote, MsgList &pending) {
		// std::lower_bound needs the list partitioned by expire time, i.e. expired
		// entries first; the result is the first entry that has not expired yet.
		const auto now = Now();
		auto ExpiresBefore = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; };
		auto cursor = std::lower_bound(pending.begin(), pending.end(), now, ExpiresBefore);

		// Expired prefix: notify the owner if requested, then release the message.
		for (auto expired = pending.begin(); expired != cursor; ++expired) {
			auto &entry = expired->data();
			if (entry.on_expire_) {
				entry.on_expire_(entry.msg_);
			}
			entry.msg_.Release(mq.shm());
		}

		//TODO maybe use TrySendAll ?
		// The remote key's bytes are reinterpreted as the destination queue id.
		MQId *const dest = (MQId *) remote.data();
		for (; cursor != pending.end(); ++cursor) {
			auto &msg = cursor->data().msg_;
			if (!mq.TrySend(*dest, msg)) {
				break; // Refused; keep this and all later messages for the next pass.
			}
			if (msg.IsCounted()) {
				msg.Release(mq.shm());
			}
		}

		// [begin, cursor) is now fully handled (expired or sent).
		pending.erase(pending.begin(), cursor);
	};

	for (auto rec = store_.begin(); rec != store_.end();) {
		FlushRemote(rec->first, rec->second);
		if (rec->second.empty()) {
			rec = store_.erase(rec); // erase() returns the iterator to continue from.
		} else {
			++rec;
		}
	}
	return !store_.empty();
}
|