zhangmeng
2022-12-13 92b04cda208baf628d9d166b3b330e35a6512b0c
robust
2个文件已修改
68 ■■■■ 已修改文件
cbhomeclient.cpp 60 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.cpp 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
cbhomeclient.cpp
@@ -116,7 +116,7 @@
}
template <class F>
MsgCR to_topic(client* cli, F&& f, const struct cstrarr topic){
MsgCR to_topic(client* cli, F&& f, const struct cstrarr& topic){
    MsgCR msg(dummy());
    if (topic.arr && topic.count){
        MsgTopicList tlist;
@@ -165,42 +165,40 @@
    }
}
static void registered(client* cli, const creg* rinfo, const bool must_reg=true){
static void registered(client* cli, const creg* rinfo){
    if (must_reg){
        ProcInfo pinfo;
        auto tmp = rinfo->pinfo;
        pinfo.set_name(tmp->name.str, tmp->name.size);
        pinfo.set_proc_id(tmp->id.str, tmp->id.size);
        const auto& reg = pinfo.SerializeAsString();
        while (!cli->thrd_quit.load(memory_order_acquire)) {
            void* replymsg = NULL;
            int replysize = 0;
            cli->bus = bus_register(reg.data(), reg.size(), &replymsg, &replysize, sndto);
            bus_free(replymsg, replysize);
            if (cli->bus) break;
        }
        // register success start read request thread
        cli->thrd_readreq.reset(new thread([cli]{ thread_readreq(cli); }));
    ProcInfo pinfo;
    auto tmp = rinfo->pinfo;
    pinfo.set_name(tmp->name.str, tmp->name.size);
    pinfo.set_proc_id(tmp->id.str, tmp->id.size);
    const auto& reg = pinfo.SerializeAsString();
    while (!cli->thrd_quit.load(memory_order_acquire)) {
        void* replymsg = NULL;
        int replysize = 0;
        cli->bus = bus_register(reg.data(), reg.size(), &replymsg, &replysize, sndto);
        bus_free(replymsg, replysize);
        if (cli->bus) break;
    }
    // register success start read request thread
    cli->thrd_readreq.reset(new thread([cli]{ thread_readreq(cli); }));
    // request/reply和pub topic一起处理
    auto tmparr = cstr_arr_new(rinfo->channel.count + rinfo->topic_pub.count);
    auto addarr = [&tmparr](size_t& start, const struct cstrarr* arr){
        for(size_t i = 0; i < arr->count; i++){
            cstr_arr_add(&tmparr, arr->arr[i].str, arr->arr[i].size, start+i);
        }
        start += arr->count;
    // 只需要将字符串指针拷贝就行,不需创建字符串内存
    auto shallowMerge = [](const cstrarr& arr1, const cstrarr& arr2){
        auto tmp = cstr_arr_new(arr1.count + arr2.count);
        auto dst = tmp.arr;
        auto cp2dst = [&dst](const cstr* src, const size_t cnt){
            memcpy(dst, src, cnt * sizeof(cstr));
            dst += cnt;
        };
        cp2dst(arr1.arr, arr1.count);
        cp2dst(arr2.arr, arr2.count);
        return tmp;
    };
    size_t s = 0;
    addarr(s, &rinfo->channel);
    addarr(s, &rinfo->topic_pub);
    auto tmparr = shallowMerge(rinfo->channel, rinfo->topic_pub);
    auto tpcmsg = to_topic(cli, bus_register_topics, tmparr);
    cstr_arr_free(tmparr);
    // auto channelmsg = to_topic(cli, bus_register_topics, rinfo->channel);
    // auto pubmsg = to_topic(cli, bus_register_topics, rinfo->topic_pub);
    free(tmparr.arr);
    // if topic pub/sub[net] exist, register topics
    auto submsg = to_topic(cli, bus_subscribe_topics, rinfo->topic_sub);
    auto subnetmsg = to_topic(cli, bus_subscribe_topics_net, rinfo->topic_sub_net);
main.cpp
@@ -118,10 +118,10 @@
    memset(&reg, 0, sizeof(reg));
    reg.pinfo = make_proc("reply", "replyid");
    reg.channel = cstr_arr_new(1);
    size_t i = 0;
    for(; i < 1; i++){
        cstr_arr_add(&reg.channel, topic, strlen(topic), i);
    }
    cstr_arr_add(&reg.channel, topic, strlen(topic), 0);
    reg.topic_pub = cstr_arr_new(1);
    cstr_arr_add(&reg.topic_pub, topic, strlen(topic), 0);
    void* handle = bus_client_init(NULL, 0, &reg);
    size_t count = 0;
    this_thread::sleep_for(chrono::seconds(3));