lichao
2021-04-25 5b6ced44157b6e7fab519ae48f5cffcdc2b3cd7c
use node mqid ssn id to index online nodes.
7个文件已修改
87 ■■■■■ 已修改文件
box/center.cpp 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg.proto 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/proto.cpp 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/proto.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp
@@ -52,6 +52,11 @@
    struct ProcState {
        int64_t timestamp_ = 0;
        uint32_t flag_ = 0; // reserved
        void PutOffline(const int64_t offline_time)
        {
            timestamp_ = NowSec() - offline_time;
            flag_ = kStateOffline;
        }
        void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
        {
            auto diff = now - timestamp_;
@@ -106,6 +111,10 @@
        }
        try {
            MQId ssn = head.ssn_id();
            // use src_addr as session id.
            // when node restart, src_addr will change,
            // and old node will be removed after timeout.
            auto UpdateRegInfo = [&](Node &node) {
                node->addrs_.insert(SrcAddr(head));
                for (auto &addr : msg.addrs()) {
@@ -116,19 +125,24 @@
                node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
            };
            auto pos = nodes_.find(head.proc_id());
            if (pos != nodes_.end()) { // new client
            auto pos = nodes_.find(ssn);
            if (pos != nodes_.end()) { // update
                Node &node = pos->second;
                if (node->addrs_.find(SrcAddr(head)) == node->addrs_.end()) {
                    // node restarted, release old mq.
                    RemoveNode(node);
                    node.reset(new NodeInfo);
                }
                UpdateRegInfo(node);
            } else {
                Node node(new NodeInfo);
                UpdateRegInfo(node);
                nodes_[node->proc_.proc_id()] = node;
                nodes_[ssn] = node;
                auto old = node_addr_map_.find(head.proc_id());
                if (old != node_addr_map_.end()) { // old session
                    auto &old_ssn = old->second;
                    nodes_[old_ssn]->state_.PutOffline(offline_time_);
                    printf("put %s %ld offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second);
                    old_ssn = ssn;
                } else {
                    node_addr_map_.emplace(head.proc_id(), ssn);
                }
            }
            return MakeReply(eSuccess);
        } catch (...) {
@@ -140,7 +154,7 @@
    Reply HandleMsg(const BHMsgHead &head, Func const &op)
    {
        try {
            auto pos = nodes_.find(head.proc_id());
            auto pos = nodes_.find(head.ssn_id());
            if (pos == nodes_.end()) {
                return MakeReply<Reply>(eNotRegistered, "Node is not registered.");
            } else {
@@ -171,9 +185,7 @@
        return HandleMsg(
            head, [&](Node node) -> MsgCommonReply {
                NodeInfo &ni = *node;
                auto now = NowSec(); // just set to offline.
                ni.state_.timestamp_ = now - offline_time_;
                ni.state_.UpdateState(now, offline_time_, kill_time_);
                ni.state_.PutOffline(offline_time_);
                return MakeReply(eSuccess);
            });
    }
@@ -375,6 +387,7 @@
        };
        EraseMapRec(service_map_, node->services_);
        EraseMapRec(subscribe_map_, node->subscriptions_);
        node_addr_map_.erase(node->proc_.proc_id());
        for (auto &addr : node->addrs_) {
            cleaner_(addr);
@@ -385,7 +398,8 @@
    std::unordered_map<Topic, Clients> service_map_;
    std::unordered_map<Topic, Clients> subscribe_map_;
    std::unordered_map<ProcId, Node> nodes_;
    std::unordered_map<Address, Node> nodes_;
    std::unordered_map<std::string, Address> node_addr_map_;
    Cleaner cleaner_; // remove mqs.
    int64_t offline_time_;
    int64_t kill_time_;
@@ -425,7 +439,7 @@
    auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 2);
    auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
        return [&](auto &&rep_body) {
            auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
            auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id()));
            auto remote = head.route(0).mq_id();
            socket.Send(remote, reply_head, rep_body);
        };
proto/source/bhome_msg.proto
@@ -16,8 +16,9 @@
    repeated BHAddress route = 2; // for reply and proxy.
    int64 timestamp = 3;
    int32 type = 4;
    bytes proc_id = 5;
    bytes topic = 6; // for request route
    uint64 ssn_id = 5; // node mq id
    bytes proc_id = 6;
    bytes topic = 7; // for request route
}
src/proto.cpp
@@ -32,17 +32,18 @@
std::string NewMsgId() { return RandId(); }
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id)
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const uint64_t ssn_id)
{
    return InitMsgHead(type, proc_id, RandId());
    return InitMsgHead(type, proc_id, ssn_id, RandId());
}
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid)
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const uint64_t ssn_id, const std::string &msgid)
{
    BHMsgHead msg;
    msg.set_msg_id(msgid);
    msg.set_type(type);
    msg.set_proc_id(proc_id);
    msg.set_ssn_id(ssn_id);
    msg.set_timestamp(NowSec());
    return msg;
}
src/proto.h
@@ -74,8 +74,8 @@
    return msg;
}
std::string NewMsgId();
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid);
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id);
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const uint64_t ssn_id, const std::string &msgid);
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const uint64_t ssn_id);
// inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
inline bool IsSuccess(const ErrorCode ec) { return ec == eSuccess; }
bool IsMsgExpired(const BHMsgHead &head);
src/topic_node.cpp
@@ -89,7 +89,7 @@
    AddId(SockSub().id());
    AddId(SockPub().id());
    auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
    auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn()));
    AddRoute(head, sock.id());
    auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) {
@@ -129,7 +129,7 @@
    MsgUnregister body;
    body.mutable_proc()->Swap(&proc);
    auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
    auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn()));
    AddRoute(head, sock.id());
    auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) {
@@ -165,7 +165,7 @@
    MsgHeartbeat body;
    body.mutable_proc()->Swap(&proc);
    auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
    auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn()));
    AddRoute(head, sock.id());
    if (timeout_ms == 0) {
@@ -195,7 +195,7 @@
    }
    auto &sock = SockNode();
    BHMsgHead head(InitMsgHead(GetType(query), proc_id()));
    BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn()));
    AddRoute(head, sock.id());
    MsgI reply;
