lichao
2021-05-18 e54b8e58780c7d9f37b06cc4e1dc88badb2129c9
remove sync recv, node cache msgs for sync recv.
7个文件已修改
239 ■■■■■ 已修改文件
src/msg.h 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 112 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 30 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h
@@ -209,6 +209,17 @@
        p += 4;
        return head.ParseFromArray(p, msg_size);
    }
    std::string body() const
    {
        auto p = get<char>();
        assert(p);
        uint32_t size = Get32(p);
        p += 4;
        p += size;
        size = Get32(p);
        p += 4;
        return std::string(p, size);
    }
    template <class Body>
    bool ParseBody(Body &body) const
    {
src/socket.cpp
@@ -139,25 +139,6 @@
    return false;
}
bool ShmSocket::SyncRecv(int64_t &cmd, const int timeout_ms)
{
    return (timeout_ms == 0) ? mq().TryRecv(cmd) : mq().Recv(cmd, timeout_ms);
}
//maybe reimplment, using async cbs?
bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms)
{
    // std::lock_guard<std::mutex> lock(mutex_); // seems no need to lock mutex_.
    bool got = (timeout_ms == 0) ? mq().TryRecv(msg) : mq().Recv(msg, timeout_ms);
    if (got) {
        if (msg.ParseHead(head)) {
            return true;
        } else {
            msg.Release();
        }
    }
    return false;
}
bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb)
{
    size_t size = content.size();
src/socket.h
@@ -99,8 +99,6 @@
    {
        return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...);
    }
    bool SyncRecv(int64_t &cmd, const int timeout_ms);
    bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
    template <class Body>
    bool SendAndRecv(const MQInfo &remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
@@ -190,8 +188,8 @@
    Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_;
    Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_;
    SendQ send_buffer_;
    // node request center alloc memory.
    int node_proc_index_ = -1;
    int socket_index_ = -1;
src/topic_node.cpp
@@ -107,6 +107,14 @@
                    }
                    SetProcIndex(reply.proc_index());
                    this->state_ = eStateUnregistered;
                    auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
                        server_buffer_->Write(std::move(head), msg.body());
                    };
                    SockServer().Start(onRequest);
                    auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
                        sub_buffer_->Write(std::move(head), msg.body());
                    };
                    SockSub().Start(onSub);
                }
            } break;
            default: break;
