zhangmeng
2023-01-29 5fb901dd157b4a8bbfc3be916c6c1a9d02b5bfa6
cbhomeclient.cpp
@@ -9,7 +9,8 @@
#include "cbhomeclient.h"
#include "fixed_q.h"
#include "3dparty/bus_nng/interface_bus_api.h"
#include "3rdparty/bus_nng/interface_bus_api.h"
#include "3rdparty/bus_nng/bn_api.h"
#include "bhome_msg.pb.h"
@@ -43,7 +44,7 @@
    void*                       bus{nullptr};
    cproc*                      pinfo{nullptr};
    ~client(){
        free_proc_info(pinfo);
        internal_cproc_free(pinfo);
        thrd_quit.store(true, memory_order_acq_rel);
        if (thrd_sub) thrd_sub->join();
@@ -52,7 +53,7 @@
        if (sub_q) sub_q->clear(freeMsg);
        if (readreq_q) readreq_q->clear(freeMsg);
        if (bus) bus_cleanup(bus);
        bus_cleanup(bus);
    }
};
@@ -74,7 +75,7 @@
}
template <class T> Msg msg(T&& t){
    Msg m;
    msg_helper(make_index_sequence<tuple_size<T>::value>{}, std::forward<T>(t), m);
    msg_helper(make_index_sequence<tuple_size<typename decay<T>::type>::value>{}, std::forward<T>(t), m);
    return m;
}
@@ -82,7 +83,7 @@
Msg to_bus(client* cli, F&& f, Args&&... args){
    Msg mesg;
    if (std::forward<F>(f)(cli->bus, std::forward<Args>(args)...))
        mesg = std::move(msg(crop<Is...>(tuple<Args...>(std::forward<Args>(args)...))));
        mesg = msg(crop<Is...>(tuple<Args...>(std::forward<Args>(args)...)));
    return mesg;
}
@@ -96,7 +97,7 @@
        tie(d, s) = tp;
        if (m.ParseFromArray(d, s)) {
            bus_free(d, s);
            msg = std::move(make_tuple(true, std::move(m)));
            msg = make_tuple(true, std::move(m));
            break;
        }
        m.Clear();
