lichao
2021-04-16 708ff9e8af731e2799767ed8bfca7df3b74fc26a
sendq use less shm, copy data.
5个文件已修改
157 ■■■■■ 已修改文件
src/msg.h 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.cpp 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.h 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h
@@ -82,6 +82,15 @@
            uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); });
    }
    void *Pack(SharedMemory &shm, const std::string &content)
    {
        void *addr = shm.Alloc(content.size());
        if (addr) {
            memcpy(addr, content.data(), content.size());
        }
        return addr;
    }
    bool MakeRC(SharedMemory &shm, void *addr);
    bool Make(SharedMemory &shm, void *addr);
@@ -111,7 +120,6 @@
    }
    bool EnableRefCount(SharedMemory &shm);
    template <class Body>
    inline bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body)
    {
@@ -119,6 +127,29 @@
        auto NeedRefCount = [&]() { return head.type() == kMsgTypePublish; };
        return NeedRefCount() ? MakeRC(shm, p) : Make(shm, p);
    }
    template <class Body>
    static inline std::string Serialize(const BHMsgHead &head, const Body &body)
    {
        uint32_t head_len = head.ByteSizeLong();
        uint32_t body_len = body.ByteSizeLong();
        std::string s(4 + head_len + 4 + body_len, '\0');
        size_t pos = 0;
        auto add1 = [&](auto &&msg, auto &&size) {
            Put32(&s[pos], size);
            pos += 4;
            msg.SerializeToArray(&s[pos], size);
            pos += size;
        };
        add1(head, head_len);
        add1(body, body_len);
        assert(pos == s.size());
        return s;
    }
    inline bool Make(SharedMemory &shm, const std::string &content)
    {
        void *p = Pack(shm, content);
        return Make(shm, p);
    }
    bool ParseHead(BHMsgHead &head) const;
    template <class Body>
src/sendq.cpp
@@ -19,11 +19,6 @@
#include "shm_queue.h"
#include <chrono>
//TODO change to save head, body, instead of MsgI.
// as MsgI which is in shm, but head, body are in current process.
// Then if node crashes, shm will not be affected by msgs in sendq.
// but pulishing ref-counted msg need some work.
int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr)
{
    auto FirstNotExpired = [](Array &l) {
@@ -35,22 +30,41 @@
    for (auto it = arr.begin(); it != pos; ++it) {
        auto &info = it->data();
        if (info.on_expire_) {
            info.on_expire_(info.msg_);
            info.on_expire_(info.data_);
        }
        info.msg_.Release(mq.shm());
        if (info.data_.index() == 0) {
            boost::variant2::get<0>(info.data_).Release(mq.shm());
        }
    }
    int n = mq.TrySendAll(*(MQId *) remote.data(), MsgIter(pos), MsgIter(arr.end()));
    for (int i = 0; i < n; ++i) {
        auto &msg = pos->data().msg_;
        if (msg.IsCounted()) {
            msg.Release(mq.shm());
    auto SendData = [&](Data &d) {
        bool r = false;
        if (d.index() == 0) {
            auto &msg = boost::variant2::get<0>(pos->data().data_);
            r = mq.TrySend(*(MQId *) remote.data(), msg);
            if (r && msg.IsCounted()) {
                msg.Release(mq.shm());
            }
        } else {
            auto &content = boost::variant2::get<1>(pos->data().data_);
            MsgI msg;
            if (msg.Make(mq.shm(), content)) {
                r = mq.TrySend(*(MQId *) remote.data(), msg);
                if (!r || msg.IsCounted()) {
                    msg.Release(mq.shm());
                }
            }
        }
        return r;
    };
    while (pos != arr.end() && SendData(pos->data().data_)) {
        ++pos;
    }
    int nprocessed = std::distance(arr.begin(), pos);
    arr.erase(arr.begin(), pos);
    return n;
    return nprocessed;
}
int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &al)
src/sendq.h
@@ -21,6 +21,7 @@
#include "defs.h"
#include "msg.h"
#include "timed_queue.h"
#include <boost/variant2/variant.hpp>
#include <deque>
#include <functional>
#include <list>
@@ -38,36 +39,43 @@
public:
    typedef std::string Remote;
    typedef bhome_msg::MsgI MsgI;
    typedef std::function<void(const MsgI &msg)> OnMsgEvent;
    typedef std::string Content;
    typedef boost::variant2::variant<MsgI, Content> Data;
    typedef std::function<void(const Data &)> OnMsgEvent;
    struct MsgInfo {
        MsgI msg_;
        Data data_;
        OnMsgEvent on_expire_;
        // OnMsgEvent on_send_;
    };
    typedef TimedData<MsgInfo> TimedMsg;
    typedef TimedMsg::TimePoint TimePoint;
    typedef TimedMsg::Duration Duration;
    void Append(const MQId &id, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent())
    template <class... Rest>
    void Append(const MQId &id, Rest &&...rest)
    {
        Append(std::string((const char *) &id, sizeof(id)), msg, onExpire);
        Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...);
    }
    void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent())
    {
        using namespace std::chrono_literals;
        Append(addr, msg, Now() + 60s, onExpire);
        msg.AddRef();
        AppendData(addr, Data(msg), DefaultExpire(), onExpire);
    }
    void Append(const Remote &addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent())
    {
        AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire);
    }
    bool TrySend(bhome_shm::ShmMsgQueue &mq);
    // bool empty() const { return store_.empty(); }
