/*
 * =====================================================================================
 *
 *       Filename:  sendq.cpp
 *
 *    Description:  Implementation of SendQ: messages are buffered per remote
 *                  queue by AppendData() and flushed by TrySend().
 *
 *        Version:  1.0
 *        Created:  2021-04-14 09:22:50
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#include "sendq.h"
#include "shm_msg_queue.h"
// NOTE(review): the third #include in the original had lost its header name.
// The standard headers below cover the std:: names this file uses directly
// (<chrono> is presumed to be the one that was lost) -- confirm against history.
#include <algorithm>
#include <chrono>
#include <exception>
#include <iterator>
#include <mutex>

using namespace bhome_shm;

/**
 * Queue one message for later delivery to the remote queue `mq`.
 *
 * The message is appended to the first array of the input list keyed by
 * `mq.id_` (a new array is created if the list is empty), under `mutex_in_`.
 *
 * @param mq        destination queue; `mq.id_` selects the per-remote list.
 * @param data      payload stored with the message.
 * @param expire    time point stored as the message's expiry.
 * @param onExpire  callback stored with the message; DoSend1Remote() invokes it
 *                  (if set) when the message is found expired.
 * @throws rethrows any std::exception raised while inserting, after logging it.
 */
void SendQ::AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire)
{
	// Built before taking the lock so the critical section only covers insertion.
	TimedMsg tmp(expire, MsgInfo{mq, data, std::move(onExpire)});
	std::unique_lock lock(mutex_in_);
	try {
		auto &al = in_[mq.id_];
		if (!al.empty()) {
			al.front().emplace_back(std::move(tmp));
		} else {
			al.insert(al.begin(), Array())->emplace_back(std::move(tmp));
		}
		count_in_.Count1();
	} catch (const std::exception &e) {
		LOG_ERROR() << "sendq error: " << e.what();
		// Bare rethrow keeps the original exception object and its dynamic type;
		// `throw e;` would copy-slice it down to std::exception.
		throw;
	}
}

/**
 * Process one array of pending messages for a remote.
 *
 * Messages at the front whose expiry is earlier than Now() get their
 * `on_expire_` callback invoked (if set); the following messages are sent with
 * ShmMsgQueue::TrySend() until one fails or the array is exhausted. Everything
 * that was expired or sent is then erased from `arr`.
 *
 * @param remote  unused in this overload.
 * @param arr     pending messages; processed entries are removed.
 * @return number of entries removed (expired + successfully sent).
 */
int SendQ::DoSend1Remote(const Remote remote, Array &arr)
{
	// NOTE(review): lower_bound requires `arr` to be partitioned by expiry.
	// AppendData() appends in arrival order without sorting, so this presumably
	// relies on callers supplying non-decreasing expire times -- confirm.
	auto FirstNotExpired = [](Array &l) {
		auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; };
		return std::lower_bound(l.begin(), l.end(), Now(), Less);
	};

	auto pos = FirstNotExpired(arr);
	// Notify owners of the expired prefix [begin, pos).
	for (auto it = arr.begin(); it != pos; ++it) {
		auto &info = it->data();
		if (info.on_expire_) {
			info.on_expire_(info.data_);
		}
	}

	auto TrySend1 = [this](MsgInfo const &info) {
		return ShmMsgQueue::TrySend(shm_, info.mq_, info.data_);
	};
	// Send in order and stop at the first failure so ordering is preserved.
	while (pos != arr.end() && TrySend1(pos->data())) {
		++pos;
	}

	const int nprocessed = static_cast<int>(std::distance(arr.begin(), pos));
	arr.erase(arr.begin(), pos);
	return nprocessed;
}

/**
 * Process the list of message arrays for a remote, front to back.
 *
 * Each array is handed to the Array overload; an array that ends up empty is
 * erased and the next one is tried. Processing stops at the first array that
 * still holds messages.
 *
 * @param remote  forwarded to the Array overload.
 * @param al      list of pending arrays; fully drained arrays are removed.
 * @return total number of entries removed across the processed arrays.
 */
int SendQ::DoSend1Remote(const Remote remote, ArrayList &al)
{
	int nsend = 0;
	auto AllSent = [&](Array &arr) {
		nsend += DoSend1Remote(remote, arr);
		return arr.empty();
	};
	for (auto it = al.begin(); it != al.end() && AllSent(*it); it = al.erase(it)) {}
	return nsend;
}

/**
 * Attempt to flush the output map, then pull newly appended input.
 *
 * Holds `mutex_out_` for the whole call and additionally takes `mutex_in_`
 * while collecting input.
 *
 * @return true if messages remain in the output map after this pass.
 */
bool SendQ::TrySend()
{
	std::unique_lock lock(mutex_out_);

	size_t nsend = 0;
	for (auto rec = out_.begin(); rec != out_.end();) {
		nsend += DoSend1Remote(rec->first, rec->second);
		if (rec->second.empty()) {
			rec = out_.erase(rec);
		} else {
			++rec;
		}
	}
	count_out_.Count(nsend);

	auto Collect = [&]() {
		std::unique_lock lock(mutex_in_);
		if (out_.empty()) {
			// Nothing left to send: take over the whole input map.
			out_.swap(in_);
		} else if (nsend == 0) { // remote blocked
			// Append each input list behind the matching output list.
			for (auto &kv : in_) {
				auto &to = out_[kv.first];
				to.splice(to.end(), kv.second);
			}
			in_.clear();
		}
		// Otherwise some progress was made: input stays in in_ for a later call.
	};
	Collect();

	return !out_.empty();
}