@@ -217,7 +217,7 @@
    MsgRegisterRPC body;
    body.mutable_topics()->Swap(&topics);
    auto head(InitMsgHead(GetType(body), proc_id()));
    auto head(InitMsgHead(GetType(body), proc_id(), ssn()));
    AddRoute(head, sock.id());
    if (timeout_ms == 0) {
@@ -242,7 +242,7 @@
        MsgRequestTopicReply reply_body;
        if (rcb(head.proc_id(), req, reply_body)) {
            BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id()));
            BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), ssn(), head.msg_id()));
            for (int i = 0; i < head.route_size() - 1; ++i) {
                reply_head.add_route()->Swap(head.mutable_route(i));
@@ -311,7 +311,7 @@
    if (!p || p->route.empty()) {
        return false;
    }
    BHMsgHead head(InitMsgHead(GetType(body), proc_id(), p->msg_id));
    BHMsgHead head(InitMsgHead(GetType(body), proc_id(), ssn(), p->msg_id));
    for (unsigned i = 0; i < p->route.size() - 1; ++i) {
        head.add_route()->Swap(&p->route[i]);
    }
@@ -348,7 +348,7 @@
    auto SendTo = [this, msg_id](const BHAddress &addr, const MsgRequestTopic &req, const RequestResultCB &cb) {
        auto &sock = SockClient();
        BHMsgHead head(InitMsgHead(GetType(req), proc_id(), msg_id));
        BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id));
        AddRoute(head, sock.id());
        head.set_topic(req.topic());
@@ -388,7 +388,7 @@
        BHAddress addr;
        if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
            BHMsgHead head(InitMsgHead(GetType(request), proc_id()));
            BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
            AddRoute(head, sock.id());
            head.set_topic(request.topic());
@@ -460,7 +460,7 @@
    try {
        auto &sock = SockPub();
        BHMsgHead head(InitMsgHead(GetType(pub), proc_id()));
        BHMsgHead head(InitMsgHead(GetType(pub), proc_id(), ssn()));
        AddRoute(head, sock.id());
        if (timeout_ms == 0) {
@@ -494,7 +494,7 @@
        MsgSubscribe sub;
        sub.mutable_topics()->Swap(&topics);
        BHMsgHead head(InitMsgHead(GetType(sub), proc_id()));
        BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn()));
        AddRoute(head, sock.id());
        if (timeout_ms == 0) {
            return sock.Send(BHTopicBusAddress(), head, sub);
src/topic_node.h
@@ -74,6 +74,7 @@
    void Stop();
private:
    MQId ssn() { return SockNode().id(); }
    bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms);
    typedef MsgQueryTopicReply::BHNodeAddress NodeAddress;
    int QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms);
utest/speed_test.cpp
@@ -38,7 +38,7 @@
        MsgRequestTopic body;
        body.set_topic("topic");
        body.set_data(str);
        auto head(InitMsgHead(GetType(body), proc_id));
        auto head(InitMsgHead(GetType(body), proc_id, mq.Id()));
        msg.Make(head, body);
        assert(msg.valid());
        DEFER1(msg.Release(););
@@ -156,7 +156,7 @@
                MsgRequestTopic req_body;
                req_body.set_topic("topic");
                req_body.set_data(msg_content);
                auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
                auto req_head(InitMsgHead(GetType(req_body), client_proc_id, cli.id()));
                req_head.add_route()->set_mq_id(cli.id());
                return cli.Send(srv.id(), req_head, req_body);
            };
@@ -180,7 +180,7 @@
                        MsgRequestTopic reply_body;
                        reply_body.set_topic("topic");
                        reply_body.set_data(msg_content);
                        auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id()));
                        auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), req_head.msg_id()));
                        return srv.Send(src_id, reply_head, reply_body);
                    };
                    Reply();