@@ -110,24 +111,22 @@
template <size_t... Is, class F, class... Args>
MsgCR to_center(client* cli, F&& f, Args&&... args){
    MsgCR msg(dummy());
    auto vmsg = std::move(to_bus<Is...>(cli, std::forward<F>(f), std::forward<Args>(args)...));
    if (!vmsg.empty()) msg = std::move(parse(cli, vmsg.at(0)));
    auto vmsg = to_bus<Is...>(cli, std::forward<F>(f), std::forward<Args>(args)...);
    if (!vmsg.empty()) msg = parse(cli, vmsg.at(0));
    return msg;
}
template <class F>
MsgCR to_topic(client* cli, F&& f, const struct cstrarr& topic){
MsgCR to_topic(client* cli, F&& f, char** topic, const size_t count){
    MsgCR msg(dummy());
    if (topic.arr && topic.count){
        MsgTopicList tlist;
        for(size_t i = 0; i < topic.count; i++)
            tlist.add_topic_list(topic.arr[i].str, topic.arr[i].size);
        const auto& tpc = tlist.SerializeAsString();
        void* replymsg = NULL;
        int replysize = 0;
        msg = std::move(to_center<2,3>(cli, std::forward<F>(f),
            tpc.data(), tpc.size(), &replymsg, &replysize, sndto));
    }
    MsgTopicList tlist;
    for(size_t i = 0; i < count; i++)
        tlist.add_topic_list(topic[i]);
    const auto& tpc = tlist.SerializeAsString();
    void* replymsg = NULL;
    int replysize = 0;
    msg = to_center<2,3>(cli, std::forward<F>(f),
        tpc.data(), tpc.size(), &replymsg, &replysize, sndto);
    return msg;
}
@@ -168,14 +167,14 @@
static void registered(client* cli, const creg* rinfo){
    ProcInfo pinfo;
    auto tmp = rinfo->pinfo;
    pinfo.set_name(tmp->name.str, tmp->name.size);
    pinfo.set_proc_id(tmp->id.str, tmp->id.size);
    const auto& reg = pinfo.SerializeAsString();
    auto proc = creg_proc(rinfo);
    pinfo.set_name(cproc_name(proc));
    pinfo.set_proc_id(cproc_id(proc));
    const auto& pbproc = pinfo.SerializeAsString();
    while (!cli->thrd_quit.load(memory_order_acquire)) {
        void* replymsg = NULL;
        int replysize = 0;
        cli->bus = bus_register(reg.data(), reg.size(), &replymsg, &replysize, sndto);
        cli->bus = bus_register(pbproc.data(), pbproc.size(), &replymsg, &replysize, sndto);
        bus_free(replymsg, replysize);
        if (cli->bus) break;
    }
@@ -184,24 +183,32 @@
    // request/reply和pub topic一起处理
    // 只需要将字符串指针拷贝就行,不需创建字符串内存
    auto shallowMerge = [](const cstrarr& arr1, const cstrarr& arr2){
        auto tmp = cstr_arr_new(arr1.count + arr2.count);
        auto dst = tmp.arr;
        auto cp2dst = [&dst](const cstr* src, const size_t cnt){
            memcpy(dst, src, cnt * sizeof(cstr));
    auto shallowMerge = [](char** a1, const size_t c1, char** a2, const size_t c2){
        auto tmp = (char**)malloc((c1 + c2) * sizeof(char*));
        auto dst = tmp;
        auto cp2dst = [&dst](char** src, const size_t cnt){
            memcpy(dst, src, cnt * sizeof(char*));
            dst += cnt;
        };
        cp2dst(arr1.arr, arr1.count);
        cp2dst(arr2.arr, arr2.count);
        cp2dst(a1, c1);
        cp2dst(a2, c2);
        return tmp;
    };
    auto tmparr = shallowMerge(rinfo->channel, rinfo->topic_pub);
    auto tpcmsg = to_topic(cli, bus_register_topics, tmparr);
    free(tmparr.arr);
    size_t tpcc = 0, pubc = 0;
    char** tpc = creg_reply_topic(rinfo, &tpcc);
    char** pub = creg_pub_topic(rinfo, &pubc);
    auto tmparr = shallowMerge(tpc, tpcc, pub, pubc);
    auto tpcmsg = to_topic(cli, bus_register_topics, tmparr, tpcc + pubc);
    for(size_t i = 0; i < tpcc+pubc; i++){
        printf("======>> proc [%s] topic %lu -> %s\n", cproc_name(proc), i, tmparr[i]);
    }
    free(tmparr);
    // if topic pub/sub[net] exist, register topics
    auto submsg = to_topic(cli, bus_subscribe_topics, rinfo->topic_sub);
    auto subnetmsg = to_topic(cli, bus_subscribe_topics_net, rinfo->topic_sub_net);
    tpc = creg_sub_topic(rinfo, &tpcc);
    auto submsg = to_topic(cli, bus_subscribe_topics, tpc, tpcc);
    tpc = creg_subnet_topic(rinfo, &tpcc);
    auto subnetmsg = to_topic(cli, bus_subscribe_topics_net, tpc, tpcc);
    if (get<0>(submsg) && !cli->thrd_sub)
        cli->thrd_sub.reset(new thread([cli]{ thread_sub(cli); }));
@@ -210,21 +217,21 @@
static void unregistered(client* cli){
    ProcInfo pinfo;
    auto tmp = cli->pinfo;
    pinfo.set_name(tmp->name.str, tmp->name.size);
    pinfo.set_proc_id(tmp->id.str, tmp->id.size);
    const auto& reg = pinfo.SerializeAsString();
    auto proc = cli->pinfo;
    pinfo.set_name(cproc_name(proc));
    pinfo.set_proc_id(cproc_id(proc));
    const auto& pbproc = pinfo.SerializeAsString();
    void* rep;
    int repl;
    to_center<2,3>(cli, bus_unregister, reg.data(), reg.size(), &rep, &repl, sndto);
    to_center<2,3>(cli, bus_unregister, pbproc.data(), pbproc.size(), &rep, &repl, sndto);
}
static inline client* ptr(void* handle){ return static_cast<client*>(handle); }
void* bus_client_init(const char* srvid, const size_t sidsize, const creg* rinfo){
    client* cli = new client;
    cli->pinfo = clone_proc_info(rinfo->pinfo);
    cli->pinfo = internal_clone_cproc(creg_proc(rinfo));
    auto pred = [cli]{ return cli->thrd_quit.load(memory_order_relaxed); };
    const size_t qsize = 5;
@@ -242,11 +249,7 @@
    delete cli;
}
struct csubmsg* bus_client_get_submsg(void* handle){
    client* cli = ptr(handle);
    Msg msg = std::move(cli->sub_q->pop());
    if (msg.empty()) return NULL;
static struct csubmsg* parse_submsg(const Msg& msg){
    void* procid = NULL, *data = NULL;
    int pids = 0, size = 0;
@@ -259,12 +262,23 @@
    return pmsg;
}
struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){
struct csubmsg* bus_client_get_submsg_intime(void* handle, const size_t ms){
    client* cli = ptr(handle);
    Msg msg = std::move(cli->readreq_q->pop());
    Msg msg = cli->sub_q->pop(ms);
    if (msg.empty()) return NULL;
    return parse_submsg(msg);
}
struct csubmsg* bus_client_get_submsg(void* handle){
    client* cli = ptr(handle);
    Msg msg = cli->sub_q->pop();
    if (msg.empty()) return NULL;
    return parse_submsg(msg);
}
static struct creqmsg* parse_reqmsg(const Msg& msg, void** src){
    void* procid = NULL, *data = NULL;
    int pids = 0, size = 0;
@@ -278,14 +292,28 @@
    return pmsg;
}
struct creqmsg* bus_client_get_reqmsg_intime(void* handle, void** src, const size_t ms){
    client* cli = ptr(handle);
    Msg msg = cli->readreq_q->pop(ms);
    if (msg.empty()) return NULL;
    return parse_reqmsg(msg, src);
}
struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){
    client* cli = ptr(handle);
    Msg msg = cli->readreq_q->pop();
    if (msg.empty()) return NULL;
    return parse_reqmsg(msg, src);
}
int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg){
    void* procid = NULL, *reply = NULL;
    int pids = 0, replys = 0;
    auto vmsg = std::move(to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0,
                    msg->msg.str, msg->msg.size, &procid, &pids,
                    &reply, &replys, sndto));
    auto vmsg = to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0,
                    msg->msg, msg->msgl, &procid, &pids,
                    &reply, &replys, sndto);
    if (!vmsg.empty()){
        void* procid = NULL, *data = NULL;
        int pids = 0, size = 0;
@@ -305,18 +333,54 @@
    return false;
}
int bus_client_reply_msg(void* handle, void* src, const struct crepmsg* msg){
int bus_client_reply(void* handle, void* src, const struct crepmsg* msg){
    MsgRequestTopicReply msgR;
    auto err = msgR.mutable_errmsg();
    err->set_errcode((ErrorCode)msg->errcode);
    err->set_errstring(msg->errmsg.str, msg->errmsg.size);
    err->set_errstring(msg->errmsg, msg->errmsgl);
    msgR.set_data(msg->data.str, msg->data.size);
    msgR.set_data(msg->data, msg->datal);
    auto pbstr = msgR.SerializeAsString();
    auto cli = ptr(handle);
    return bus_send_reply(cli->bus, src, pbstr.data(), pbstr.size());
}
struct cqueryprocs* bus_client_query_procs(void* handle, size_t* count){
    BHAddress addr;
    const auto& pbaddr = addr.SerializeAsString();
    MsgQueryProc topic;
    const auto& pbtopic = topic.SerializeAsString();
    void* rep = NULL;
    int repl = 0;
    auto msg = to_bus<4,5>(ptr(handle), bus_query_procs, pbaddr.data(), pbaddr.size(),
       pbtopic.data(), pbtopic.size(), &rep, &repl, sndto);
    if (msg.empty()) return NULL;
    // bug 已修复, 在 bus_nng 中
    // bus_query_procs 获取数据失败, 暂时改用BHQueryProcs
    // if (!BHQueryProcs(pbaddr.data(), pbaddr.size(), pbtopic.data(), pbtopic.size(), &rep, &repl, sndto)) {
    //     return NULL;
    // }
    MsgQueryProcReply msgR;
    msgR.ParseFromArray(rep, repl);
    bus_free(rep, repl);
    *count = msgR.proc_list_size();
    auto procs = (struct cqueryprocs*)calloc(*count, sizeof(struct cqueryprocs));
    for(size_t i = 0; i < *count; i++){
        const auto& p = msgR.proc_list(i);
        size_t idl = p.proc().proc_id().size();
        char* id = (char*)calloc(idl+1, 1);
        memcpy(id, p.proc().proc_id().data(), idl);
        procs[i] = cqueryprocs{ .id = id, .idl = idl, .online = p.online() };
    }
    return procs;
}
////////////////////////////////////////////////////
@@ -325,11 +389,11 @@
    pbmsg.set_topic(topic, topicl);
    pbmsg.set_data(data, size);
    auto pbstr = pbmsg.SerializeAsString();
    return bus_client_pubmsg(handle, pbstr.data(), pbstr.size());
    return bus_client_publish_msg(handle, pbstr.data(), pbstr.size());
}
// test
int bus_client_pubmsg(void* handle, const char* data, const size_t size){
int bus_client_publish_msg(void* handle, const char* data, const size_t size){
    client* cli = ptr(handle);
    return bus_publish(cli->bus, data, size, 100);
}