private:
    static TimePoint Now() { return TimedMsg::Clock::now(); }
    void Append(const Remote &addr, const MsgI &msg, const TimePoint &expire, OnMsgEvent onExpire)
    static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); }
    void AppendData(const Remote &addr, Data &&data, const TimePoint &expire, OnMsgEvent onExpire)
    {
        //TODO simple queue, organize later ?
        msg.AddRef();
        TimedMsg tmp(expire, MsgInfo{msg, onExpire});
        TimedMsg tmp(expire, MsgInfo{std::move(data), std::move(onExpire)});
        std::unique_lock<std::mutex> lock(mutex_in_);
        auto &al = in_[addr];
        if (!al.empty()) {
@@ -82,18 +90,6 @@
    int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr);
    int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &arr);
    class MsgIter
    {
        Array::iterator iter_;
    public:
        MsgIter(Array::iterator iter) :
            iter_(iter) {}
        MsgIter &operator++() { return ++iter_, *this; }
        bool operator==(const MsgIter &a) { return iter_ == a.iter_; }
        MsgI &operator*() { return iter_->data().msg_; }
    };
    std::mutex mutex_in_;
    std::mutex mutex_out_;
src/socket.h
@@ -36,11 +36,10 @@
class ShmSocket : private boost::noncopyable
{
    bool SendImpl(const void *valid_remote, MsgI const &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent())
    template <class... T>
    bool SendImpl(const void *valid_remote, T &&...rest)
    {
        // if (!mq().TrySend(*(MQId *) valid_remote, imsg)) {
        send_buffer_.Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire);
        // }
        send_buffer_.Append(*static_cast<const MQId *>(valid_remote), std::forward<decltype(rest)>(rest)...);
        return true;
    }
@@ -69,24 +68,22 @@
    template <class Body>
    bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb = RecvCB())
    {
        MsgI msg;
        if (msg.Make(shm(), head, body)) {
            DEFER1(if (msg.IsCounted()) { msg.Release(shm()); });
            std::string msg_id(head.msg_id());
        try {
            if (!cb) {
                return SendImpl(valid_remote, msg);
                return SendImpl(valid_remote, MsgI::Serialize(head, body));
            } else {
                std::string msg_id(head.msg_id());
                per_msg_cbs_->Add(msg_id, cb);
                auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) {
                auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
                    RecvCB cb_no_use;
                    per_msg_cbs_->Find(msg_id, cb_no_use);
                };
                return SendImpl(valid_remote, msg, onExpireRemoveCB);
                return SendImpl(valid_remote, MsgI::Serialize(head, body), onExpireRemoveCB);
            }
        } else {
            SetLastError(ENOMEM, "Out of mem");
        } catch (...) {
            SetLastError(eError, "Send internal error.");
            return false;
        }
        return false;
    }
    bool Send(const void *valid_remote, const MsgI &imsg)
utest/api_test.cpp
@@ -138,6 +138,19 @@
    }
}
namespace
{
struct CCC {
};
void F(CCC &&c) {}
template <class... T>
void Pass(T &&...t)
{
    F(std::forward<decltype(t)>(t)...);
}
} // namespace
BOOST_AUTO_TEST_CASE(ApiTest)
{
    auto max_time = std::chrono::steady_clock::time_point::max();
@@ -241,7 +254,7 @@
        MsgStatus last;
        while (*run) {
            auto &st = Status();
            std::this_thread::sleep_for(1s);
            Sleep(1s, false);
            printf("nreq: %8ld, spd %8ld | failed: %8ld | nsrv: %8ld, spd %8ld | nreply: %8ld, spd %8ld\n",
                   st.nrequest_.load(), st.nrequest_ - last.nrequest_,
                   st.nfailed_.load(),
@@ -270,8 +283,8 @@
    int same = 0;
    int64_t last = 0;
    while (last < nreq * ncli && same < 3) {
        Sleep(1s);
    while (last < nreq * ncli && same < 2) {
        Sleep(1s, false);
        auto cur = Status().nreply_.load();
        if (last == cur) {
            ++same;