zhangmeng
2022-12-14 f600bb176c1d2f0eeb5180637bdc09605b3d21bd
restruct for easy use v1.1
5个文件已修改
740 ■■■■ 已修改文件
cbhomeclient.cpp 72 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
cbhomeclient.h 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.cpp 87 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
message.cpp 394 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
message.h 175 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
cbhomeclient.cpp
@@ -43,7 +43,7 @@
    void*                       bus{nullptr};
    cproc*                      pinfo{nullptr};
    ~client(){
        free_proc_info(pinfo);
        internal_cproc_free(pinfo);
        thrd_quit.store(true, memory_order_acq_rel);
        if (thrd_sub) thrd_sub->join();
@@ -52,7 +52,7 @@
        if (sub_q) sub_q->clear(freeMsg);
        if (readreq_q) readreq_q->clear(freeMsg);
        if (bus) bus_cleanup(bus);
        bus_cleanup(bus);
    }
};
@@ -116,18 +116,16 @@
}
template <class F>
MsgCR to_topic(client* cli, F&& f, const struct cstrarr& topic){
MsgCR to_topic(client* cli, F&& f, char** topic, const size_t count){
    MsgCR msg(dummy());
    if (topic.arr && topic.count){
        MsgTopicList tlist;
        for(size_t i = 0; i < topic.count; i++)
            tlist.add_topic_list(topic.arr[i].str, topic.arr[i].size);
    for(size_t i = 0; i < count; i++)
        tlist.add_topic_list(topic[i]);
        const auto& tpc = tlist.SerializeAsString();
        void* replymsg = NULL;
        int replysize = 0;
        msg = std::move(to_center<2,3>(cli, std::forward<F>(f),
            tpc.data(), tpc.size(), &replymsg, &replysize, sndto));
    }
    return msg;
}
@@ -168,14 +166,14 @@
static void registered(client* cli, const creg* rinfo){
    ProcInfo pinfo;
    auto tmp = rinfo->pinfo;
    pinfo.set_name(tmp->name.str, tmp->name.size);
    pinfo.set_proc_id(tmp->id.str, tmp->id.size);
    const auto& reg = pinfo.SerializeAsString();
    auto proc = creg_proc(rinfo);
    pinfo.set_name(cproc_name(proc));
    pinfo.set_proc_id(cproc_id(proc));
    const auto& pbproc = pinfo.SerializeAsString();
    while (!cli->thrd_quit.load(memory_order_acquire)) {
        void* replymsg = NULL;
        int replysize = 0;
        cli->bus = bus_register(reg.data(), reg.size(), &replymsg, &replysize, sndto);
        cli->bus = bus_register(pbproc.data(), pbproc.size(), &replymsg, &replysize, sndto);
        bus_free(replymsg, replysize);
        if (cli->bus) break;
    }
@@ -184,24 +182,32 @@
    // request/reply和pub topic一起处理
    // 只需要将字符串指针拷贝就行,不需创建字符串内存
    auto shallowMerge = [](const cstrarr& arr1, const cstrarr& arr2){
        auto tmp = cstr_arr_new(arr1.count + arr2.count);
        auto dst = tmp.arr;
        auto cp2dst = [&dst](const cstr* src, const size_t cnt){
            memcpy(dst, src, cnt * sizeof(cstr));
    auto shallowMerge = [](char** a1, const size_t c1, char** a2, const size_t c2){
        auto tmp = (char**)malloc((c1 + c2) * sizeof(char*));
        auto dst = tmp;
        auto cp2dst = [&dst](char** src, const size_t cnt){
            memcpy(dst, src, cnt * sizeof(char*));
            dst += cnt;
        };
        cp2dst(arr1.arr, arr1.count);
        cp2dst(arr2.arr, arr2.count);
        cp2dst(a1, c1);
        cp2dst(a2, c2);
        return tmp;
    };
    auto tmparr = shallowMerge(rinfo->channel, rinfo->topic_pub);
    auto tpcmsg = to_topic(cli, bus_register_topics, tmparr);
    free(tmparr.arr);
    size_t tpcc = 0, pubc = 0;
    char** tpc = creg_rep_topic(rinfo, &tpcc);
    char** pub = creg_pub_topic(rinfo, &pubc);
    auto tmparr = shallowMerge(tpc, tpcc, pub, pubc);
    auto tpcmsg = to_topic(cli, bus_register_topics, tmparr, tpcc + pubc);
    for(size_t i = 0; i < tpcc+pubc; i++){
        printf("======>> proc [%s] topic %lu -> %s\n", cproc_name(proc), i, tmparr[i]);
    }
    free(tmparr);
    // if topic pub/sub[net] exist, register topics
    auto submsg = to_topic(cli, bus_subscribe_topics, rinfo->topic_sub);
    auto subnetmsg = to_topic(cli, bus_subscribe_topics_net, rinfo->topic_sub_net);
    tpc = creg_sub_topic(rinfo, &tpcc);
    auto submsg = to_topic(cli, bus_subscribe_topics, tpc, tpcc);
    tpc = creg_subnet_topic(rinfo, &tpcc);
    auto subnetmsg = to_topic(cli, bus_subscribe_topics_net, tpc, tpcc);
    if (get<0>(submsg) && !cli->thrd_sub)
        cli->thrd_sub.reset(new thread([cli]{ thread_sub(cli); }));
@@ -210,21 +216,21 @@
static void unregistered(client* cli){
    ProcInfo pinfo;
    auto tmp = cli->pinfo;
    pinfo.set_name(tmp->name.str, tmp->name.size);
    pinfo.set_proc_id(tmp->id.str, tmp->id.size);
    const auto& reg = pinfo.SerializeAsString();
    auto proc = cli->pinfo;
    pinfo.set_name(cproc_name(proc));
    pinfo.set_proc_id(cproc_id(proc));
    const auto& pbproc = pinfo.SerializeAsString();
    void* rep;
    int repl;
    to_center<2,3>(cli, bus_unregister, reg.data(), reg.size(), &rep, &repl, sndto);
    to_center<2,3>(cli, bus_unregister, pbproc.data(), pbproc.size(), &rep, &repl, sndto);
}
static inline client* ptr(void* handle){ return static_cast<client*>(handle); }
void* bus_client_init(const char* srvid, const size_t sidsize, const creg* rinfo){
    client* cli = new client;
    cli->pinfo = clone_proc_info(rinfo->pinfo);
    cli->pinfo = internal_clone_cproc(creg_proc(rinfo));
    auto pred = [cli]{ return cli->thrd_quit.load(memory_order_relaxed); };
    const size_t qsize = 5;
@@ -284,7 +290,7 @@
    void* procid = NULL, *reply = NULL;
    int pids = 0, replys = 0;
    auto vmsg = std::move(to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0,
                    msg->msg.str, msg->msg.size, &procid, &pids,
                    msg->msg, msg->msgl, &procid, &pids,
                    &reply, &replys, sndto));
    if (!vmsg.empty()){
        void* procid = NULL, *data = NULL;
@@ -310,9 +316,9 @@
    MsgRequestTopicReply msgR;
    auto err = msgR.mutable_errmsg();
    err->set_errcode((ErrorCode)msg->errcode);
    err->set_errstring(msg->errmsg.str, msg->errmsg.size);
    err->set_errstring(msg->errmsg, msg->errmsgl);
    msgR.set_data(msg->data.str, msg->data.size);
    msgR.set_data(msg->data, msg->datal);
    auto pbstr = msgR.SerializeAsString();
    auto cli = ptr(handle);
cbhomeclient.h
@@ -25,7 +25,8 @@
/*
    获取订阅的消息,订阅消息通过线程不停读取,此处从缓存中读取
    可通过 message.h 对应的 get_submsg_db get_submsg_proclist 获取对应的消息
    必须通过 message.h 的 free_reqmsg 释放
    通过 get_submsg_db get_submsg_proclist 获取对应的消息
*/
struct csubmsg* bus_client_get_submsg(void* handle);
/*
@@ -39,6 +40,7 @@
/*
    获取 request 消息,通过线程读取,此处从缓存中读取
    必须调用 free_reqmsg 释放
    可通过 message.h 的 get_reqmsg_stackerr get_reqmsg_stack 获取对应的消息
    src 是哪一个进程请求的标识符
    可以响应多个request发送的消息,同时需要满足异步响应,使用 src 进行区分
@@ -47,13 +49,15 @@
struct creqmsg* bus_client_get_reqmsg(void* handle, void** src);
/*
    响应消息回复,src是连接标识符,msg是需要回复的消息
    通过 message.h 的 make_reply_msg 创建
    通过 message.h 的 make_reply_msg 创建时,有内存拷贝,必须通过 free_reply_msg 释放
    或者通过填充 crepmsg 结构体构造,由调用者控制变量的内存和生命周期,可能不会拷贝内存,效率更高
*/
int bus_client_reply_msg(void* handle, void* src, const struct crepmsg* msg);
/*
    同步的request请求,发送 creqmsg 获取 crepmsg 回复
    通过 message.h 的 make_req_msg 创建 request 消息
    获取的 crepmsg 回复消息,需要使用 free_reply_msg 释放
    通过 message.h 的 make_req_msg 创建 request 消息时,必须调用 free_reqmsg 释放
    或者通过填充 creqmsg 结构体构造,由调用者控制变量的内存和生命周期,可能不会拷贝内存,效率更高
    获取的 crepmsg 回复消息,必须使用 free_reply_msg 释放
*/
int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg);
main.cpp
@@ -13,33 +13,20 @@
#include "bhome_msg_api.pb.h"
using namespace bhome_msg;
static cproc* make_proc(const char* name, const char* id){
    cproc* pinfo = (cproc*)calloc(1,sizeof(cproc));
    auto assign = [](char** d, size_t* l, const char* tmp){
        *l = strlen(tmp);
        *d = (char*)malloc(*l);
        memcpy(*d, tmp, *l);
    };
    assign(&pinfo->name.str, &pinfo->name.size, name);
    assign(&pinfo->id.str, &pinfo->id.size, id);
    return pinfo;
}
template <class F> void ignoref(F&& f){}
static void pub(const vector<string>& topics){
    ignoref(pub);
    creg reg;
    memset(&reg, 0, sizeof(reg));
    reg.pinfo = make_proc("pub", "pubid");
    reg.topic_pub = cstr_arr_new(topics.size());
    size_t i = 0;
    for(; i < topics.size(); i++){
        cstr_arr_add(&reg.topic_pub, topics.at(0).data(), topics.at(0).size(), i);
    }
    void* handle = bus_client_init(NULL, 0, &reg);
    vector<const char*> pubtpc;
    for(auto& t : topics) pubtpc.push_back(t.c_str());
    creg* reg = make_creg(make_cproc("pub", "pubid"),
        NULL, 0, &pubtpc[0], pubtpc.size(), NULL, 0, NULL, 0);
    void* handle = bus_client_init(NULL, 0, reg);
    creg_free(reg);
    size_t count = 0;
    string base_msg("test_pub_sub==");
    this_thread::sleep_for(chrono::seconds(3));
@@ -62,21 +49,18 @@
static void sub(const vector<string>& topics){
    ignoref(sub);
    creg reg;
    memset(&reg, 0, sizeof(reg));
    reg.pinfo = make_proc("sub", "subid");
    vector<const char*> pubtpc;
    for(auto& t : topics) pubtpc.push_back(t.c_str());
    reg.topic_sub = cstr_arr_new(topics.size());
    size_t i = 0;
    for(; i < topics.size(); i++){
        cstr_arr_add(&reg.topic_sub, topics.at(0).data(), topics.at(0).size(), i);
    }
    creg* reg = make_creg(make_cproc("sub", "subid"),
        NULL, 0, NULL, 0, &pubtpc[0], pubtpc.size(), NULL, 0);
    void* handle = bus_client_init(NULL, 0, &reg);
    void* handle = bus_client_init(NULL, 0, reg);
    creg_free(reg);
    while (true) {
        auto msg = bus_client_get_submsg(handle);
        printf("SUB msg topic [%s] data [%s]\n", msg->topic.str, msg->msg.str);
        printf("SUB msg topic [%s] data [%s]\n", msg->topic, msg->msg);
    }
    bus_client_free(handle);
@@ -85,25 +69,22 @@
static void req(const char* topic){
    ignoref(req);
    string strtpc(topic);
    creg reg;
    memset(&reg, 0, sizeof(reg));
    reg.pinfo = make_proc("request", "requestid");
    // reg.channel = cstr_arr_new(1);
    // size_t i = 0;
    // for(; i < 1; i++){
    //     cstr_arr_add(&reg.topic_pub, topic, strlen(topic), i);
    // }
    void* handle = bus_client_init(NULL, 0, &reg);
    const auto topicl = strlen(topic);
    creg* reg = make_creg(make_cproc("request", "requestid"),
        NULL, 0, NULL, 0, NULL, 0, NULL, 0);
    void* handle = bus_client_init(NULL, 0, reg);
    creg_free(reg);
    size_t count = 0;
    string base_msg("test_request==");
    this_thread::sleep_for(chrono::seconds(3));
    while (true) {
        auto msg = base_msg + "request message -> msg-"+to_string(count++);
        auto reqmsg = make_req_msg(strtpc.data(), strtpc.size(), msg.data(), msg.size());
        auto reqmsg = make_req_msg(topic, topicl, msg.data(), msg.size());
        crepmsg* repmsg = NULL;
        if (bus_client_request(handle, reqmsg, &repmsg)){
            printf("======>> bus_client_reqest [%s] get [%s]\n", msg.c_str(), repmsg->data.str);
            printf("======>> bus_client_reqest [%s] get [%s]\n", msg.c_str(), repmsg->data);
        }
        free_reqmsg(reqmsg);
        free_reply_msg(repmsg);
@@ -114,15 +95,15 @@
static void reply(const char* topic){
    ignoref(reply);
    creg reg;
    memset(&reg, 0, sizeof(reg));
    reg.pinfo = make_proc("reply", "replyid");
    reg.channel = cstr_arr_new(1);
    cstr_arr_add(&reg.channel, topic, strlen(topic), 0);
    reg.topic_pub = cstr_arr_new(1);
    cstr_arr_add(&reg.topic_pub, topic, strlen(topic), 0);
    const auto topicl = strlen(topic);
    vector<const char*> pubtpc{topic};
    void* handle = bus_client_init(NULL, 0, &reg);
    creg* reg = make_creg(make_cproc("reply", "replyid"),
        &pubtpc[0], pubtpc.size(), &pubtpc[0], pubtpc.size(), NULL, 0, NULL, 0);
    void* handle = bus_client_init(NULL, 0, reg);
    creg_free(reg);
    size_t count = 0;
    this_thread::sleep_for(chrono::seconds(3));
    while (true) {
@@ -131,7 +112,7 @@
        auto repmsg = make_reply_msg(0, NULL, 0, "recv request", 12);
        bus_client_reply_msg(handle, src, repmsg);
        free_reply_msg(repmsg);
        printf("REPREQ msg [%s] \n", msg->msg.str);
        printf("REPREQ msg [%s] \n", msg->msg);
        free_reqmsg(msg);
        this_thread::sleep_for(chrono::seconds{2});
message.cpp
@@ -20,91 +20,140 @@
using namespace std;
struct cstr{
    char*  str;         // 字符串内容
    size_t size;        // 字符串长度
};
// 进程信息
struct cproc{
    struct cstr name;   // 进程名字
    struct cstr id;     // 进程id
    struct cstr info;   // 进程其他信息,目前没用
};
struct creg{
    cproc*  proc;                    // 需要注册的进程信息
    char** channel;             // 进程提供的请求响应服务的主题
    size_t channel_count;
    char** pub;                 // 进程提供的发布订阅的发布主题
    size_t pub_count;
    char** sub;                 // 进程需要订阅的主题
    size_t sub_count;
    char** sub_net;             // 进程需要订阅的网络主题,目前没用
    size_t sub_net_count;
};
template <class T> T* ptrT(const size_t l=1){ return (T*)calloc(l, sizeof(T)); }
static inline struct cstr null_cstr(){
template<class T> struct rmC : remove_const<T>{};
template<class T> struct rmC<const T*>{ using type = typename rmC<T>::type*; };
template<class T> struct rmC<const T**>{ using type = typename rmC<T>::type**; };
template <class T, typename enable_if<is_pointer<T>::value>::type* = nullptr>
typename rmC<T>::type rmConst(T t){ return const_cast<typename rmC<T>::type>(t); }
static inline struct cstr cstr_ref(const char* str, const size_t len){
    struct cstr cs;
    memset(&cs, 0, sizeof(cs));
    return cs;
}
static inline struct cstr cstr_ref(char* str, const size_t len){
    struct cstr cs = null_cstr();
    cs.size = len;
    cs.str = str;
    cs.str = rmConst(str);
    return cs;
}
static inline struct cstr cstr_clone(const struct cstr old){
    return cstr_new(old.str, old.size);
}
struct cstr cstr_new(const char* str, const size_t len){
    struct cstr cs = null_cstr();
    cs.size = len;
    cs.str = ptrT<char>(len);
    memcpy(cs.str, str, len);
    return cs;
}
void cstr_free(struct cstr str){
    if (str.str && str.size) free(str.str);
}
static inline struct cstrarr null_cstr_arr(){
    struct cstrarr arr;
    memset(&arr, 0, sizeof(arr));
    return arr;
}
struct cstrarr cstr_arr_new(const size_t count){
    struct cstrarr arr = null_cstr_arr();
    arr.arr = (struct cstr*)calloc(count, sizeof(struct cstr));
    arr.count = count;
    return arr;
}
void cstr_arr_add(struct cstrarr* arr, const char* data, const size_t len, const size_t idx){
    if (arr->arr && arr->count && idx < arr->count){
        arr->arr[idx] = cstr_new(data, len);
    }
}
void cstr_arr_free(struct cstrarr arr){
    for(size_t i = 0; i < arr.count; i++){
        auto & str = arr.arr[i];
        cstr_free(str);
    }
    free(arr.arr);
}
struct cproc* make_proc_info(const struct cstr name, const struct cstr id, const struct cstr info){
static inline struct cstr cstr_clone(const struct cstr* str){
    struct cstr cs;
    cs.size = str->size;
    cs.str = ptrT<char>(cs.size);
    memcpy(cs.str, str, cs.size);
    return cs;
}
cproc* internal_clone_cproc(const cproc* proc){
    auto nproc = ptrT<struct cproc>();
    nproc->name = cstr_clone(&proc->name);
    nproc->id = cstr_clone(&proc->id);
    return nproc;
}
void internal_cproc_free(cproc* proc){
    if (proc){
        free(proc->name.str);
        free(proc->id.str);
        free(proc->info.str);
        free(proc);
    }
}
cproc* make_cproc(const char* name, const char* id){
    auto proc = ptrT<struct cproc>();
    proc->name = cstr_clone(name);
    proc->id = cstr_clone(id);
    // proc->info = cstr_clone(info);
    proc->name = cstr_ref(name, strlen(name));
    proc->id = cstr_ref(id, strlen(id));
    return proc;
}
struct cproc* clone_proc_info(const struct cproc* pi){
    if (!pi) return NULL;
    auto newpi = ptrT<struct cproc>();
    newpi->name = cstr_clone(newpi->name);
    newpi->id = cstr_clone(newpi->id);
    // todo: ignore info
    return newpi;
char* cproc_name(const cproc* proc){
    if (!proc) return NULL;
    return proc->name.str;
}
void free_proc_info(struct cproc* pi){
    if (pi){
        cstr_free(pi->name);
        cstr_free(pi->id);
        cstr_free(pi->info);
        free(pi);
    }
char* cproc_id(const cproc* proc){
    if (!proc) return NULL;
    return proc->id.str;
}
void free_creg(struct creg* reg){
    if (reg){
        if (reg->pinfo) free_proc_info(reg->pinfo);
        cstr_arr_free(reg->channel);
        cstr_arr_free(reg->topic_pub);
        cstr_arr_free(reg->topic_sub);
        cstr_arr_free(reg->topic_sub_net);
void cproc_free(cproc* proc){
    free(proc);
}
creg* make_creg(const cproc* proc, const char** rep, const size_t repcnt,
                                    const char** pub, const size_t pubcnt,
                                    const char** sub, const size_t subcnt,
                                    const char** subnet, const size_t subnetcnt)
{
    auto reg = ptrT<struct creg>();
    reg->proc = rmConst(proc);
    reg->channel = rmConst(rep);
    reg->channel_count = repcnt;
    reg->pub = rmConst(pub);
    reg->pub_count = pubcnt;
    reg->sub = rmConst(sub);
    reg->sub_count = subcnt;
    reg->sub_net = rmConst(subnet);
    reg->sub_net_count = subnetcnt;
    return reg;
}
const cproc* creg_proc(const creg* reg){
    if (!reg) return NULL;
    return reg->proc;
}
char** creg_rep_topic(const creg* reg, size_t* count){
    if (!reg) return NULL;
    *count = reg->channel_count;
    return reg->channel;
}
char** creg_pub_topic(const creg* reg, size_t* count){
    if (!reg) return NULL;
    *count = reg->pub_count;
    return reg->pub;
}
char** creg_sub_topic(const creg* reg, size_t* count){
    if (!reg) return NULL;
    *count = reg->sub_count;
    return reg->sub;
}
char** creg_subnet_topic(const creg* reg, size_t* count){
    if (!reg) return NULL;
    *count = reg->sub_net_count;
    return reg->sub_net;
}
void creg_free(creg* reg){
        free(reg);
    }
}
/////////////////////////////////////////////////////////////////////
@@ -154,28 +203,21 @@
    };
    msg->table = (TableChanged)yyjson_get_int(obj("table"));
    msg->action = (DbAction)yyjson_get_int(obj("action"));
    assign("id", &msg->id.str, &msg->id.size);
    assign("info", &msg->info.str, &msg->info.size);
    assign("id", &msg->id, &msg->idl);
    assign("info", &msg->info, &msg->infol);
    yyjson_doc_free(doc);
    return msg;
}
static inline void freeDbChangeMsg(struct DbChangeMsg* msg){
    if (msg){
        cstr_free(msg->id);
        cstr_free(msg->info);
        free(msg->id);
        free(msg->info);
        free(msg);
    }
}
///////////////////////////////////////////////////////////////
static inline tuple<char*, size_t> parseTopic(MsgPublish &msgp, const char* data, const size_t size){
    if (!msgp.ParseFromArray(data, size)) return make_tuple(nullptr, 0);
    auto tpc = ptrT<char>(msgp.topic().length()+1);
    memcpy(tpc, msgp.topic().data(), msgp.topic().size());
    return make_tuple(tpc, msgp.topic().size());
}
static inline void json2str(yyjson_val* v, char** d, size_t* l){
    *l = yyjson_get_len(v);
@@ -183,7 +225,7 @@
    memcpy(*d, yyjson_get_str(v), *l);
}
static struct creg* jsonval2creg(yyjson_val* v){
static struct creg* json2creg(yyjson_val* v){
    auto obj = [](yyjson_val* v, const char* name){return yyjson_obj_get(v, name);};
    auto rinfo = ptrT<struct creg>();
@@ -192,31 +234,47 @@
    json2str(obj(pval, "name"), &pinfo->name.str, &pinfo->name.size);
    json2str(obj(pval, "id"), &pinfo->id.str, &pinfo->id.size);
    json2str(obj(pval, "info"), &pinfo->info.str, &pinfo->info.size);
    rinfo->pinfo = pinfo;
    rinfo->proc = pinfo;
    auto assign_arr = [&obj](yyjson_val* v){
        const size_t count = yyjson_arr_size(v);
        struct cstrarr arr = cstr_arr_new(count);
        char** arr = ptrT<char*>(count);
        for(size_t i = 0; i < count; i++){
            auto sv = yyjson_arr_get(v, i);
            char* entry = NULL;
            size_t entry_size = 0;
            json2str(sv, &entry, &entry_size);
            arr.arr[i] = cstr_new(entry, entry_size);
            arr[i] = ptrT<char>(entry_size+1);
            memcpy(arr[i], entry, entry_size);
        }
        return arr;
        return make_tuple(arr, count);
    };
    rinfo->channel = assign_arr(obj(v, "channel"));
    rinfo->topic_pub = assign_arr(obj(v, "pubTopic"));
    rinfo->topic_sub = assign_arr(obj(v, "subTopic"));
    rinfo->topic_sub_net = assign_arr(obj(v, "subNetTopic"));
    tie(rinfo->channel, rinfo->channel_count) = assign_arr(obj(v, "channel"));
    tie(rinfo->pub, rinfo->pub_count) = assign_arr(obj(v, "pubTopic"));
    tie(rinfo->sub, rinfo->sub_count) = assign_arr(obj(v, "subTopic"));
    tie(rinfo->sub_net, rinfo->sub_net_count) = assign_arr(obj(v, "subNetTopic"));
    return rinfo;
}
static inline void free_cclients(struct cclient* cli, const size_t size){
    for(size_t i = 0; i < size; i++) if (cli[i].rinfo) free_creg(cli[i].rinfo);
static inline void free_cclients(struct cclient* cli, const size_t count){
    for(size_t i = 0; i < count; i++) {
        if (cli[i].rinfo) {
            auto tmp = cli[i].rinfo;
            free(tmp->proc);
            auto free_arr = [](char** arr, const size_t count){
                for(size_t i = 0; i < count; i++){
                    free(arr[i]);
                }
                free(arr);
            };
            free_arr(tmp->channel, tmp->channel_count);
            free_arr(tmp->pub, tmp->pub_count);
            free_arr(tmp->sub, tmp->sub_count);
            free_arr(tmp->sub_net, tmp->sub_net_count);
        };
    }
    free(cli);
}
@@ -235,7 +293,7 @@
        rc.hbcnt = yyjson_get_int(obj(one, "heartbeatCount"));
        rc.dcnt = yyjson_get_int(obj(one, "deadCount"));
        rc.status = yyjson_get_int(obj(one, "status"));
        rc.rinfo = jsonval2creg(obj(one, "info"));
        rc.rinfo = json2creg(obj(one, "info"));
        cli[i] = rc;
    }
@@ -320,8 +378,8 @@
        *d = ptrT<char>(s.size()+1);
        memcpy(*d, s.data(), *l);
    };
    assign("id", &db->id.str, &db->id.size);
    assign("info", &db->info.str, &db->info.size);
    assign("id", &db->id, &db->idl);
    assign("info", &db->info, &db->infol);
    // printf("table %d id %s act %d info %s\n", tbl, id.c_str(), act, info.c_str());
    // string json;
@@ -332,22 +390,31 @@
    return db;
}
static tuple<char*, size_t> copymemory(const char* src, const size_t size){
    char* dst = ptrT<char>(size + 1);
    memcpy(dst, src, size);
    return make_tuple(dst, size);
}
static tuple<char*, size_t> copymemory(const string& src){
    char* dst = ptrT<char>(src.size() + 1);
    memcpy(dst, src.data(), src.size());
    return make_tuple(dst, src.size());
}
struct csubmsg* to_submsg(const char* data, const size_t size){
    MsgPublish msg;
    auto tp = parseTopic(msg, data, size);
    if (!msg.ParseFromArray(data, size)) return NULL;
    auto smsg = ptrT<struct csubmsg>();
    tie(smsg->topic, smsg->topicl) = copymemory(msg.topic());
    tie(smsg->msg, smsg->msgl) = copymemory(msg.data());
    auto pmsg = ptrT<struct csubmsg>();
    pmsg->topic = cstr_new(get<0>(tp), get<1>(tp));
    pmsg->msg = cstr_new(msg.data().data(), msg.data().size());
    return pmsg;
    return smsg;
}
void free_submsg(struct csubmsg* msg){
    if (msg){
        cstr_free(msg->topic);
        cstr_free(msg->msg);
        free(msg->topic);
        free(msg->msg);
        free(msg);
    }
}
@@ -355,15 +422,15 @@
struct DbChangeMsg* get_submsg_db(struct csubmsg* msg){
    // string json = toJSON((const char*)msg->data, msg->size, "DbChangeMessage");
    // return json2DbChangeMsg(json);
    return pb2DbChangeMsg((const char*)msg->msg.str, msg->msg.size);
    return pb2DbChangeMsg((const char*)msg->msg, msg->msgl);
}
void free_submsg_db(struct DbChangeMsg* msg){
    if (msg) freeDbChangeMsg(msg);
}
struct cproclist* get_submsg_proclist(struct csubmsg* msg){
    const char* data = msg->msg.str;
    const size_t size = msg->msg.size;
    const char* data = msg->msg;
    const size_t size = msg->msgl;
    auto pl = ptrT<struct cproclist>();
    pl->cli = json2cclients(data, size, &pl->count);
@@ -371,30 +438,19 @@
}
void free_submsg_proclist(struct cproclist* pl){
    if (pl) if (pl->cli) free_cclients(pl->cli, pl->count);
    free(pl);
}
//////////////////////////////////////////////////
static tuple<struct cstr, struct cstr> json2reqmsg(const struct cstr msg){
    yyjson_doc* doc = yyjson_read(msg.str, msg.size, 0);
    yyjson_val* root = yyjson_doc_get_root(doc);
    auto obj = [](yyjson_val* v, const char* name){return yyjson_obj_get(v, name);};
    auto jp = obj(root, "path");
    auto jb = obj(root, "body");
    auto tp = make_tuple(cstr_new(yyjson_get_str(jp), yyjson_get_len(jp)),
                            cstr_new(yyjson_get_raw(jb), yyjson_get_len(jb)));
    yyjson_doc_free(doc);
    return tp;
}
struct creqmsg* to_reqmsg(const char* pid, const size_t pids, const char* data, const size_t size)
{
    auto msg = ptrT<struct creqmsg>();
    msg->procid = cstr_new(pid, pids);
    tie(msg->procid, msg->procidl) = copymemory(pid, pids);
    MsgRequestTopic msgRT;
    if (!msgRT.ParseFromArray(data, size)) return NULL;
    msg->msg = cstr_new(msgRT.data().data(), msgRT.data().size());
    tie(msg->msg, msg->msgl) = copymemory(msgRT.data());
    return msg;
}
@@ -408,68 +464,77 @@
    msgRT.set_data(data, datal);
    auto pbstr = msgRT.SerializeAsString();
    msg->msg = cstr_new(pbstr.data(), pbstr.size());
    tie(msg->msg, msg->msgl) = copymemory(pbstr);
    return msg;
}
void free_reqmsg(struct creqmsg* msg){
    if (msg){
        cstr_free(msg->procid);
        cstr_free(msg->msg);
        free(msg->procid);
        free(msg->msg);
        free(msg);
    }
}
struct cstackmsgerr* get_reqmsg_stackerr(struct creqmsg* msg){
    struct cstr path = null_cstr();
    struct cstr body = null_cstr();
    tie(path, body) = json2reqmsg(msg->msg);
    if (body.size == 0) return NULL;
static tuple<const char*, size_t> json2body(yyjson_doc* doc){
    yyjson_val* root = yyjson_doc_get_root(doc);
    auto doc = yyjson_read(body.str, body.size, 0);
    auto obj = [](yyjson_val* v, const char* name){return yyjson_obj_get(v, name);};
    auto jp = obj(root, "path");
    if (!jp) return make_tuple((const char*)NULL, 0);
    auto jb = obj(root, "body");
    if (!jb) return make_tuple((const char*)NULL, 0);
    return make_tuple(yyjson_get_str(jb), yyjson_get_len(jp));
}
struct cstackmsgerr* get_reqmsg_stackerr(struct creqmsg* msg){
    auto doc0 = yyjson_read(msg->msg, msg->msgl, 0);
    auto tpbody = json2body(doc0);
    if (!get<0>(tpbody)) return NULL;
    auto doc = yyjson_read(get<0>(tpbody), get<1>(tpbody), 0);
    auto root = yyjson_doc_get_root(doc);
    auto obj = [](yyjson_val* v, const char* name){return yyjson_obj_get(v, name);};
    auto jsid = obj(root, "stackId");
    auto jfid = obj(root, "fileId");
    auto stkmsg = ptrT<struct cstackmsgerr>();
    stkmsg->stackid = cstr_new(yyjson_get_str(jsid), yyjson_get_len(jsid));
    stkmsg->fileid = cstr_new(yyjson_get_str(jfid), yyjson_get_len(jfid));
    auto smsg = ptrT<struct cstackmsgerr>();
    tie(smsg->stackid, smsg->stackidl) = copymemory(yyjson_get_str(jsid), yyjson_get_len(jsid));
    tie(smsg->fileid, smsg->fileidl) = copymemory(yyjson_get_str(jfid), yyjson_get_len(jfid));
    yyjson_doc_free(doc);
    cstr_free(path);
    cstr_free(body);
    yyjson_doc_free(doc0);
    return stkmsg;
    return smsg;
}
void free_reqmsg_stackerr(struct cstackmsgerr* msg){
    if (msg){
        cstr_free(msg->stackid);
        cstr_free(msg->fileid);
        free(msg->stackid);
        free(msg->fileid);
        free(msg);
    }
}
// decode success msg
struct cstackmsg* get_reqmsg_stack(struct creqmsg* msg){
    struct cstr path = null_cstr();
    struct cstr body = null_cstr();
    tie(path, body) = json2reqmsg(msg->msg);
    if (body.size == 0) return NULL;
    auto doc0 = yyjson_read(msg->msg, msg->msgl, 0);
    auto tpbody = json2body(doc0);
    if (!get<0>(tpbody)) return NULL;
    auto doc = yyjson_read(body.str, body.size, 0);
    auto doc = yyjson_read(get<0>(tpbody), get<1>(tpbody), 0);
    auto root = yyjson_doc_get_root(doc);
    auto obj = [](yyjson_val* v, const char* name){return yyjson_obj_get(v, name);};
    auto assign = [&obj](yyjson_val* v, const char* name){
        auto tmp(obj(v, name));
        return cstr_new(yyjson_get_str(tmp), yyjson_get_len(tmp));
        return copymemory(yyjson_get_str(tmp), yyjson_get_len(tmp));
    };
    auto smsg = ptrT<struct cstackmsg>();
    smsg->procnum = yyjson_get_int(obj(root, "procNum"));
    smsg->stackid = assign(root, "stackId");
    smsg->stackname = assign(root, "stackName");
    tie(smsg->stackid, smsg->stackidl) = assign(root, "stackId");
    tie(smsg->stackname, smsg->stacknamel) = assign(root, "stackName");
    smsg->type = yyjson_get_int(obj(root, "type"));
    smsg->width = yyjson_get_int(obj(root, "resolution_width"));
    smsg->width = yyjson_get_int(obj(root, "resolution_height"));
@@ -482,25 +547,26 @@
        struct cstackfile file;
        memset(&file, 0, sizeof(file));
        yyjson_val* one = yyjson_arr_get(arr, i);
        file.id = assign(one, "id");
        file.name = assign(one, "name");
        file.path = assign(one, "path");
        tie(file.id, file.idl) = assign(one, "id");
        tie(file.name, file.namel) = assign(one, "name");
        tie(file.path, file.pathl) = assign(one, "path");
        file.type = yyjson_get_int(obj(one, "type"));
        smsg->files[i] = file;
    }
    yyjson_doc_free(doc);
    yyjson_doc_free(doc0);
    return smsg;
}
void free_reqmsg_stack(struct cstackmsg* msg){
    if (msg){
        cstr_free(msg->stackid);
        cstr_free(msg->stackname);
        free(msg->stackid);
        free(msg->stackname);
        for(size_t i = 0; i < msg->filescnt; i++){
            cstr_free(msg->files[i].id);
            cstr_free(msg->files[i].name);
            cstr_free(msg->files[i].path);
            free(msg->files[i].id);
            free(msg->files[i].name);
            free(msg->files[i].path);
        }
        free(msg);
    }
@@ -511,12 +577,20 @@
{
    auto msg = ptrT<struct crepmsg>();
    msg->errcode = errcode;
    msg->errmsg = cstr_new(errmsg, emsgl);
    msg->data = cstr_new(data, datal);
    tie(msg->errmsg, msg->errmsgl) = copymemory(errmsg, emsgl);
    tie(msg->data, msg->datal) = copymemory(data, datal);
    return msg;
}
struct cstr make_reply_msg_json(const int success, const char* msg, const size_t msgl,
void free_reply_msg(struct crepmsg* msg){
    if (msg){
        free(msg->errmsg);
        free(msg->data);
        free(msg);
    }
}
static struct cstr make_reply_msg_json(const int success, const char* msg, const size_t msgl,
                                const char* data, const size_t datal)
{
    auto doc = yyjson_mut_doc_new(NULL);
@@ -529,12 +603,4 @@
    char* json = yyjson_mut_val_write(root, 0, &jsonl);
    yyjson_mut_doc_free(doc);
    return cstr_ref(json, jsonl);
}
void free_reply_msg(struct crepmsg* msg){
    if (msg){
        cstr_free(msg->errmsg);
        cstr_free(msg->data);
        free(msg);
    }
}
message.h
@@ -3,42 +3,20 @@
#include <stddef.h>
struct cstr{
    char*  str;         // 字符串内容
    size_t size;        // 字符串长度
};
// 进程信息
struct cproc{
    struct cstr name;   // 进程名字
    struct cstr id;     // 进程id
    struct cstr info;   // 进程其他信息,目前没用
};
/*
    注册进程的proc信息,包括进程名[name], 进程[id],等
*/
typedef struct cproc cproc;
/*
    进程注册信息, 包括进程的proc信息 cproc
    待注册的本进程提供请求响应的主题 topics[s], char** rep, 主题数量 repcnt
    待注册的本进程发布的主题 topics[s], char** pub, 主题数量 pubcnt
    待注册的本进程订阅的主题 topics[s], char** sub, 主题数量 subcnt
    待注册的本进程订阅的[网络、远程主机]主题 topics[s], char** subnet, 主题数量 subnetcnt
*/
typedef struct creg creg;
struct cstrarr{
    struct cstr* arr;   // 字符串数组
    size_t count;       // 字符串数组长度
};
// 进程注册信息
struct creg{
    struct cproc*  pinfo;       // 需要注册的进程信息
    cstrarr channel;            // 进程提供的请求响应服务的主题
    cstrarr topic_pub;          // 进程提供的发布订阅的发布主题
    cstrarr topic_sub;          // 进程需要订阅的主题
    cstrarr topic_sub_net;      // 进程需要订阅的网络主题,目前没用
};
// 其他进程信息
struct cclient{
    struct creg*  rinfo;        // 代表其他进程的进程信息
    int replykey;               // 没用,上一个版本用共享内存,此为key
    int hbcnt;                  // 心跳次数?可能没用
    int dcnt;                   // deadcount,可能没用?
    int status;                 // 进程状态,可能没用?
};
//TableChanged enum
//TableChanged enum pollcontrol 接收到的订阅消息,数据库的变化
enum TableChanged {
    T_Camera = 0,       //摄像机变化
    T_CameraRule = 1,       //摄像机任务参数变化
@@ -58,6 +36,7 @@
    T_CameraPolygonRelation = 15,        //摄像机区域的关联关系
    T_Voice = 16,       //报警声音发生变化
};
// pollcontrol 接收到的订阅消息,数据库的操作
enum DbAction {
    Insert = 0,     //Insert db
    Update = 1,     //Update db
@@ -67,18 +46,32 @@
// 数据库变化信息
struct DbChangeMsg{
    TableChanged table;     //变化的表
    struct cstr   id;      //变化数据id
    char*           id;      //变化数据id
    size_t          idl;     // id len
    DbAction action;        //action/ DbAction[Insert/Update/Delete]
    struct cstr   info;    //变化内容
    char*           info;    //变化内容
    size_t          infol;
};
// 订阅消息
// 订阅消息, 包括数据库消息和其他进程的消息
struct csubmsg{
    struct cstr topic;      // 收到的订阅消息的主题,区分那种订阅消息
    char* topic;      // 收到的订阅消息的主题,区分那种订阅消息
    size_t topicl;
    // private
    // enum MsgT {NONE=0, DB, PROCLIST} type;
    struct cstr msg;        // 收到的订阅消息的body
    char* msg;        // 收到的订阅消息的body
    size_t msgl;
};
// 其他进程信息, pollcontrol可能依赖其他进程的服务,可能需要等其他进程启动再开始工作
struct cclient{
    creg*  rinfo;               // 代表其他进程的进程信息
    int replykey;               // 没用,上一个版本用共享内存,此为key
    int hbcnt;                  // 心跳次数?可能没用
    int dcnt;                   // deadcount,可能没用?
    int status;                 // 进程状态,可能没用?
};
// 其他注册进程列表,如数据库进程需要启动再运行pollcontrol逻辑
@@ -94,16 +87,20 @@
    接收request消息会带有发出request消息的进程的id,procid
*/
struct creqmsg{
    struct cstr procid;     // 发送request消息的进程id
    struct cstr msg;        // request消息体
    char* procid;     // 发送request消息的进程id
    size_t procidl;
    char* msg;        // request消息体
    size_t msgl;
};
// decode stack err msg
/*
    pollcontrol会接收数据栈解码发送的解码失败消息,用于置网页状态
*/
struct cstackmsgerr{
    struct cstr stackid;    // 解码失败的数据栈id
    struct cstr fileid;     // 解码失败的文件id
    char* stackid;    // 解码失败的数据栈id
    size_t stackidl;
    char* fileid;     // 解码失败的文件id
    size_t fileidl;
};
// stack file
/*
@@ -111,9 +108,13 @@
    看代码应该只需要解析出的几个
*/
struct cstackfile{
    struct cstr id;         // 文件id
    struct cstr name;       // 文件name
    struct cstr path;       // 文件路径
    char* id;         // 文件id
    size_t idl;
    char* name;       // 文件name
    size_t namel;
    char* path;       // 文件路径
    size_t pathl;
    int type;               // 文件类型 1:video,2:picture
    void* noused;           // 未使用
};
@@ -123,8 +124,10 @@
*/
struct cstackmsg{
    int procnum;                // decoder 启动的进程号,数据栈可能会有数个decoder同时运行
    struct cstr stackid;        // 数据栈 id
    struct cstr stackname;      // 数据栈 name
    char* stackid;        // 数据栈 id
    size_t stackidl;
    char* stackname;      // 数据栈 name
    size_t stacknamel;
    int type;                   // 数据栈类型 video picture
    int shmkey;                 // 数据栈使用的共享内存key
    int width;                  // 分辨率
@@ -138,45 +141,59 @@
// 对应 bhome_msg.MsgRequestTopicReply
struct crepmsg{
    int errcode;                // 相应request请求的消息,错误码
    struct cstr errmsg;         // 错误消息
    struct cstr data;           // 消息体
    char* errmsg;         // 错误消息
    size_t errmsgl;
    char* data;           // 消息体
    size_t datal;
};
#ifdef __cplusplus
extern "C"{
#endif
/*
    封装了C接口的string
    cstr_new 创建一个string,包括内存地址和长度,会拷贝参数
    必须使用cstr_free释放
*/
struct cstr cstr_new(const char* str, const size_t len);
void cstr_free(struct cstr str);
/*
    封装字符串数组,其中是一个struct cstr数组,包括指向数组的指针和count
    通过cstr_arr_add添加字符串,内部会拷贝字符串
    必须使用cstr_arr_free释放
*/
struct cstrarr cstr_arr_new(const size_t count);
void cstr_arr_add(struct cstrarr* arr, const char* data, const size_t len, const size_t idx);
void cstr_arr_free(struct cstrarr arr);
/*
    创建struct cproc 结构,对应procinfo,保存proc的name,id,info[当前没有使用]
    必须使用free_proc_info释放
    内部使用,深拷贝 cproc
*/
struct cproc* make_proc_info(const struct cstr name, const struct cstr id, const struct cstr info);
cproc* internal_clone_cproc(const cproc* proc);
void internal_cproc_free(cproc* proc);
/*
    从已存在的proc克隆,会拷贝,使用free_proc_info释放
    返回一个 cproc 结构,仅拷贝输入参数
    生命周期 cproc < name[id]
*/
struct cproc* clone_proc_info(const struct cproc* pi);
void free_proc_info(struct cproc* pi);
cproc* make_cproc(const char* name, const char* id);
/*
    从已存在的 cproc 中获取 name 和 id,仅仅是 cproc 内部的引用,无需释放
    生命周期 name[id] < cproc
*/
char* cproc_name(const cproc* proc);
char* cproc_id(const cproc* proc);
/*
    释放 cproc 指针
*/
void cproc_free(cproc* proc);
/*
    释放creg结构指针
    creg结构可以使用上述make_proc_info、cstr_arr_new、cstr_new函数创建
    返回一个 creg 结构,仅拷贝输入参数
    输入参数生命周期需覆盖返回值生命周期
*/
void free_creg(struct creg* reg);
creg* make_creg(const cproc* proc, const char** rep, const size_t repcnt,
                                    const char** pub, const size_t pubcnt,
                                    const char** sub, const size_t subcnt,
                                    const char** subnet, const size_t subnetcnt);
/*
    获取已存在的 creg 中的 cproc 或者注册的主题,无需释放
    生命周期 < creg
*/
const cproc* creg_proc(const creg* reg);
char** creg_rep_topic(const creg* reg, size_t* count);
char** creg_pub_topic(const creg* reg, size_t* count);
char** creg_sub_topic(const creg* reg, size_t* count);
char** creg_subnet_topic(const creg* reg, size_t* count);
/*
    释放 creg 指针
*/
void creg_free(creg* reg);
// 订阅消息相关,订阅数据库db消息和进程列表proclist消息
/*
    cbhomeclient.cpp中使用,将接收到的submsg解包成csubmsg
@@ -248,14 +265,8 @@
// reply msg
/*
    no use 将reply消息序列化为json,目前没有使用
    使用 cstr_free 释放
*/
struct cstr make_reply_msg_json(const int success, const char* msg, const size_t msgl,
                                const char* data, const size_t datal);
/*
    创建 creqmsg 包括errcode、errmsg和消息体data
    使用 free_reply_msg 释放
    必须使用 free_reply_msg 释放
*/
struct crepmsg* make_reply_msg(const int errcode, const char* errmsg, const size_t emsgl,
                                const char* data, const size_t datal);