lichao
2021-04-12 1b52f1cb8c47dd2c0195d2fd65d7b6a4c2f10704
add fail-resend support.
3个文件已添加
7个文件已修改
307 ■■■■ 已修改文件
.vscode/launch.json 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/center.cpp 47 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/failed_msg.cpp 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/failed_msg.h 47 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/timed_queue.h 75 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 48 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.vscode/launch.json
@@ -11,7 +11,7 @@
            "program": "${workspaceFolder}/debug/bin/utest",
            "args": [
                "-t",
                "ReqRepTest"
                "SRTest"
            ],
            "stopAtEntry": false,
            "cwd": "${workspaceFolder}",
src/center.cpp
@@ -18,6 +18,7 @@
#include "center.h"
#include "bh_util.h"
#include "defs.h"
#include "failed_msg.h"
#include "shm.h"
#include <chrono>
#include <set>
@@ -364,28 +365,31 @@
bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
{
    auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner);
    auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
    auto center_failed_q = std::make_shared<FailedMsgQ>();
    auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) {
        return [&](auto &&rep_body) {
            auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
            bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 100);
            if (!r) {
                printf("send reply failed.\n");
            MsgI msg;
            if (msg.Make(socket.shm(), reply_head, rep_body)) {
                auto &remote = head.route(0).mq_id();
                bool r = socket.Send(remote.data(), msg, timeout_ms);
                if (!r) {
                    failq.Push(remote, msg, 60s); // for later retry.
                }
            }
            //TODO resend failed.
        };
    };
    auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
    auto OnCenterIdle = [center_ptr, center_failed_q](ShmSocket &socket) {
        auto &center = *center_ptr;
        center_failed_q->TrySend(socket);
        center->OnTimer();
    };
    auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
        auto &center = *center_ptr;
        auto replyer = MakeReplyer(socket, head, center->id());
        auto replyer = MakeReplyer(socket, head, center->id(), *center_failed_q);
        switch (head.type()) {
            CASE_ON_MSG_TYPE(Register);
            CASE_ON_MSG_TYPE(Heartbeat);
@@ -396,10 +400,11 @@
        }
    };
    auto OnBusIdle = [](ShmSocket &socket) {};
    auto bus_failed_q = std::make_shared<FailedMsgQ>();
    auto OnBusIdle = [=](ShmSocket &socket) { bus_failed_q->TrySend(socket); };
    auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
        auto &center = *center_ptr;
        auto replyer = MakeReplyer(socket, head, center->id());
        auto replyer = MakeReplyer(socket, head, center->id(), *bus_failed_q);
        auto OnPublish = [&]() {
            MsgPublish pub;
            NodeCenter::Clients clients;
@@ -407,19 +412,25 @@
            if (head.route_size() != 1 || !msg.ParseBody(pub)) {
                return;
            } else if (!center->FindClients(head, pub, clients, reply)) {
                MakeReplyer(socket, head, center->id())(reply);
                replyer(reply);
            } else {
                MakeReplyer(socket, head, center->id())(MakeReply(eSuccess));
                replyer(MakeReply(eSuccess));
                if (!msg.EnableRefCount(socket.shm())) { return; } // no memory?
                if (clients.empty()) { return; }
                for (auto &cli : clients) {
                auto it = clients.begin();
                do {
                    auto &cli = *it;
                    auto node = cli.weak_node_.lock();
                    if (node) {
                        if (!socket.Send(cli.mq_.data(), msg, 100)) {
                            printf("center route publish failed. need resend.\n");
                        if (!socket.Send(cli.mq_.data(), msg, 0)) {
                            bus_failed_q->Push(cli.mq_, msg, 60s);
                        }
                        ++it;
                    } else {
                        it = clients.erase(it);
                    }
                }
                } while (it != clients.end());
            }
        };
        switch (head.type()) {
@@ -484,7 +495,7 @@
{
    for (auto &kv : Centers()) {
        auto &info = kv.second;
        sockets_[info.name_]->Start(info.handler_);
        sockets_[info.name_]->Start(info.handler_, info.idle_);
    }
    return true;
src/failed_msg.cpp
New file
@@ -0,0 +1,33 @@
/*
 * =====================================================================================
 *
 *       Filename:  failed_msg.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月12日 16时10分53秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#include "failed_msg.h"
FailedMsgQ::Func FailedMsgQ::PrepareSender(const std::string &remote, Msg const &msg)
{
    msg.AddRef();
    return [remote, msg](void *valid_sock) {
        assert(valid_sock);
        ShmSocket &sock = *static_cast<ShmSocket *>(valid_sock);
        bool r = sock.Send(remote.data(), msg, 0);
        if (r && msg.IsCounted()) {
            auto tmp = msg; // Release() is not const, but it's safe to release.
            tmp.Release(sock.shm());
        }
        return r;
    };
}
src/failed_msg.h
New file
@@ -0,0 +1,47 @@
/*
 * =====================================================================================
 *
 *       Filename:  failed_msg.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月12日 11时21分30秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef FAILED_MSG_9YOI86AS
#define FAILED_MSG_9YOI86AS
#include "msg.h"
#include "socket.h"
#include "timed_queue.h"
#include <string>
class FailedMsgQ
{
    typedef std::function<bool(void *)> Func;
    typedef TimedQueue<Func> TimedFuncQ;
public:
    typedef bhome_msg::MsgI Msg;
    void Push(const std::string &remote, Msg const &msg, TimedFuncQ::TimePoint const &exr) { queue_.Push(PrepareSender(remote, msg), exr); }
    void Push(const std::string &remote, Msg const &msg, TimedFuncQ::Duration const &exr) { queue_.Push(PrepareSender(remote, msg), exr); }
    void TrySend(ShmSocket &socket)
    {
        queue_.CheckAll([&](Func &f) { return f(&socket); });
    }
private:
    Func PrepareSender(const std::string &remote, Msg const &msg);
    TimedFuncQ queue_;
};
#endif // end of include guard: FAILED_MSG_9YOI86AS
src/socket.cpp
@@ -45,11 +45,12 @@
{
    auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
        RecvCB cb;
        if (async_cbs_->Find(head.msg_id(), cb)) {
        if (per_msg_cbs_->Find(head.msg_id(), cb)) {
            cb(socket, imsg, head);
        } else if (onData) {
            onData(socket, imsg, head);
        } // else ignored, or dropped
        } else { // else ignored, or dropped
        }
    };
    auto recvLoopBody = [this, onRecvWithPerMsgCB, onIdle]() {
@@ -61,7 +62,8 @@
                if (imsg.ParseHead(head)) {
                    onRecvWithPerMsgCB(*this, imsg, head);
                }
            } else if (onIdle) {
            }
            if (onIdle) {
                onIdle(*this);
            }
        } catch (...) {
src/socket.h
@@ -77,7 +77,7 @@
    template <class Body>
    bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb)
    {
        auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { async_cbs_->Add(head.msg_id(), cb); }); };
        auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { per_msg_cbs_->Add(head.msg_id(), cb); }); };
        MsgI msg;
        return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend);
    }
@@ -109,12 +109,15 @@
                    reply.swap(msg);
                    reply_head.Swap(&head);
                    st->cv.notify_one();
                } else {
                } else { // ignore
                }
            };
            std::unique_lock<std::mutex> lk(st->mutex);
            bool sendok = Send(remote, head, body, timeout_ms, OnRecv);
            if (!sendok) {
                printf("send timeout\n");
            }
            if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
                return true;
            } else {
@@ -161,7 +164,7 @@
        }
    };
    Synced<AsyncCBs> async_cbs_;
    Synced<AsyncCBs> per_msg_cbs_;
};
#endif // end of include guard: SOCKET_GWTJHBPO
src/timed_queue.h
New file
@@ -0,0 +1,75 @@
/*
 * =====================================================================================
 *
 *       Filename:  timed_queue.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月12日 09时36分04秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef TIMED_QUEUE_Y2YLRBS3
#define TIMED_QUEUE_Y2YLRBS3
#include "bh_util.h"
#include <chrono>
#include <list>
#include <string>
template <class Data, class ClockType = std::chrono::steady_clock>
class TimedQueue
{
public:
    typedef ClockType Clock;
    typedef typename Clock::time_point TimePoint;
    typedef typename Clock::duration Duration;
private:
    struct Record {
        TimePoint expire_;
        Data data_;
        Record(const TimePoint &expire, const Data &data) :
            expire_(expire), data_(data) {}
        Record(const TimePoint &expire, Data &&data) :
            expire_(expire), data_(std::move(data)) {}
        bool Expired() { return Clock::now() > expire_; }
    };
    typedef std::list<Record> Queue;
    Synced<Queue> queue_;
public:
    void Push(Data &&data, const TimePoint &expire) { queue_->emplace_back(expire, std::move(data)); }
    void Push(Data const &data, const TimePoint &expire) { queue_->emplace_back(expire, data); }
    void Push(Data &&data, Duration const &timeout) { Push(std::move(data), Clock::now() + timeout); }
    void Push(Data const &data, Duration const &timeout) { Push(data, Clock::now() + timeout); }
    template <class Func>
    void CheckAll(Func const &func)
    {
        queue_.Apply([&](Queue &q) {
            if (q.empty()) {
                return;
            }
            auto it = q.begin();
            do {
                if (it->Expired()) {
                    it = q.erase(it);
                } else if (func(it->data_)) {
                    it = q.erase(it);
                } else {
                    ++it;
                }
            } while (it != q.end());
        });
    }
};
#endif // end of include guard: TIMED_QUEUE_Y2YLRBS3
src/topic_node.cpp
@@ -17,6 +17,7 @@
 */
#include "topic_node.h"
#include "bh_util.h"
#include "failed_msg.h"
#include <chrono>
#include <list>
@@ -32,44 +33,7 @@
    std::string msg_id;
};
class ServerFailedQ
{
    struct FailedMsg {
        steady_clock::time_point xpr;
        std::string remote_;
        BHMsgHead head_;
        MsgRequestTopicReply body_;
        FailedMsg(const std::string &addr, BHMsgHead &&head, MsgRequestTopicReply &&body) :
            xpr(steady_clock::now() + 10s), remote_(addr), head_(std::move(head)), body_(std::move(body)) {}
        bool Expired() { return steady_clock::now() > xpr; }
    };
    typedef std::list<FailedMsg> Queue;
    Synced<Queue> queue_;
public:
    void Push(const std::string &remote, BHMsgHead &&head, MsgRequestTopicReply &&body)
    {
        queue_->emplace_back(remote, std::move(head), std::move(body));
    }
    void TrySend(ShmSocket &socket, const int timeout_ms = 0)
    {
        queue_.Apply([&](Queue &q) {
            if (!q.empty()) {
                auto it = q.begin();
                do {
                    if (it->Expired()) {
                        // it->msg_.Release(socket.shm());
                        it = q.erase(it);
                    } else if (socket.Send(it->remote_.data(), it->head_, it->body_, timeout_ms)) {
                        it = q.erase(it);
                    } else {
                        ++it;
                    }
                } while (it != q.end());
            }
        });
    }
};
typedef FailedMsgQ ServerFailedQ;
} // namespace
TopicNode::TopicNode(SharedMemory &shm) :
@@ -158,8 +122,12 @@
                    for (int i = 0; i < head.route_size() - 1; ++i) {
                        reply_head.add_route()->Swap(head.mutable_route(i));
                    }
                    if (!sock.Send(head.route().rbegin()->mq_id().data(), reply_head, reply_body, 10)) {
                        failed_q->Push(head.route().rbegin()->mq_id(), std::move(reply_head), std::move(reply_body));
                    MsgI msg;
                    if (msg.Make(sock.shm(), reply_head, reply_body)) {
                        auto &remote = head.route().rbegin()->mq_id();
                        if (!sock.Send(remote.data(), msg, 10)) {
                            failed_q->Push(remote, msg, 10s);
                        }
                    }
                }
            }
