/*
 * =====================================================================================
 *
 *       Filename:  sendq.cpp
 *
 *    Description:  SendQ helpers that expire and flush queued messages to remotes
 *                  through a ShmMsgQueue.
 *
 *        Version:  1.0
 *        Created:  2021-04-14 09:22:50
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#include "sendq.h"
#include "shm_msg_queue.h"
// Standard headers for std::lower_bound, std::distance and std::unique_lock used below.
#include <algorithm>
#include <iterator>
#include <mutex>

using namespace bhome_shm;

// Process one array of timed messages addressed to `remote`.
//
// Messages whose expire() is earlier than Now() are dropped first: their
// on_expire_ callback (if set) is invoked and, for variant alternative 0,
// Release() is called on the held message. After that, the remaining messages
// are offered to mq.TrySend() in order until one is refused.
//
// Returns the number of entries removed from `arr` (expired plus sent).
int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr)
{
	// std::lower_bound needs `arr` partitioned by expiry time.
	// NOTE(review): assumes entries are stored in expiry order - confirm at the insertion site.
	auto NotYetDue = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; };
	auto pos = std::lower_bound(arr.begin(), arr.end(), Now(), NotYetDue);

	// Everything in front of `pos` has expired.
	for (auto it = arr.begin(); it != pos; ++it) {
		auto &info = it->data();
		if (info.on_expire_) {
			info.on_expire_(info.data_);
		}
		if (info.data_.index() == 0) {
			boost::variant2::get<0>(info.data_).Release();
		}
	}

	// Try to send a single payload; returns true when mq accepted it.
	auto SendData = [&](Data &d) -> bool {
		bool r = false;
		if (d.index() == 0) {
			// Alternative 0: an already built message, released once it has been sent.
			auto &msg = boost::variant2::get<0>(d);
			r = mq.TrySend(remote, msg);
			if (r) {
				msg.Release();
			}
		} else {
			// Alternative 1: content from which a temporary MsgI is made for this attempt.
			auto &content = boost::variant2::get<1>(d);
			MsgI msg;
			if (msg.Make(content)) {
				DEFER1(msg.Release(););
				r = mq.TrySend(remote, msg);
			}
		}
		return r;
	};

	// Stop at the first message that could not be sent so ordering is kept.
	while (pos != arr.end() && SendData(pos->data().data_)) {
		++pos;
	}

	const int nprocessed = static_cast<int>(std::distance(arr.begin(), pos));
	arr.erase(arr.begin(), pos);
	return nprocessed;
}

// Process a list of message arrays addressed to `remote`, front to back.
//
// Arrays that become empty are erased from `al`; processing stops at the first
// array that still holds messages afterwards.
//
// Returns the total count reported by the per-array overload.
int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &al)
{
	int nsend = 0;
	auto it = al.begin();
	while (it != al.end()) {
		nsend += DoSend1Remote(mq, remote, *it);
		if (!it->empty()) {
			break; // this array could not be drained; leave it and the rest queued
		}
		it = al.erase(it);
	}
	return nsend;
}

// Flush the outgoing map through `mq`, then pull in newly queued input.
//
// mutex_out_ is held for the whole call; mutex_in_ is held only while in_ is
// moved into out_.
//
// Returns true when out_ still holds messages after this pass.
bool SendQ::TrySend(ShmMsgQueue &mq)
{
	std::unique_lock lock(mutex_out_);

	size_t nsend = 0;
	for (auto rec = out_.begin(); rec != out_.end();) {
		nsend += DoSend1Remote(mq, rec->first, rec->second);
		if (rec->second.empty()) {
			rec = out_.erase(rec);
		} else {
			++rec;
		}
	}

	{
		std::unique_lock in_lock(mutex_in_);
		if (out_.empty()) {
			// Everything went out: take over the pending input wholesale.
			out_.swap(in_);
		} else if (nsend == 0) { // remote blocked
			// Nothing was processed this pass: append pending input behind what is still queued.
			for (auto &kv : in_) {
				auto &to = out_[kv.first];
				to.splice(to.end(), kv.second);
			}
			in_.clear();
		}
	}

	return !out_.empty();
}