zhangmeng
2022-12-27 cf0a3209b51babf72469d962914db0dac2e5f52c
add get msg timeout
4个文件已修改
97 ■■■■ 已修改文件
cbhomeclient.cpp 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
cbhomeclient.h 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
exported_symbols 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fixed_q.h 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
cbhomeclient.cpp
@@ -82,7 +82,7 @@
Msg to_bus(client* cli, F&& f, Args&&... args){
    Msg mesg;
    if (std::forward<F>(f)(cli->bus, std::forward<Args>(args)...))
        mesg = std::move(msg(crop<Is...>(tuple<Args...>(std::forward<Args>(args)...))));
        mesg = msg(crop<Is...>(tuple<Args...>(std::forward<Args>(args)...)));
    return mesg;
}
@@ -96,7 +96,7 @@
        tie(d, s) = tp;
        if (m.ParseFromArray(d, s)) {
            bus_free(d, s);
            msg = std::move(make_tuple(true, std::move(m)));
            msg = make_tuple(true, std::move(m));
            break;
        }
        m.Clear();
@@ -110,8 +110,8 @@
template <size_t... Is, class F, class... Args>
MsgCR to_center(client* cli, F&& f, Args&&... args){
    MsgCR msg(dummy());
    auto vmsg = std::move(to_bus<Is...>(cli, std::forward<F>(f), std::forward<Args>(args)...));
    if (!vmsg.empty()) msg = std::move(parse(cli, vmsg.at(0)));
    auto vmsg = to_bus<Is...>(cli, std::forward<F>(f), std::forward<Args>(args)...);
    if (!vmsg.empty()) msg = parse(cli, vmsg.at(0));
    return msg;
}
@@ -124,8 +124,8 @@
    const auto& tpc = tlist.SerializeAsString();
    void* replymsg = NULL;
    int replysize = 0;
    msg = std::move(to_center<2,3>(cli, std::forward<F>(f),
        tpc.data(), tpc.size(), &replymsg, &replysize, sndto));
    msg = to_center<2,3>(cli, std::forward<F>(f),
        tpc.data(), tpc.size(), &replymsg, &replysize, sndto);
    return msg;
}
@@ -248,11 +248,7 @@
    delete cli;
}
struct csubmsg* bus_client_get_submsg(void* handle){
    client* cli = ptr(handle);
    Msg msg = std::move(cli->sub_q->pop());
    if (msg.empty()) return NULL;
static struct csubmsg* parse_submsg(const Msg& msg){
    void* procid = NULL, *data = NULL;
    int pids = 0, size = 0;
@@ -265,12 +261,23 @@
    return pmsg;
}
struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){
struct csubmsg* bus_client_get_submsg_intime(void* handle, const size_t ms){
    client* cli = ptr(handle);
    Msg msg = std::move(cli->readreq_q->pop());
    Msg msg = cli->sub_q->pop(ms);
    if (msg.empty()) return NULL;
    return parse_submsg(msg);
}
struct csubmsg* bus_client_get_submsg(void* handle){
    client* cli = ptr(handle);
    Msg msg = cli->sub_q->pop();
    if (msg.empty()) return NULL;
    return parse_submsg(msg);
}
static struct creqmsg* parse_reqmsg(const Msg& msg, void** src){
    void* procid = NULL, *data = NULL;
    int pids = 0, size = 0;
@@ -284,14 +291,28 @@
    return pmsg;
}
struct creqmsg* bus_client_get_reqmsg_intime(void* handle, void** src, const size_t ms){
    client* cli = ptr(handle);
    Msg msg = cli->readreq_q->pop(ms);
    if (msg.empty()) return NULL;
    return parse_reqmsg(msg, src);
}
struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){
    client* cli = ptr(handle);
    Msg msg = cli->readreq_q->pop();
    if (msg.empty()) return NULL;
    return parse_reqmsg(msg, src);
}
int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg){
    void* procid = NULL, *reply = NULL;
    int pids = 0, replys = 0;
    auto vmsg = std::move(to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0,
    auto vmsg = to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0,
                    msg->msg, msg->msgl, &procid, &pids,
                    &reply, &replys, sndto));
                    &reply, &replys, sndto);
    if (!vmsg.empty()){
        void* procid = NULL, *data = NULL;
        int pids = 0, size = 0;
cbhomeclient.h
@@ -24,11 +24,15 @@
void bus_client_free(void* handle);
/*
    获取订阅的消息,订阅消息通过线程不停读取,此处从缓存中读取
    获取订阅的消息,订阅消息通过线程不停读取,此处从缓存中读取,读不到不返回
    必须通过 message.h 的 free_reqmsg 释放
    通过 get_submsg_db get_submsg_proclist 获取对应的消息
*/
struct csubmsg* bus_client_get_submsg(void* handle);
/*
    与 bus_client_get_submsg 相同,但支持超时返回,ms 参数表明超时时长,单位毫秒
*/
struct csubmsg* bus_client_get_submsg_intime(void* handle, const size_t ms);
/*
    发布消息,data 是 MsgPublish protobuffer序列化后的数据
*/
@@ -39,7 +43,7 @@
int bus_client_publish(void* handle, const char* topic, const size_t topicl, const char* data, const size_t size);
/*
    获取 request 消息,通过线程读取,此处从缓存中读取
    获取 request 消息,通过线程读取,此处从缓存中读取, 读不到不返回
    必须调用 free_reqmsg 释放
    可通过 message.h 的 get_reqmsg_stackerr get_reqmsg_stack 获取对应的消息
    src 是哪一个进程请求的标识符
@@ -48,6 +52,10 @@
*/
struct creqmsg* bus_client_get_reqmsg(void* handle, void** src);
/*
    与 bus_client_get_reqmsg 相同,但支持超时返回,ms 参数表明超时时长,单位毫秒
*/
struct creqmsg* bus_client_get_reqmsg_intime(void* handle, void** src, const size_t ms);
/*
    响应消息回复,src是连接标识符,msg是需要回复的消息
    通过 message.h 的 make_reply_msg 创建时,有内存拷贝,必须通过 free_reply_msg 释放
    或者通过填充 crepmsg 结构体构造,由调用者控制变量的内存和生命周期,可能不会拷贝内存,效率更高
exported_symbols
@@ -3,9 +3,11 @@
    bus_client_init;
    bus_client_free;
    bus_client_get_submsg;
    bus_client_get_submsg_intime;
    bus_client_pubmsg;
    bus_client_publish;
    bus_client_get_reqmsg;
    bus_client_get_reqmsg_intime;
    bus_client_reply_msg;
    bus_client_request;
fixed_q.h
@@ -46,6 +46,17 @@
        cond_node.notify();
        return q.size();
    }
    int pop(T& m, const size_t ms){
        std::unique_lock<std::mutex> l{mtx_q};
        auto d = std::chrono::milliseconds{ms};
        if(!cond_node.wait_for(l, d, [this]{ return !q.empty(); })){
            return -1;
        }
        m = q.front();
        q.pop_front();
        cond_spare.notify();
        return q.size();
    }
    int pop(T& m){
        std::unique_lock<std::mutex> l{mtx_q};
        auto d = std::chrono::milliseconds{du};
@@ -57,16 +68,29 @@
        cond_spare.notify();
        return q.size();
    }
    T pop(const size_t ms){
        std::unique_lock<std::mutex> l{mtx_q};
        auto d = std::chrono::milliseconds{ms};
        T t{};
        if(!cond_node.wait_for(l, d, [this]{ return !q.empty(); })){
            return t;
        }
        t = q.front();
        q.pop_front();
        cond_spare.notify();
        return t;
    }
    T pop(){
        std::unique_lock<std::mutex> l{mtx_q};
        auto d = std::chrono::milliseconds{du};
        T t{};
        while(!cond_node.wait_for(l, d, [this]{ return !q.empty(); })){
            if (pred()) return T{};
            if (pred()) return t;
        }
        auto m(move(q.front()));
        t = q.front();
        q.pop_front();
        cond_spare.notify();
        return m;
        return t;
    }
    template<typename F> void clear(F&& f){