@@ -341,26 +349,32 @@
bool TopicNode::ServerStart(const ServerAsyncCB &acb, int nworker)
{
    auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
        if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; }
        MsgRequestTopic req;
        if (!imsg.ParseBody(req)) { return; }
    if (acb) {
        auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
            if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; }
            MsgRequestTopic req;
            if (!imsg.ParseBody(req)) { return; }
        try {
            SrcInfo *p = new SrcInfo;
            if (!p) {
                throw std::runtime_error("no memory.");
            try {
                SrcInfo *p = new SrcInfo;
                if (!p) {
                    throw std::runtime_error("no memory.");
                }
                p->route.assign(head.route().begin(), head.route().end());
                p->msg_id = head.msg_id();
                acb(p, *head.mutable_proc_id(), req);
            } catch (std::exception &e) {
                LOG_ERROR() << "error server handle msg:" << e.what();
            }
            p->route.assign(head.route().begin(), head.route().end());
            p->msg_id = head.msg_id();
            acb(p, *head.mutable_proc_id(), req);
        } catch (std::exception &e) {
            LOG_ERROR() << "error server handle msg:" << e.what();
        }
    };
        };
    auto &sock = SockServer();
    return acb && sock.Start(onRecv, nworker);
        return SockServer().Start(onRecv, nworker);
    } else {
        auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
            server_buffer_->Write(std::move(head), msg.body());
        };
        return SockServer().Start(onRequest, nworker);
    }
}
bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
@@ -369,13 +383,19 @@
        SetLastError(eNotRegistered, kErrMsgNotRegistered);
        return false;
    }
    auto &sock = SockServer();
    MsgI imsg;
    BHMsgHead head;
    if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) {
        if (imsg.ParseBody(request)) {
    std::string body;
    auto end_time = steady_clock::now() + milliseconds(timeout_ms);
    while (!server_buffer_->Read(head, body)) {
        if (steady_clock::now() < end_time) {
            robust::QuickSleep();
        } else {
            return false;
        }
    }
    if (head.type() == kMsgTypeRequestTopic) {
        if (request.ParseFromString(body)) {
            head.mutable_proc_id()->swap(proc_id);
            try {
                SrcInfo *p = new SrcInfo;
@@ -614,20 +634,24 @@
bool TopicNode::SubscribeStartWorker(const SubDataCB &tdcb, int nworker)
{
    auto &sock = SockSub();
    auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
        if (head.type() == kMsgTypePublish) {
            MsgPublish pub;
            if (imsg.ParseBody(pub)) {
                tdcb(head.proc_id(), pub);
    if (tdcb) {
        auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
            if (head.type() == kMsgTypePublish) {
                MsgPublish pub;
                if (imsg.ParseBody(pub)) {
                    tdcb(head.proc_id(), pub);
                }
            } else {
                // ignored, or dropped
            }
        } else {
            // ignored, or dropped
        }
    };
    return tdcb && sock.Start(AsyncRecvProc, nworker);
        };
        return SockSub().Start(AsyncRecvProc, nworker);
    } else {
        auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
            sub_buffer_->Write(std::move(head), msg.body());
        };
        return SockSub().Start(onSub, nworker);
    }
}
bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms)
@@ -637,13 +661,19 @@
        return false;
    }
    auto &sock = SockSub();
    MsgI msg;
    DEFER1(msg.Release(););
    BHMsgHead head;
    std::string body;
    auto end_time = steady_clock::now() + milliseconds(timeout_ms);
    while (!sub_buffer_->Read(head, body)) {
        if (steady_clock::now() < end_time) {
            robust::QuickSleep();
        } else {
            return false;
        }
    }
    //TODO error msg.
    if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) {
        if (msg.ParseBody(pub)) {
    if (head.type() == kMsgTypePublish) {
        if (pub.ParseFromString(body)) {
            head.mutable_proc_id()->swap(proc_id);
            return true;
        }
src/topic_node.h
@@ -163,6 +163,32 @@
    int proc_index_ = -1;
    TopicQueryCache topic_query_cache_;
    class RecvQ
    {
    public:
        void Write(BHMsgHead &&head, std::string &&body) { q_.push_back({std::move(head), std::move(body)}); }
        bool Read(BHMsgHead &head, std::string &body)
        {
            if (q_.empty()) {
                return false;
            } else {
                head = std::move(q_.front().head);
                body = std::move(q_.front().body);
                q_.pop_front();
                return true;
            }
        }
    private:
        struct MsgData {
            BHMsgHead head;
            std::string body;
        };
        std::deque<MsgData> q_;
    };
    Synced<RecvQ> server_buffer_;
    Synced<RecvQ> sub_buffer_;
};
#endif // end of include guard: TOPIC_NODE_YVKWA6TF
utest/api_test.cpp
@@ -131,6 +131,15 @@
        reg = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000);
        if (reg) {
            printf("register ok\n");
            // bool r = BHUnregister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000);
            // printf("unregister %s\n", r ? "ok" : "failed");
            // reg = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000);
            // if (!reg) {
            //     int ec = 0;
            //     std::string msg;
            //     GetLastError(ec, msg);
            //     printf("reg error: %s\n", msg.c_str());
            // }
        } else {
            int ec = 0;
            std::string msg;
@@ -201,7 +210,7 @@
    auto SyncRequest = [&](int idx) { // SyncRequest
        MsgRequestTopic req;
        req.set_topic(topic_ + std::to_string(idx));
        req.set_topic(topic_ + std::to_string(0));
        req.set_data("request_data_" + std::to_string(idx));
        std::string s(req.SerializeAsString());
        // Sleep(10ms, false);
@@ -286,19 +295,30 @@
    std::atomic<bool> run(true);
    BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
    ThreadManager threads;
#if 0
    BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
#else
    BHStartWorker(FServerCallback(), &SubRecvProc, &ClientProc);
    threads.Launch(ServerLoop, &run);
#endif
    boost::timer::auto_cpu_timer timer;
    threads.Launch(hb, &run);
    threads.Launch(showStatus, &run);
    int ncli = 10;
    const int64_t nreq = 1000 * 100;
#if 1
    for (int i = 0; i < ncli; ++i) {
        threads.Launch(asyncRequest, nreq);
    }
    // for (int i = 0; i < 100; ++i) {
    //     SyncRequest(0);
    // }
#else
    for (int i = 0; i < 100; ++i) {
        SyncRequest(i);
    }
#endif
    int same = 0;
    uint64_t last = 0;
utest/speed_test.cpp
@@ -189,36 +189,24 @@
            Req();
        }
    };
    auto onRequest = [&](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
        if (head.type() == kMsgTypeRequestTopic) {
            MQInfo src_mq = {head.route()[0].mq_id(), head.route()[0].abs_addr()};
    std::atomic<bool> stop(false);
    auto Server = [&]() {
        MsgI req;
        BHMsgHead req_head;
        while (!stop) {
            if (srv.SyncRecv(req, req_head, 10)) {
                DEFER1(req.Release());
                if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) {
                    MQInfo src_mq = {req_head.route()[0].mq_id(), req_head.route()[0].abs_addr()};
                    auto Reply = [&]() {
                        MsgRequestTopic reply_body;
                        reply_body.set_topic("topic");
                        reply_body.set_data(msg_content);
                        auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), req_head.msg_id()));
                        return srv.Send(src_mq, reply_head, reply_body);
                    };
                    Reply();
                }
            }
            MsgRequestTopic reply_body;
            reply_body.set_topic("topic");
            reply_body.set_data(msg_content);
            auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), head.msg_id()));
            srv.Send(src_mq, reply_head, reply_body);
        }
    };
    srv.Start(onRequest);
    boost::timer::auto_cpu_timer timer;
    DEFER1(printf("Request Reply Test:"););
    ThreadManager clients, servers;
    for (int i = 0; i < 2; ++i) { servers.Launch(Server); }
    ThreadManager clients;
    printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
    for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
    clients.WaitAll();
@@ -227,7 +215,6 @@
        std::this_thread::sleep_for(100ms);
    } while (count.load() < ncli * nmsg);
    PrintStatus(NowSec());
    stop = true;
    servers.WaitAll();
    srv.Stop();
    // BOOST_CHECK_THROW(reply.Count(), int);
}