utest/speed_test.cpp
@@ -26,7 +26,7 @@
    ShmRemover auto_remove(shm_name);
    const int mem_size = 1024 * 1024 * 50;
    MQId id = boost::uuids::random_generator()();
    const int timeout = 100;
    const int timeout = 1000;
    const uint32_t data_size = 4000;
    const std::string proc_id = "demo_proc";
@@ -44,7 +44,6 @@
        DEFER1(msg.Release(shm););
        for (uint64_t i = 0; i < n; ++i) {
            // mq.Send(id, str.data(), str.size(), timeout);
            mq.Send(id, msg, timeout);
        }
    };
@@ -91,6 +90,7 @@
                    www.Launch(Writer, i, nmsg);
                }
                www.WaitAll();
                printf("writer finished\n");
                run.store(false);
                rrr.WaitAll();
                printf("Write %ld msg  R(%3d) W(%3d), : ", total_msg, nreader, nwriter);
@@ -136,14 +136,18 @@
    req_body.set_topic("topic");
    req_body.set_data(msg_content);
    auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
    req_head.add_route()->set_mq_id(&cli.id(), cli.id().size());
    request_rc.MakeRC(shm, req_head, req_body);
    DEFER1(request_rc.Release(shm));
    MsgRequestTopic reply_body;
    reply_body.set_topic("topic");
    reply_body.set_data(msg_content);
    auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id));
    reply_head.add_route()->set_mq_id(&srv.id(), srv.id().size());
    MsgI reply_rc;
    reply_rc.MakeRC(shm, reply_head, reply_body);
    DEFER1(reply_rc.Release(shm));
    std::atomic<uint64_t> count(0);
