lichao
2021-04-15 cc6224d6e7b3f451a3ee7ecfcc7a071552498388
use list<array> to avoid copy buffer.
2个文件已修改
107 ■■■■ 已修改文件
src/sendq.cpp 85 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.h 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.cpp
@@ -19,44 +19,69 @@
#include "shm_queue.h"
#include <chrono>
bool SendQ::TrySend(bhome_shm::ShmMsgQueue &mq)
int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr)
{
    if (out_.empty()) {
        std::unique_lock<std::mutex> lock(mutex_);
        out_.swap(in_);
    static size_t total = 0;
    static size_t count = 0;
    static size_t max_len = 0;
    static time_t last = 0;
    ++count;
    total += arr.size();
    if (arr.size() > max_len) {
        max_len = arr.size();
    }
    time_t now;
    time(&now);
    if (now > last && count > 0) {
        last = now;
        printf("avg size : %ld, max size: %ld\n", total / count, max_len);
    }
    auto FirstNotExpired = [](MsgList &l) {
    auto FirstNotExpired = [](Array &l) {
        auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; };
        return std::lower_bound(l.begin(), l.end(), Now(), Less);
    };
    auto SendOneRemote = [&](const Remote &remote, MsgList &msg_list) {
        auto pos = FirstNotExpired(msg_list);
        for (auto it = msg_list.begin(); it != pos; ++it) {
            auto &info = it->data();
            if (info.on_expire_) {
                info.on_expire_(info.msg_);
            }
            info.msg_.Release(mq.shm());
    auto pos = FirstNotExpired(arr);
    for (auto it = arr.begin(); it != pos; ++it) {
        auto &info = it->data();
        if (info.on_expire_) {
            info.on_expire_(info.msg_);
        }
        info.msg_.Release(mq.shm());
    }
        int n = mq.TrySendAll(*(MQId *) remote.data(), MsgIter(pos), MsgIter(msg_list.end()));
        for (int i = 0; i < n; ++i) {
            auto &msg = pos->data().msg_;
            if (msg.IsCounted()) {
                msg.Release(mq.shm());
            }
            ++pos;
    int n = mq.TrySendAll(*(MQId *) remote.data(), MsgIter(pos), MsgIter(arr.end()));
    for (int i = 0; i < n; ++i) {
        auto &msg = pos->data().msg_;
        if (msg.IsCounted()) {
            msg.Release(mq.shm());
        }
        ++pos;
    }
        msg_list.erase(msg_list.begin(), pos);
    arr.erase(arr.begin(), pos);
    return n;
}
int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &al)
{
    int nsend = 0;
    auto AllSent = [&](Array &arr) {
        nsend += DoSend1Remote(mq, remote, arr);
        return arr.empty();
    };
    for (auto it = al.begin(); it != al.end() && AllSent(*it); it = al.erase(it)) {}
    return nsend;
}
bool SendQ::TrySend(bhome_shm::ShmMsgQueue &mq)
{
    size_t nsend = 0;
    if (!out_.empty()) {
        auto rec = out_.begin();
        do {
            SendOneRemote(rec->first, rec->second);
            nsend += DoSend1Remote(mq, rec->first, rec->second);
            if (rec->second.empty()) {
                rec = out_.erase(rec);
            } else {
@@ -64,5 +89,21 @@
            }
        } while (rec != out_.end());
    }
    auto Collect = [&]() {
        std::unique_lock<std::mutex> lock(mutex_);
        if (out_.empty()) {
            out_.swap(in_);
        } else if (nsend == 0) { // remote blocked
            for (auto &kv : in_) {
                auto &to = out_[kv.first];
                to.splice(to.end(), kv.second);
            }
            in_.clear();
        }
    };
    Collect();
    return !out_.empty();
}
src/sendq.h
@@ -23,6 +23,7 @@
#include "timed_queue.h"
#include <deque>
#include <functional>
#include <list>
#include <mutex>
#include <string>
#include <unordered_map>
@@ -68,21 +69,32 @@
        msg.AddRef();
        TimedMsg tmp(expire, MsgInfo{msg, onExpire});
        std::unique_lock<std::mutex> lock(mutex_);
        in_[addr].emplace_back(std::move(tmp));
        auto &al = in_[addr];
        if (!al.empty()) {
            al.front().emplace_back(std::move(tmp));
        } else {
            al.insert(al.begin(), Array())->emplace_back(std::move(tmp));
        }
    }
    typedef std::deque<TimedMsg> MsgList;
    typedef std::unordered_map<Remote, MsgList> Store;
    typedef std::deque<TimedMsg> Array;
    typedef std::list<Array> ArrayList;
    typedef std::unordered_map<Remote, ArrayList> Store;
    int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr);
    int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &arr);
    class MsgIter
    {
        MsgList::iterator iter_;
        Array::iterator iter_;
    public:
        MsgIter(MsgList::iterator iter) :
        MsgIter(Array::iterator iter) :
            iter_(iter) {}
        MsgIter &operator++() { return ++iter_, *this; }
        bool operator==(const MsgIter &a) { return iter_ == a.iter_; }
        MsgI &operator*() { return iter_->data().msg_; }
    };
    std::mutex mutex_;
    Store in_;
    Store out_;