lichao
2021-05-14 34bc326eab06b9b1da2004a9e0d2182d63501d68
change SendQ data from variant to int64.
3个文件已修改
41 ■■■■ 已修改文件
src/sendq.cpp 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.h 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.cpp
@@ -34,17 +34,9 @@
        if (info.on_expire_) {
            info.on_expire_(info.data_);
        }
        if (info.data_.index() == 0) {
            boost::variant2::get<0>(info.data_).Release();
        }
    }
    auto SendData = [&](Data &d) {
        auto TrySend = [&](auto &&data) { return mq.TrySend(remote, data); };
        return boost::variant2::visit(TrySend, pos->data().data_);
    };
    while (pos != arr.end() && SendData(pos->data().data_)) {
    while (pos != arr.end() && mq.TrySend(remote, pos->data().data_)) {
        ++pos;
    }
src/sendq.h
@@ -21,7 +21,6 @@
#include "defs.h"
#include "msg.h"
#include "timed_queue.h"
#include <boost/variant2/variant.hpp>
#include <deque>
#include <functional>
#include <list>
@@ -37,8 +36,7 @@
    typedef MQId Remote;
    typedef bhome_msg::MsgI MsgI;
    typedef std::string Content;
    typedef int64_t Command;
    typedef boost::variant2::variant<MsgI, Command> Data;
    typedef int64_t Data;
    typedef std::function<void(const Data &)> OnMsgEvent;
    struct MsgInfo {
        Data data_;
@@ -48,25 +46,37 @@
    typedef TimedMsg::TimePoint TimePoint;
    typedef TimedMsg::Duration Duration;
    void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire = OnMsgEvent())
    void Append(const Remote addr, const MsgI msg)
    {
        msg.AddRef();
        AppendData(addr, Data(msg), DefaultExpire(), onExpire);
        auto onMsgExpire = [](const Data &d) { MsgI(d).Release(); };
        AppendData(addr, msg.Offset(), DefaultExpire(), onMsgExpire);
    }
    void Append(const Remote addr, const Command command, OnMsgEvent onExpire = OnMsgEvent())
    void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire)
    {
        AppendData(addr, Data(command), DefaultExpire(), onExpire);
        msg.AddRef();
        auto onMsgExpire = [onExpire](const Data &d) {
            onExpire(d);
            MsgI(d).Release();
        };
        AppendData(addr, msg.Offset(), DefaultExpire(), onMsgExpire);
    }
    void Append(const Remote addr, const Data command, OnMsgEvent onExpire = OnMsgEvent())
    {
        AppendData(addr, command, DefaultExpire(), onExpire);
    }
    bool TrySend(ShmMsgQueue &mq);
private:
    static TimePoint Now() { return TimedMsg::Clock::now(); }
    static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); }
    void AppendData(const Remote addr, Data &&data, const TimePoint &expire, OnMsgEvent onExpire)
    void AppendData(const Remote addr, const Data data, const TimePoint &expire, OnMsgEvent onExpire)
    {
        //TODO simple queue, organize later ?
        TimedMsg tmp(expire, MsgInfo{std::move(data), std::move(onExpire)});
        TimedMsg tmp(expire, MsgInfo{data, std::move(onExpire)});
        std::unique_lock<std::mutex> lock(mutex_in_);
        auto &al = in_[addr];
        if (!al.empty()) {
utest/speed_test.cpp
@@ -57,7 +57,8 @@
        DEFER1(msg.Release(););
        for (uint64_t i = 0; i < n; ++i) {
            while (!mq.TrySend(id, msg)) {}
            msg.AddRef();
            while (!mq.TrySend(id, msg.Offset())) {}
            ++nwrite;
        }
    };