@@ -224,9 +228,5 @@
    printf("request ok: %ld\n", count.load());
    stop = true;
    servers.WaitAll();
    BOOST_CHECK(request_rc.IsCounted());
    BOOST_CHECK_EQUAL(request_rc.Count(), 1);
    request_rc.Release(shm);
    BOOST_CHECK(!request_rc.IsCounted());
    // BOOST_CHECK_THROW(reply.Count(), int);
}
utest/utest.cpp
@@ -1,5 +1,6 @@
#include "center.h"
#include "defs.h"
#include "failed_msg.h"
#include "util.h"
#include <atomic>
#include <boost/uuid/uuid_generators.hpp>
@@ -21,8 +22,28 @@
    static const bool value = true;
};
typedef FailedMsgQ ServerFailedQ;
BOOST_AUTO_TEST_CASE(Temp)
{
    const std::string shm_name("ShmTemp");
    ShmRemover auto_remove(shm_name); //remove twice? in case of killed?
    SharedMemory shm(shm_name, 1024 * 1024 * 10);
    typedef std::chrono::steady_clock clock;
    int n = 1000 * 1000;
    std::vector<clock::time_point> tps(n);
    {
        printf("thread switch %d times, ", n);
        boost::timer::auto_cpu_timer timer;
        for (auto &tp : tps) {
            tp = clock::now();
            std::this_thread::yield();
        }
    }
    printf("time: %ld ns\n", (tps.back() - tps.front()).count());
    return;
    // sub topic partial match.
    Topic topics[] = {
        "",
        ".",
@@ -131,7 +152,9 @@
            bool r = provider.Publish(topic, data.data(), data.size(), timeout);
            if (!r) {
                printf("pub ret: %s\n", r ? "ok" : "fail");
                static std::atomic<int> an(0);
                int n = ++an;
                printf("pub %d ret: %s\n", n, r ? "ok" : "fail");
            }
        }
    };
@@ -142,6 +165,7 @@
        topics.push_back("t" + std::to_string(i));
    }
    Topics part;
    boost::timer::auto_cpu_timer pubsub_timer;
    for (size_t i = 0; i < topics.size(); ++i) {
        part.push_back(topics[i]);
        threads.Launch(Sub, i, topics);