lichao
2021-04-13 65ef4d68321e56906920be75831b5e968f7abd7b
add heartbeat; refactor.
7个文件已修改
140 ■■■■ 已修改文件
.vscode/launch.json 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/center.cpp 40 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/failed_msg.cpp 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.cpp 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 43 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.vscode/launch.json
@@ -11,7 +11,7 @@
            "program": "${workspaceFolder}/debug/bin/utest",
            "args": [
                "-t",
                "SRTest"
                "HeartbeatTest"
            ],
            "stopAtEntry": false,
            "cwd": "${workspaceFolder}",
src/center.cpp
@@ -34,7 +34,9 @@
namespace
{
typedef steady_clock::time_point TimePoint;
typedef steady_clock::duration Duration;
inline TimePoint Now() { return steady_clock::now(); };
inline int64_t Seconds(const Duration &d) { return duration_cast<seconds>(d).count(); };
//TODO check proc_id
class NodeCenter
@@ -56,15 +58,15 @@
    struct ProcState {
        TimePoint timestamp_;
        uint32_t flag_ = 0; // reserved
        void UpdateState(TimePoint now)
        void UpdateState(TimePoint now, const Duration &offline_time, const Duration &kill_time)
        {
            const auto kOfflineTime = 60 * 10s;
            const auto kKillTime = 60 * 20s;
            auto diff = now - timestamp_;
            if (diff < kOfflineTime) {
#ifndef NDEBUG
            printf("diff: %ld\n", Seconds(diff));
#endif
            if (diff < offline_time) {
                flag_ = kStateNormal;
            } else if (diff < kKillTime) {
            } else if (diff < kill_time) {
                flag_ = kStateOffline;
            } else {
                flag_ = kStateKillme;
@@ -94,8 +96,8 @@
public:
    typedef std::set<TopicDest> Clients;
    NodeCenter(const std::string &id, const Cleaner &cleaner) :
        id_(id), cleaner_(cleaner) {}
    NodeCenter(const std::string &id, const Cleaner &cleaner, const Duration &offline_time, const Duration &kill_time) :
        id_(id), cleaner_(cleaner), offline_time_(offline_time), kill_time_(kill_time), last_check_time_(Now()) {}
    const std::string &id() const { return id_; } // no need to lock.
    //TODO maybe just return serialized string.
@@ -132,6 +134,8 @@
                auto node = pos->second;
                if (!MatchAddr(node->addrs_, SrcAddr(head))) {
                    return MakeReply<Reply>(eAddressNotMatch, "Node address error.");
                } else if (head.type() == kMsgTypeHeartbeat && CanHeartbeat(*node)) {
                    return op(node);
                } else if (!Valid(*node)) {
                    return MakeReply<Reply>(eNoRespond, "Node is not alive.");
                } else {
@@ -168,7 +172,9 @@
    {
        return HandleMsg(head, [&](Node node) {
            NodeInfo &ni = *node;
            ni.state_.timestamp_ = Now();
            auto now = Now();
            ni.state_.timestamp_ = now;
            ni.state_.flag_ = kStateNormal;
            auto &info = msg.proc();
            if (!info.public_info().empty()) {
@@ -301,10 +307,15 @@
private:
    void CheckNodes()
    {
        auto now = Now();
        if (Seconds(now - last_check_time_) < 1) { return; }
        last_check_time_ = now;
        auto it = nodes_.begin();
        while (it != nodes_.end()) {
            auto &cli = *it->second;
            cli.state_.UpdateState(Now());
            cli.state_.UpdateState(now, offline_time_, kill_time_);
            if (cli.state_.flag_ == kStateKillme) {
                if (cleaner_) {
                    for (auto &addr : cli.addrs_) {
@@ -316,6 +327,10 @@
                ++it;
            }
        }
    }
    bool CanHeartbeat(const NodeInfo &node)
    {
        return Valid(node) || node.state_.flag_ == kStateOffline;
    }
    bool Valid(const NodeInfo &node)
    {
@@ -333,6 +348,9 @@
    std::unordered_map<Topic, Clients> subscribe_map_;
    std::unordered_map<ProcId, Node> nodes_;
    Cleaner cleaner_; // remove mqs.
    Duration offline_time_;
    Duration kill_time_;
    TimePoint last_check_time_;
};
template <class Body, class OnMsg, class Replyer>
@@ -365,7 +383,7 @@
bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
{
    auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner);
    auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 3);
    auto center_failed_q = std::make_shared<FailedMsgQ>();
    auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) {
        return [&](auto &&rep_body) {
src/failed_msg.cpp
@@ -24,6 +24,7 @@
        assert(valid_sock);
        ShmSocket &sock = *static_cast<ShmSocket *>(valid_sock);
        bool r = sock.Send(remote.data(), msg, 0);
        //TODO check remote removed.
        if (r && msg.IsCounted()) {
            auto tmp = msg; // Release() is not const, but it's safe to release.
            tmp.Release(sock.shm());
src/shm_queue.cpp
@@ -72,12 +72,22 @@
bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend)
{
    Queue *remote = Find(shm, MsgQIdToName(remote_id));
    return remote && remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
    if (remote) {
        return remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
    } else {
        // SetLestError(eNotFound);
        return false;
    }
}
bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms)
{
    Queue *remote = Find(shm, MsgQIdToName(remote_id));
    return remote && remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); });
    if (remote) {
        return remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); });
    } else {
        // SetLestError(eNotFound);
        return false;
    }
}
// Test shows that in the 2 cases:
src/topic_node.cpp
@@ -39,17 +39,21 @@
TopicNode::TopicNode(SharedMemory &shm) :
    shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm)
{
    SockNode().Start();
    SockClient().Start();
    SockServer().Start();
    Start();
}
TopicNode::~TopicNode()
{
    StopAll();
    Stop();
}
void TopicNode::StopAll()
void TopicNode::Start()
{
    SockNode().Start();
    SockClient().Start();
    SockServer().Start();
}
void TopicNode::Stop()
{
    SockServer().Stop();
    SockClient().Stop();
@@ -76,12 +80,39 @@
    BHMsgHead reply_head;
    bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
    r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
    if (r) {
    if (r && IsSuccess(reply_body.errmsg().errcode())) {
        info_ = body;
    }
    return r;
}
bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
    auto &sock = SockNode();
    MsgHeartbeat body;
    *body.mutable_proc() = proc;
    auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
    AddRoute(head, sock.id());
    MsgI reply;
    DEFER1(reply.Release(shm_););
    BHMsgHead reply_head;
    bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
    r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
    if (r && IsSuccess(reply_body.errmsg().errcode())) {
        // TODO update proc info
    }
    return r;
}
bool TopicNode::Heartbeat(const int timeout_ms)
{
    ProcInfo proc;
    proc.set_proc_id(proc_id());
    MsgCommonReply reply_body;
    return Heartbeat(proc, reply_body, timeout_ms) && IsSuccess(reply_body.errmsg().errcode());
}
bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
    //TODO check registered
src/topic_node.h
@@ -37,9 +37,12 @@
    TopicNode(SharedMemory &shm);
    ~TopicNode();
    void StopAll();
    void Start();
    void Stop();
    // topic node
    bool Register(ProcInfo &body, MsgCommonReply &reply, const int timeout_ms);
    bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
    bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
    bool Heartbeat(const int timeout_ms);
    // topic rpc server
    typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
utest/utest.cpp
@@ -240,7 +240,7 @@
        do {
            std::this_thread::yield();
        } while (count.load() < nreq);
        client.StopAll();
        client.Stop();
        printf("request %s %d done ", topic.c_str(), count.load());
    };
@@ -282,6 +282,37 @@
    servers.WaitAll();
}
BOOST_AUTO_TEST_CASE(HeartbeatTest)
{
    const std::string shm_name("ShmHeartbeat");
    ShmRemover auto_remove(shm_name);
    SharedMemory shm(shm_name, 1024 * 1024 * 50);
    BHCenter center(shm);
    center.Start();
    {
        DemoNode node("demo_node", shm);
        auto Check = [&]() {
            bool r = node.Heartbeat(100);
            printf("hearbeat ret : %s\n", r ? "ok" : "failed");
        };
        Check();
        for (int i = 0; i < 3; ++i) {
            std::this_thread::sleep_for(1s);
            Check();
        }
        printf("sleep 4\n");
        std::this_thread::sleep_for(4s);
        for (int i = 0; i < 2; ++i) {
            std::this_thread::sleep_for(1s);
            Check();
        }
    }
    printf("sleep 8\n");
    std::this_thread::sleep_for(8s);
}
inline int MyMin(int a, int b)
{
    printf("MyMin\n");