/* * ===================================================================================== * * Filename: sendq.cpp * * Description: * * Version: 1.0 * Created: 2021年04月14日 09时22分50秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), lichao@aiotlink.com * Organization: * * ===================================================================================== */ #include "sendq.h" #include "shm_queue.h" #include bool SendQ::TrySend(bhome_shm::ShmMsgQueue &mq) { auto FirstNotExpired = [](MsgList &l) { auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; }; return std::lower_bound(l.begin(), l.end(), Now(), Less); }; auto SendOneRemote = [&](const Remote &remote, MsgList &msg_list) { auto pos = FirstNotExpired(msg_list); for (auto it = msg_list.begin(); it != pos; ++it) { auto &info = it->data(); if (info.on_expire_) { info.on_expire_(info.msg_); } info.msg_.Release(mq.shm()); } //TODO maybe use TrySendAll ? while (pos != msg_list.end() && mq.TrySend(*(MQId *) remote.data(), pos->data().msg_)) { auto &msg = pos->data().msg_; if (msg.IsCounted()) { msg.Release(mq.shm()); } ++pos; } msg_list.erase(msg_list.begin(), pos); }; if (!store_.empty()) { auto rec = store_.begin(); do { SendOneRemote(rec->first, rec->second); if (rec->second.empty()) { rec = store_.erase(rec); } else { ++rec; } } while (rec != store_.end()); } return !store_.empty(); }