/* * ===================================================================================== * * Filename: sendq.cpp * * Description: * * Version: 1.0 * Created: 2021年04月14日 09时22分50秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), lichao@aiotlink.com * Organization: * * ===================================================================================== */ #include "sendq.h" #include "shm_queue.h" #include //TODO change to save head, body, instead of MsgI. // as MsgI which is in shm, but head, body are in current process. // Then if node crashes, shm will not be affected by msgs in sendq. // but pulishing ref-counted msg need some work. int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr) { auto FirstNotExpired = [](Array &l) { auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; }; return std::lower_bound(l.begin(), l.end(), Now(), Less); }; auto pos = FirstNotExpired(arr); for (auto it = arr.begin(); it != pos; ++it) { auto &info = it->data(); if (info.on_expire_) { info.on_expire_(info.msg_); } info.msg_.Release(mq.shm()); } int n = mq.TrySendAll(*(MQId *) remote.data(), MsgIter(pos), MsgIter(arr.end())); for (int i = 0; i < n; ++i) { auto &msg = pos->data().msg_; if (msg.IsCounted()) { msg.Release(mq.shm()); } ++pos; } arr.erase(arr.begin(), pos); return n; } int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &al) { int nsend = 0; auto AllSent = [&](Array &arr) { nsend += DoSend1Remote(mq, remote, arr); return arr.empty(); }; for (auto it = al.begin(); it != al.end() && AllSent(*it); it = al.erase(it)) {} return nsend; } bool SendQ::TrySend(bhome_shm::ShmMsgQueue &mq) { std::unique_lock lock(mutex_out_); size_t nsend = 0; if (!out_.empty()) { auto rec = out_.begin(); do { nsend += DoSend1Remote(mq, rec->first, rec->second); if (rec->second.empty()) { rec = out_.erase(rec); } else { ++rec; } } while (rec != out_.end()); } auto Collect = [&]() { std::unique_lock lock(mutex_in_); if (out_.empty()) { out_.swap(in_); } else if (nsend == 0) { // remote blocked for (auto &kv : in_) { auto &to = out_[kv.first]; to.splice(to.end(), kv.second); } in_.clear(); } }; Collect(); return !out_.empty(); }