/*
|
* =====================================================================================
|
*
|
* Filename: sendq.cpp
|
*
|
* Description:
|
*
|
* Version: 1.0
|
* Created: 2021年04月14日 09时22分50秒
|
* Revision: none
|
* Compiler: gcc
|
*
|
* Author: Li Chao (), lichao@aiotlink.com
|
* Organization:
|
*
|
* =====================================================================================
|
*/
|
#include "sendq.h"
|
#include "shm_queue.h"
|
#include <chrono>
|
|
//TODO change to save head, body, instead of MsgI.
|
// as MsgI which is in shm, but head, body are in current process.
|
// Then if node crashes, shm will not be affected by msgs in sendq.
|
// but pulishing ref-counted msg need some work.
|
|
int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr)
|
{
|
auto FirstNotExpired = [](Array &l) {
|
auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; };
|
return std::lower_bound(l.begin(), l.end(), Now(), Less);
|
};
|
|
auto pos = FirstNotExpired(arr);
|
for (auto it = arr.begin(); it != pos; ++it) {
|
auto &info = it->data();
|
if (info.on_expire_) {
|
info.on_expire_(info.msg_);
|
}
|
info.msg_.Release(mq.shm());
|
}
|
|
int n = mq.TrySendAll(*(MQId *) remote.data(), MsgIter(pos), MsgIter(arr.end()));
|
for (int i = 0; i < n; ++i) {
|
auto &msg = pos->data().msg_;
|
if (msg.IsCounted()) {
|
msg.Release(mq.shm());
|
}
|
++pos;
|
}
|
|
arr.erase(arr.begin(), pos);
|
return n;
|
}
|
|
int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &al)
|
{
|
int nsend = 0;
|
auto AllSent = [&](Array &arr) {
|
nsend += DoSend1Remote(mq, remote, arr);
|
return arr.empty();
|
};
|
for (auto it = al.begin(); it != al.end() && AllSent(*it); it = al.erase(it)) {}
|
return nsend;
|
}
|
|
bool SendQ::TrySend(bhome_shm::ShmMsgQueue &mq)
|
{
|
std::unique_lock<std::mutex> lock(mutex_out_);
|
size_t nsend = 0;
|
if (!out_.empty()) {
|
auto rec = out_.begin();
|
do {
|
nsend += DoSend1Remote(mq, rec->first, rec->second);
|
if (rec->second.empty()) {
|
rec = out_.erase(rec);
|
} else {
|
++rec;
|
}
|
} while (rec != out_.end());
|
}
|
|
auto Collect = [&]() {
|
std::unique_lock<std::mutex> lock(mutex_in_);
|
if (out_.empty()) {
|
out_.swap(in_);
|
} else if (nsend == 0) { // remote blocked
|
for (auto &kv : in_) {
|
auto &to = out_[kv.first];
|
to.splice(to.end(), kv.second);
|
}
|
in_.clear();
|
}
|
};
|
|
Collect();
|
|
return !out_.empty();
|
}
|