/*
 * =====================================================================================
 *
 *       Filename:  sendq.cpp
 *
 *    Description:  SendQ implementation: buffers timed messages per remote and
 *                  pushes them out through a ShmMsgQueue.
 *
 *        Version:  1.0
 *        Created:  2021-04-14 09:22:50
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#include "sendq.h"
#include "shm_msg_queue.h"

#include <algorithm>
#include <exception>
#include <iterator>
#include <mutex>
#include <utility>

using namespace bhome_shm;

/**
 * Queue a message for later delivery.
 *
 * The message is appended to the front batch of the inbound list keyed by
 * mq.id_; an empty batch is created first when that list has none. Only the
 * inbound side (in_, guarded by mutex_in_) is touched here; TrySend() moves
 * inbound data to the outbound side.
 *
 * @param mq       destination queue info; mq.id_ selects the per-remote list.
 * @param data     payload stored with the message.
 * @param expire   time point stored with the message; DoSend1Remote() treats
 *                 entries whose expire time is before Now() as expired.
 * @param onExpire callback stored with the message; invoked with the payload
 *                 when the message expires before being sent. May be empty.
 * @throws any std::exception raised while inserting is logged and rethrown.
 */
void SendQ::AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire)
{
	// Build the entry before taking the lock to keep the critical section short.
	TimedMsg tmp(expire, MsgInfo{mq, data, std::move(onExpire)});
	// decltype keeps the lock type in sync with the member's declared mutex type.
	std::unique_lock<decltype(mutex_in_)> lock(mutex_in_);

	try {
		auto &al = in_[mq.id_];
		if (al.empty()) {
			al.insert(al.begin(), Array());
		}
		al.front().emplace_back(std::move(tmp));
	} catch (const std::exception &e) {
		LOG_ERROR() << "sendq error: " << e.what();
		// Rethrow the original exception object. `throw e;` would copy it as a
		// plain std::exception and lose the dynamic type.
		throw;
	}
}

/**
 * Process one batch of messages.
 *
 * 1. Entries positioned before the first non-expired entry have their
 *    on_expire_ callback invoked (when set) with their payload.
 * 2. Starting at the first non-expired entry, messages are handed to
 *    mq.TrySend() in order for as long as it returns true.
 * 3. Every entry handled in steps 1 and 2 is erased from arr.
 *
 * @param mq     queue used to send the messages.
 * @param remote not used by this overload.
 * @param arr    batch to process; handled entries are removed from it.
 * @return number of entries removed from arr (expired plus sent).
 */
int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr)
{
	(void) remote;

	// NOTE(review): lower_bound requires arr to be partitioned by expire time
	// relative to Now() — confirm that callers always append in expire order.
	auto NotYetDue = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; };
	auto pos = std::lower_bound(arr.begin(), arr.end(), Now(), NotYetDue);

	// Notify owners of expired messages.
	for (auto it = arr.begin(); it != pos; ++it) {
		auto &info = it->data();
		if (info.on_expire_) {
			info.on_expire_(info.data_);
		}
	}

	// Send the remaining messages in order; stop at the first one that is refused.
	while (pos != arr.end() && mq.TrySend(pos->data().mq_, pos->data().data_)) {
		++pos;
	}

	const int nprocessed = static_cast<int>(std::distance(arr.begin(), pos));
	arr.erase(arr.begin(), pos);
	return nprocessed;
}

/**
 * Process the batches of one remote, front to back.
 *
 * Each batch is passed to the Array overload. A batch that ends up empty is
 * erased and processing continues with the next one; the first batch that
 * still holds entries stops the loop, leaving later batches untouched.
 *
 * @param mq     queue used to send the messages.
 * @param remote forwarded to the Array overload.
 * @param al     list of batches; fully drained batches are removed from it.
 * @return total number of entries processed across the visited batches.
 */
int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &al)
{
	int nsend = 0;
	auto it = al.begin();
	while (it != al.end()) {
		nsend += DoSend1Remote(mq, remote, *it);
		if (!it->empty()) {
			break;
		}
		it = al.erase(it);
	}
	return nsend;
}

/**
 * Run one send round over all outbound data, then collect inbound data.
 *
 * Holds mutex_out_ for the whole call (so expiry callbacks run under that
 * lock) and additionally takes mutex_in_ while collecting.
 *
 * @param mq queue used to send the messages.
 * @return true when out_ still holds pending data after this round.
 */
bool SendQ::TrySend(ShmMsgQueue &mq)
{
	std::unique_lock<decltype(mutex_out_)> lock(mutex_out_);
	// if (TooFast()) { return false; }

	int nsend = 0;
	for (auto rec = out_.begin(); rec != out_.end();) {
		nsend += DoSend1Remote(mq, rec->first, rec->second);
		if (rec->second.empty()) {
			rec = out_.erase(rec);
		} else {
			++rec;
		}
	}

	{
		// Collect newly appended messages from the inbound side.
		std::unique_lock<decltype(mutex_in_)> lock_in(mutex_in_);
		if (out_.empty()) {
			// Everything was flushed: take over the whole inbound map at once.
			out_.swap(in_);
		} else if (nsend == 0) { // remote blocked
			// Nothing could be processed this round: append the inbound
			// batches behind the ones already waiting in out_.
			for (auto &kv : in_) {
				auto &to = out_[kv.first];
				to.splice(to.end(), kv.second);
			}
			in_.clear();
		}
		// Otherwise leave in_ alone until a later round.
	}

	return !out_.empty();
}

/**
 * Simple call-rate check.
 *
 * Counts calls and resets the counter whenever NowSec() has advanced past the
 * stored last_time_ (presumably once per second — confirm NowSec() units).
 *
 * Not accurate in multi-thread use: last_time_ and count_ are updated without
 * any lock being taken in this function.
 *
 * @return true when the call count in the current window exceeds the limit.
 */
bool SendQ::TooFast()
{
	constexpr int kMaxCallsPerWindow = 1000 * 100;

	auto cur = NowSec();
	if (cur > last_time_) {
		last_time_ = cur;
		count_ = 0;
	}
	return ++count_ > kMaxCallsPerWindow;
}