zhangmeng
2022-12-27 cf0a3209b51babf72469d962914db0dac2e5f52c
message.cpp
@@ -13,79 +13,172 @@
#include "google/protobuf/dynamic_message.h"
#include "google/protobuf/compiler/importer.h"
#include "3dparty/yyjson/yyjson.h"
#include "3rdparty/yyjson/yyjson.h"
#include "bhome_msg_api.pb.h"
using namespace bhome_msg;
using namespace std;
struct cstr{
    char*  str;         // 字符串内容
    size_t size;        // 字符串长度
};
// 进程信息
struct cproc{
    struct cstr name;   // 进程名字
    struct cstr id;     // 进程id
    struct cstr info;   // 进程其他信息,目前没用
};
struct creg{
    cproc*  proc;                    // 需要注册的进程信息
    char** channel;             // 进程提供的请求响应服务的主题
    size_t channel_count;
    char** pub;                 // 进程提供的发布订阅的发布主题
    size_t pub_count;
    char** sub;                 // 进程需要订阅的主题
    size_t sub_count;
    char** sub_net;             // 进程需要订阅的网络主题,目前没用
    size_t sub_net_count;
};
template <class T> T* ptrT(const size_t l=1){ return (T*)calloc(l, sizeof(T)); }
static struct cstr cstr_clone(const struct cstr old){
    return cstr_new(old.str, old.size);
}
template<class T> struct rmC : remove_const<T>{};
template<class T> struct rmC<const T*>{ using type = typename rmC<T>::type*; };
template<class T> struct rmC<const T**>{ using type = typename rmC<T>::type**; };
template <class T, typename enable_if<is_pointer<T>::value>::type* = nullptr>
typename rmC<T>::type rmConst(T t){ return const_cast<typename rmC<T>::type>(t); }
struct cstr cstr_new(const char* str, const size_t len){
static inline struct cstr cstr_ref(const char* str, const size_t len){
    struct cstr cs;
    memset(&cs, 0, sizeof(cs));
    cs.size = len;
    cs.str = ptrT<char>(len);
    memcpy(cs.str, str, len);
    cs.str = rmConst(str);
    return cs;
}
void cstr_free(struct cstr str){
    if (str.str && str.size) free(str.str);
}
struct cstrarr cstr_arr_new(const size_t len){
    struct cstrarr arr;
    memset(&arr, 0, sizeof(arr));
    arr.arr = (struct cstr*)calloc(len, sizeof(struct cstr));
    arr.size = len;
    return arr;
}
void cstr_arr_add(struct cstrarr* arr, const char* data, const size_t len, const size_t idx){
    if (arr->arr && arr->size && idx < arr->size){
        arr->arr[idx] = cstr_new(data, len);
    }
}
void cstr_arr_free(struct cstrarr arr){
    for(size_t i = 0; i < arr.size; i++){
        auto & str = arr.arr[i];
        cstr_free(str);
    }
    free(arr.arr);
static inline struct cstr cstr_clone(const struct cstr* str){
    struct cstr cs;
    cs.size = str->size;
    cs.str = ptrT<char>(cs.size);
    memcpy(cs.str, str, cs.size);
    return cs;
}
void free_creg(struct creg* reg){
    if (reg){
        if (reg->pinfo) free_proc_info(reg->pinfo);
        cstr_arr_free(reg->channel);
        cstr_arr_free(reg->topic_pub);
        cstr_arr_free(reg->topic_sub);
        cstr_arr_free(reg->topic_sub_net);
        free(reg);
cproc* internal_clone_cproc(const cproc* proc){
    auto nproc = ptrT<struct cproc>();
    nproc->name = cstr_clone(&proc->name);
    nproc->id = cstr_clone(&proc->id);
    return nproc;
}
void internal_cproc_free(cproc* proc){
    if (proc){
        free(proc->name.str);
        free(proc->id.str);
        free(proc->info.str);
        free(proc);
    }
}
struct cproc* clone_proc_info(struct cproc* pi){
    if (!pi) return NULL;
    auto newpi = ptrT<struct cproc>();
    *newpi = *pi;
    newpi->name = cstr_clone(newpi->name);
    newpi->id = cstr_clone(newpi->id);
    // todo: ignore info
    return newpi;
cproc* make_cproc(const char* name, const char* id){
    auto proc = ptrT<struct cproc>();
    proc->name = cstr_ref(name, strlen(name));
    proc->id = cstr_ref(id, strlen(id));
    return proc;
}
void free_proc_info(struct cproc* pi){
    if (pi){
        cstr_free(pi->name);
        cstr_free(pi->id);
        cstr_free(pi->info);
        free(pi);
    }
char* cproc_name(const cproc* proc){
    if (!proc) return NULL;
    return proc->name.str;
}
char* cproc_id(const cproc* proc){
    if (!proc) return NULL;
    return proc->id.str;
}
void cproc_free(cproc* proc){
    free(proc);
}
creg* make_creg(const cproc* proc, const char** rep, const size_t repcnt,
                                    const char** pub, const size_t pubcnt,
                                    const char** sub, const size_t subcnt,
                                    const char** subnet, const size_t subnetcnt)
{
    auto reg = ptrT<struct creg>();
    reg->proc = rmConst(proc);
    reg->channel = rmConst(rep);
    reg->channel_count = repcnt;
    reg->pub = rmConst(pub);
    reg->pub_count = pubcnt;
    reg->sub = rmConst(sub);
    reg->sub_count = subcnt;
    reg->sub_net = rmConst(subnet);
    reg->sub_net_count = subnetcnt;
    return reg;
}
creg* make_creg_from_cproc(const cproc* proc){
    auto reg = ptrT<struct creg>();
    reg->proc = rmConst(proc);
    return reg;
}
static inline void creg_add_topic(char*** dst, size_t* dstC, const char** src, const size_t srcC){
    *dst = rmConst(src);
    *dstC = srcC;
}
void creg_add_topic_reply(creg* reg, const char** topic, const size_t count){
    creg_add_topic(&reg->channel, &reg->channel_count, topic, count);
}
void creg_add_topic_pub(creg* reg, const char** topic, const size_t count){
    creg_add_topic(&reg->pub, &reg->pub_count, topic, count);
}
void creg_add_topic_sub(creg* reg, const char** topic, const size_t count){
    creg_add_topic(&reg->sub, &reg->sub_count, topic, count);
}
void creg_add_topic_subnet(creg* reg, const char** topic, const size_t count){
    creg_add_topic(&reg->sub_net, &reg->sub_net_count, topic, count);
}
const cproc* creg_proc(const creg* reg){
    if (!reg) return NULL;
    return reg->proc;
}
char** creg_reply_topic(const creg* reg, size_t* count){
    if (!reg) return NULL;
    *count = reg->channel_count;
    return reg->channel;
}
char** creg_pub_topic(const creg* reg, size_t* count){
    if (!reg) return NULL;
    *count = reg->pub_count;
    return reg->pub;
}
char** creg_sub_topic(const creg* reg, size_t* count){
    if (!reg) return NULL;
    *count = reg->sub_count;
    return reg->sub;
}
char** creg_subnet_topic(const creg* reg, size_t* count){
    if (!reg) return NULL;
    *count = reg->sub_net_count;
    return reg->sub_net;
}
void creg_free(creg* reg){
    free(reg);
}
/////////////////////////////////////////////////////////////////////
@@ -135,28 +228,21 @@
    };
    msg->table = (TableChanged)yyjson_get_int(obj("table"));
    msg->action = (DbAction)yyjson_get_int(obj("action"));
    assign("id", &msg->id.str, &msg->id.size);
    assign("info", &msg->info.str, &msg->info.size);
    assign("id", &msg->id, &msg->idl);
    assign("info", &msg->info, &msg->infol);
    yyjson_doc_free(doc);
    return msg;
}
static inline void freeDbChangeMsg(struct DbChangeMsg* msg){
    if (msg){
        cstr_free(msg->id);
        cstr_free(msg->info);
        free(msg->id);
        free(msg->info);
        free(msg);
    }
}
///////////////////////////////////////////////////////////////
static inline tuple<char*, size_t> parseTopic(MsgPublish &msgp, const char* data, const size_t size){
    if (!msgp.ParseFromArray(data, size)) return make_tuple(nullptr, 0);
    auto tpc = ptrT<char>(msgp.topic().length()+1);
    memcpy(tpc, msgp.topic().data(), msgp.topic().size());
    return make_tuple(tpc, msgp.topic().size());
}
static inline void json2str(yyjson_val* v, char** d, size_t* l){
    *l = yyjson_get_len(v);
@@ -164,7 +250,7 @@
    memcpy(*d, yyjson_get_str(v), *l);
}
static struct creg* jsonval2creg(yyjson_val* v){
static struct creg* json2creg(yyjson_val* v){
    auto obj = [](yyjson_val* v, const char* name){return yyjson_obj_get(v, name);};
    auto rinfo = ptrT<struct creg>();
@@ -173,33 +259,46 @@
    json2str(obj(pval, "name"), &pinfo->name.str, &pinfo->name.size);
    json2str(obj(pval, "id"), &pinfo->id.str, &pinfo->id.size);
    json2str(obj(pval, "info"), &pinfo->info.str, &pinfo->info.size);
    rinfo->pinfo = pinfo;
    rinfo->proc = pinfo;
    auto assign_arr = [&obj](yyjson_val* v){
        const size_t size = yyjson_arr_size(v);
        struct cstrarr arr;
        memset(&arr, 0, sizeof(arr));
        arr.size = size;
        for(size_t i = 0; i < size; i++){
        const size_t count = yyjson_arr_size(v);
        char** arr = ptrT<char*>(count);
        for(size_t i = 0; i < count; i++){
            auto sv = yyjson_arr_get(v, i);
            char* entry = NULL;
            size_t entry_size = 0;
            json2str(sv, &entry, &entry_size);
            arr.arr[i] = cstr_new(entry, entry_size);
            arr[i] = entry;
        }
        return arr;
        return make_tuple(arr, count);
    };
    rinfo->channel = assign_arr(obj(v, "channel"));
    rinfo->topic_pub = assign_arr(obj(v, "pubTopic"));
    rinfo->topic_sub = assign_arr(obj(v, "subTopic"));
    rinfo->topic_sub_net = assign_arr(obj(v, "subNetTopic"));
    tie(rinfo->channel, rinfo->channel_count) = assign_arr(obj(v, "channel"));
    tie(rinfo->pub, rinfo->pub_count) = assign_arr(obj(v, "pubTopic"));
    tie(rinfo->sub, rinfo->sub_count) = assign_arr(obj(v, "subTopic"));
    tie(rinfo->sub_net, rinfo->sub_net_count) = assign_arr(obj(v, "subNetTopic"));
    return rinfo;
}
static inline void free_cclients(struct cclient* cli, const size_t size){
    for(size_t i = 0; i < size; i++) if (cli[i].rinfo) free_creg(cli[i].rinfo);
static inline void free_cclients(struct cclient* cli, const size_t count){
    for(size_t i = 0; i < count; i++) {
        if (cli[i].rinfo) {
            auto tmp = cli[i].rinfo;
            free(tmp->proc);
            auto free_arr = [](char** arr, const size_t count){
                for(size_t i = 0; i < count; i++){
                    free(arr[i]);
                }
                free(arr);
            };
            free_arr(tmp->channel, tmp->channel_count);
            free_arr(tmp->pub, tmp->pub_count);
            free_arr(tmp->sub, tmp->sub_count);
            free_arr(tmp->sub_net, tmp->sub_net_count);
        };
    }
    free(cli);
}
@@ -218,7 +317,7 @@
        rc.hbcnt = yyjson_get_int(obj(one, "heartbeatCount"));
        rc.dcnt = yyjson_get_int(obj(one, "deadCount"));
        rc.status = yyjson_get_int(obj(one, "status"));
        rc.rinfo = jsonval2creg(obj(one, "info"));
        rc.rinfo = json2creg(obj(one, "info"));
        cli[i] = rc;
    }
@@ -303,8 +402,8 @@
        *d = ptrT<char>(s.size()+1);
        memcpy(*d, s.data(), *l);
    };
    assign("id", &db->id.str, &db->id.size);
    assign("info", &db->info.str, &db->info.size);
    assign("id", &db->id, &db->idl);
    assign("info", &db->info, &db->infol);
    // printf("table %d id %s act %d info %s\n", tbl, id.c_str(), act, info.c_str());
    // string json;
@@ -315,22 +414,31 @@
    return db;
}
static tuple<char*, size_t> copymemory(const char* src, const size_t size){
    char* dst = ptrT<char>(size + 1);
    memcpy(dst, src, size);
    return make_tuple(dst, size);
}
static tuple<char*, size_t> copymemory(const string& src){
    char* dst = ptrT<char>(src.size() + 1);
    memcpy(dst, src.data(), src.size());
    return make_tuple(dst, src.size());
}
struct csubmsg* to_submsg(const char* data, const size_t size){
    MsgPublish msg;
    auto tp = parseTopic(msg, data, size);
    if (!msg.ParseFromArray(data, size)) return NULL;
    auto smsg = ptrT<struct csubmsg>();
    tie(smsg->topic, smsg->topicl) = copymemory(msg.topic());
    tie(smsg->msg, smsg->msgl) = copymemory(msg.data());
    auto pmsg = ptrT<struct csubmsg>();
    pmsg->topic = cstr_new(get<0>(tp), get<1>(tp));
    pmsg->msg = cstr_new(msg.data().data(), msg.data().size());
    return pmsg;
    return smsg;
}
void free_submsg(struct csubmsg* msg){
    if (msg){
        cstr_free(msg->topic);
        cstr_free(msg->msg);
        free(msg->topic);
        free(msg->msg);
        free(msg);
    }
}
@@ -338,103 +446,123 @@
struct DbChangeMsg* get_submsg_db(struct csubmsg* msg){
    // string json = toJSON((const char*)msg->data, msg->size, "DbChangeMessage");
    // return json2DbChangeMsg(json);
    return pb2DbChangeMsg((const char*)msg->msg.str, msg->msg.size);
    return pb2DbChangeMsg((const char*)msg->msg, msg->msgl);
}
void free_submsg_db(struct DbChangeMsg* msg){
    if (msg) freeDbChangeMsg(msg);
}
struct cproclist* get_submsg_proclist(struct csubmsg* msg){
    const char* data = msg->msg.str;
    const size_t size = msg->msg.size;
    const char* data = msg->msg;
    const size_t size = msg->msgl;
    auto pl = ptrT<struct cproclist>();
    pl->cli = json2cclients(data, size, &pl->clientsize);
    pl->cli = json2cclients(data, size, &pl->count);
    return pl;
}
void free_submsg_proclist(struct cproclist* pl){
    if (pl) if (pl->cli) free_cclients(pl->cli, pl->clientsize);
    if (pl) if (pl->cli) free_cclients(pl->cli, pl->count);
    free(pl);
}
//////////////////////////////////////////////////
struct creq json2creq(const char* data, const size_t size){
    yyjson_doc* doc = yyjson_read(data, size, 0);
    yyjson_val* root = yyjson_doc_get_root(doc);
    auto obj = [](yyjson_val* v, const char* name){return yyjson_obj_get(v, name);};
    struct creq req;
    memset(&req, 0, sizeof(req));
    auto jp = obj(root, "path");
    auto jb = obj(root, "body");
    req.path = cstr_new(yyjson_get_str(jp), yyjson_get_len(jp));
    req.body = cstr_new(yyjson_get_raw(jb), yyjson_get_len(jb));
    yyjson_doc_free(doc);
    return req;
}
struct creqmsg* to_reqmsg(const char* pid, const size_t pids,
                            const char* data, const size_t size, void* src)
struct creqmsg* to_reqmsg(const char* pid, const size_t pids, const char* data, const size_t size)
{
    auto msg = ptrT<struct creqmsg>();
    msg->procid = cstr_new(pid, pids);
    MsgRequestTopic msgRT;
    if (!msgRT.ParseFromArray(data, size)) return NULL;
    tie(msg->procid, msg->procidl) = copymemory(pid, pids);
    // creg = msgRT.data();
    msg->msg = json2creq(data, size);
    msg->src = src;
    MsgRequestTopic msgRT;
    if (!msgRT.ParseFromArray(data, size)) {
        free(msg->procid);
        free(msg);
        return NULL;
    }
    tie(msg->msg, msg->msgl) = copymemory(msgRT.data());
    return msg;
}
struct creqmsg* make_req_msg(const char* topic, const size_t topicl,
                                const char* data, const size_t datal)
{
    auto msg = ptrT<struct creqmsg>();
    MsgRequestTopic msgRT;
    msgRT.set_topic(topic, topicl);
    msgRT.set_data(data, datal);
    const auto& pbstr = msgRT.SerializeAsString();
    tie(msg->msg, msg->msgl) = copymemory(pbstr);
    return msg;
}
void free_reqmsg(struct creqmsg* msg){
    if (msg){
        cstr_free(msg->procid);
        cstr_free(msg->msg.path);
        cstr_free(msg->msg.body);
        free(msg->procid);
        free(msg->msg);
        free(msg);
    }
}
static tuple<const char*, size_t> json2body(yyjson_doc* doc){
    yyjson_val* root = yyjson_doc_get_root(doc);
    auto obj = [](yyjson_val* v, const char* name){return yyjson_obj_get(v, name);};
    auto jp = obj(root, "path");
    if (!jp) return make_tuple((const char*)NULL, 0);
    auto jb = obj(root, "body");
    if (!jb) return make_tuple((const char*)NULL, 0);
    return make_tuple(yyjson_get_str(jb), yyjson_get_len(jp));
}
struct cstackmsgerr* get_reqmsg_stackerr(struct creqmsg* msg){
    auto &body = msg->msg.body;
    auto doc = yyjson_read(body.str, body.size, 0);
    auto doc0 = yyjson_read(msg->msg, msg->msgl, 0);
    auto tpbody = json2body(doc0);
    if (!get<0>(tpbody)) return NULL;
    auto doc = yyjson_read(get<0>(tpbody), get<1>(tpbody), 0);
    auto root = yyjson_doc_get_root(doc);
    auto obj = [](yyjson_val* v, const char* name){return yyjson_obj_get(v, name);};
    auto jsid = obj(root, "stackId");
    auto jfid = obj(root, "fileId");
    auto stkmsg = ptrT<struct cstackmsgerr>();
    stkmsg->stackid = cstr_new(yyjson_get_str(jsid), yyjson_get_len(jsid));
    stkmsg->fileid = cstr_new(yyjson_get_str(jfid), yyjson_get_len(jfid));
    auto smsg = ptrT<struct cstackmsgerr>();
    tie(smsg->stackid, smsg->stackidl) = copymemory(yyjson_get_str(jsid), yyjson_get_len(jsid));
    tie(smsg->fileid, smsg->fileidl) = copymemory(yyjson_get_str(jfid), yyjson_get_len(jfid));
    yyjson_doc_free(doc);
    return stkmsg;
    yyjson_doc_free(doc0);
    return smsg;
}
void free_reqmsg_stackerr(struct cstackmsgerr* msg){
    if (msg){
        cstr_free(msg->stackid);
        cstr_free(msg->fileid);
        free(msg->stackid);
        free(msg->fileid);
        free(msg);
    }
}
// decode success msg
struct cstackmsg* get_reqmsg_stack(struct creqmsg* msg){
    auto &body = msg->msg.body;
    auto doc = yyjson_read(body.str, body.size, 0);
    auto doc0 = yyjson_read(msg->msg, msg->msgl, 0);
    auto tpbody = json2body(doc0);
    if (!get<0>(tpbody)) return NULL;
    auto doc = yyjson_read(get<0>(tpbody), get<1>(tpbody), 0);
    auto root = yyjson_doc_get_root(doc);
    auto obj = [](yyjson_val* v, const char* name){return yyjson_obj_get(v, name);};
    auto assign = [&obj](yyjson_val* v, const char* name){
        auto tmp(obj(v, name));
        return cstr_new(yyjson_get_str(tmp), yyjson_get_len(tmp));
        return copymemory(yyjson_get_str(tmp), yyjson_get_len(tmp));
    };
    auto smsg = ptrT<struct cstackmsg>();
    smsg->procnum = yyjson_get_int(obj(root, "procNum"));
    smsg->stackid = assign(root, "stackId");
    smsg->stackname = assign(root, "stackName");
    tie(smsg->stackid, smsg->stackidl) = assign(root, "stackId");
    tie(smsg->stackname, smsg->stacknamel) = assign(root, "stackName");
    smsg->type = yyjson_get_int(obj(root, "type"));
    smsg->width = yyjson_get_int(obj(root, "resolution_width"));
    smsg->width = yyjson_get_int(obj(root, "resolution_height"));
@@ -447,47 +575,70 @@
        struct cstackfile file;
        memset(&file, 0, sizeof(file));
        yyjson_val* one = yyjson_arr_get(arr, i);
        file.id = assign(one, "id");
        file.name = assign(one, "name");
        file.path = assign(one, "path");
        tie(file.id, file.idl) = assign(one, "id");
        tie(file.name, file.namel) = assign(one, "name");
        tie(file.path, file.pathl) = assign(one, "path");
        file.type = yyjson_get_int(obj(one, "type"));
        smsg->files[i] = file;
    }
    yyjson_doc_free(doc);
    yyjson_doc_free(doc0);
    return smsg;
}
void free_reqmsg_stack(struct cstackmsg* msg){
    if (msg){
        cstr_free(msg->stackid);
        cstr_free(msg->stackname);
        free(msg->stackid);
        free(msg->stackname);
        for(size_t i = 0; i < msg->filescnt; i++){
            cstr_free(msg->files[i].id);
            cstr_free(msg->files[i].name);
            cstr_free(msg->files[i].path);
            free(msg->files[i].id);
            free(msg->files[i].name);
            free(msg->files[i].path);
        }
        free(msg);
    }
}
struct crepmsg* make_reply_msg(const int success, const char* msg, const size_t msgl,
struct crepmsg* make_reply_msg(const int errcode, const char* errmsg, const size_t emsgl,
                                const char* data, const size_t datal)
{
    auto rmsg = ptrT<struct crepmsg>();
    // rmsg->success = success;
    // rmsg->msg = cstr_new(msg, msgl);
    // rmsg->data = cstr_new(msg, msgl);
    auto doc = yyjson_mut_doc_new(NULL);
    return rmsg;
    auto msg = ptrT<struct crepmsg>();
    msg->errcode = errcode;
    tie(msg->errmsg, msg->errmsgl) = copymemory(errmsg, emsgl);
    tie(msg->data, msg->datal) = copymemory(data, datal);
    return msg;
}
void free_reply_msg(struct crepmsg* msg){
    if (msg){
        cstr_free(msg->msg);
        cstr_free(msg->data);
        free(msg->errmsg);
        free(msg->data);
        free(msg);
    }
}
template <class F> void ignoref(F&& f){}
static struct cstr make_reply_msg_json(const int success, const char* msg, const size_t msgl,
                                const char* data, const size_t datal)
{
    ignoref(make_reply_msg_json);
    auto doc = yyjson_mut_doc_new(NULL);
    auto root = yyjson_mut_obj(doc);
    yyjson_mut_obj_add_bool(doc, root, "success", !!success);
    yyjson_mut_obj_add_strn(doc, root, "msg", msg, msgl);
    yyjson_mut_obj_add_strn(doc, root, "data", data, datal);
    size_t jsonl = 0;
    char* json = yyjson_mut_val_write(root, 0, &jsonl);
    yyjson_mut_doc_free(doc);
    return cstr_ref(json, jsonl);
}
void free_query_procs(struct cqueryprocs* procs, const size_t count){
    for(size_t i = 0; i < count; i++){
        free(procs[i].id);
    }
    free(procs);
}