lichao
2021-04-14 fa95ddd1a294ebad47cabf9e149ee7d789271044
use 2 buf to speed up sendq; socket auto start.
8个文件已修改
87 ■■■■ 已修改文件
.vscode/launch.json 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.cpp 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.h 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.cpp 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
.vscode/launch.json
@@ -11,7 +11,7 @@
            "program": "${workspaceFolder}/debug/bin/utest",
            "args": [
                "-t",
                "ApiTest"
                "SRTest"
            ],
            "stopAtEntry": false,
            "cwd": "${workspaceFolder}",
src/sendq.cpp
@@ -21,6 +21,11 @@
bool SendQ::TrySend(bhome_shm::ShmMsgQueue &mq)
{
    if (out_.empty()) {
        std::unique_lock<std::mutex> lock(mutex_);
        out_.swap(in_);
    }
    auto FirstNotExpired = [](MsgList &l) {
        auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; };
        return std::lower_bound(l.begin(), l.end(), Now(), Less);
@@ -36,27 +41,28 @@
            info.msg_.Release(mq.shm());
        }
        //TODO maybe use TrySendAll ?
        while (pos != msg_list.end() && mq.TrySend(*(MQId *) remote.data(), pos->data().msg_)) {
        int n = mq.TrySendAll(*(MQId *) remote.data(), MsgIter(pos), MsgIter(msg_list.end()));
        for (int i = 0; i < n; ++i) {
            auto &msg = pos->data().msg_;
            if (msg.IsCounted()) {
                msg.Release(mq.shm());
            }
            ++pos;
        }
        msg_list.erase(msg_list.begin(), pos);
    };
    if (!store_.empty()) {
        auto rec = store_.begin();
    if (!out_.empty()) {
        auto rec = out_.begin();
        do {
            SendOneRemote(rec->first, rec->second);
            if (rec->second.empty()) {
                rec = store_.erase(rec);
                rec = out_.erase(rec);
            } else {
                ++rec;
            }
        } while (rec != store_.end());
        } while (rec != out_.end());
    }
    return !store_.empty();
    return !out_.empty();
}
src/sendq.h
@@ -23,6 +23,7 @@
#include "timed_queue.h"
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <unordered_map>
@@ -62,13 +63,29 @@
    static TimePoint Now() { return TimedMsg::Clock::now(); }
    void Append(const Remote &addr, const MsgI &msg, const TimePoint &expire, OnMsgEvent onExpire)
    {
        //TODO simple queue, organize later ?
        msg.AddRef();
        store_[addr].emplace_back(TimedMsg(expire, MsgInfo{msg, onExpire}));
        TimedMsg tmp(expire, MsgInfo{msg, onExpire});
        std::unique_lock<std::mutex> lock(mutex_);
        in_[addr].emplace_back(std::move(tmp));
    }
    typedef std::deque<TimedMsg> MsgList;
    typedef std::unordered_map<Remote, MsgList> Store;
    class MsgIter
    {
        MsgList::iterator iter_;
    Store store_;
    public:
        MsgIter(MsgList::iterator iter) :
            iter_(iter) {}
        MsgIter &operator++() { return ++iter_, *this; }
        bool operator==(const MsgIter &a) { return iter_ == a.iter_; }
        MsgI &operator*() { return iter_->data().msg_; }
    };
    std::mutex mutex_;
    Store in_;
    Store out_;
};
#endif // end of include guard: SENDQ_IWKMSK7M
src/shm_queue.cpp
@@ -69,9 +69,13 @@
    return Super::Remove(shm, MsgQIdToName(id));
}
ShmMsgQueue::Queue *ShmMsgQueue::FindRemote(SharedMemory &shm, const MQId &remote_id)
{
    return Find(shm, MsgQIdToName(remote_id));
}
bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend)
{
    Queue *remote = Find(shm, MsgQIdToName(remote_id));
    Queue *remote = FindRemote(shm, remote_id);
    if (remote) {
        if (onsend) {
            return remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
@@ -86,7 +90,7 @@
bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend)
{
    Queue *remote = Find(shm, MsgQIdToName(remote_id));
    Queue *remote = FindRemote(shm, remote_id);
    if (remote) {
        if (onsend) {
            return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
src/shm_queue.h
@@ -174,14 +174,31 @@
    bool TryRecv(MsgI &msg) { return data()->TryRead(msg); }
    template <class OnData>
    int TryRecvAll(OnData const &onData) { return data()->TryReadAll(onData); }
    static Queue *FindRemote(SharedMemory &shm, const MQId &remote_id);
    static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend = OnSend());
    static bool TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend = OnSend());
    template <class Iter>
    static int TrySendAll(SharedMemory &shm, const MQId &remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend())
    {
        Queue *remote = FindRemote(shm, remote_id);
        if (remote) {
            if (onsend) {
                return remote->TryWrite(begin, end, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
            } else {
                return remote->TryWrite(begin, end, [](const MsgI &msg) { msg.AddRef(); });
            }
        } else {
            // SetLestError(eNotFound);
            return 0;
        }
    }
    template <class... Rest>
    bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); }
    template <class... Rest>
    bool TrySend(const MQId &remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); }
    template <class... Rest>
    int TrySendAll(const MQId &remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); }
    size_t Pending() const { return data()->size(); }
};
src/socket.cpp
@@ -32,9 +32,13 @@
ShmSocket::ShmSocket(Shm &shm, const MQId &id, const int len) :
    run_(false), mq_(id, shm, len)
{
    Start();
}
ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
    run_(false), mq_(shm, len) {}
    run_(false), mq_(shm, len)
{
    Start();
}
ShmSocket::~ShmSocket()
{
@@ -44,7 +48,7 @@
bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
{
    auto ioProc = [this, onData, onIdle]() {
        auto DoSend = [this]() { return send_buffer_->TrySend(mq()); };
        auto DoSend = [this]() { return send_buffer_.TrySend(mq()); };
        auto DoRecv = [=] {
            auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
                RecvCB cb;
src/socket.h
@@ -38,7 +38,9 @@
{
    bool SendImpl(const void *valid_remote, const MsgI &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent())
    {
        send_buffer_->Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire);
        // if (!mq().TrySend(*(MQId *) valid_remote, imsg)) {
        send_buffer_.Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire);
        // }
        return true;
    }
@@ -170,7 +172,8 @@
    };
    Synced<AsyncCBs> per_msg_cbs_;
    Synced<SendQ> send_buffer_;
    SendQ send_buffer_;
    // Synced<SendQ> send_buffer_;
};
#endif // end of include guard: SOCKET_GWTJHBPO
utest/speed_test.cpp
@@ -171,7 +171,7 @@
            }
            MsgI msg;
            BHMsgHead head;
            if (!cli.SyncRecv(msg, head, 100)) {
            if (!cli.SyncRecv(msg, head, 1000)) {
                printf("********** client recv error.\n");
            } else {
                DEFER1(msg.Release(shm));