zhangmeng
2022-12-13 ba9e213d3f01555d823aaf453798a148dade45a4
添加注释
7个文件已修改
365 ■■■■ 已修改文件
CMakeLists.txt 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
cbhomeclient.cpp 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
cbhomeclient.h 43 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
exported_symbols 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.cpp 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
message.cpp 73 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
message.h 192 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
CMakeLists.txt
@@ -37,7 +37,7 @@
    )
add_library(objs OBJECT ${src})
# set(CMAKE_SHARED_LINKER_FLAGS  ${CMAKE_SHARED_LINKER_FLAGS} "-Wl,--version-script=${CMAKE_CURRENT_SOURCE_DIR}/exported_symbols")
set(CMAKE_SHARED_LINKER_FLAGS  ${CMAKE_SHARED_LINKER_FLAGS} "-Wl,--version-script=${CMAKE_CURRENT_SOURCE_DIR}/exported_symbols")
add_library(${Target} SHARED $<TARGET_OBJECTS:objs>)
target_link_libraries(${Target} PRIVATE bhome_msg)
cbhomeclient.cpp
@@ -324,12 +324,17 @@
    return bus_send_reply(cli->bus, src, pbstr.data(), pbstr.size());
}
////////////////////////////////////////////////////
int bus_client_publish(void* handle, const char* topic, const size_t topicl, const char* data, const size_t size){
    MsgPublish pbmsg;
    pbmsg.set_topic(topic, topicl);
    pbmsg.set_data(data, size);
    auto pbstr = pbmsg.SerializeAsString();
    return bus_client_pubmsg(handle, pbstr.data(), pbstr.size());
}
// test
int bus_client_pubmsg(void* handle, void* data, const size_t size){
int bus_client_pubmsg(void* handle, const char* data, const size_t size){
    client* cli = ptr(handle);
    bus_publish(cli->bus, data, size, 100);
    return 0;
    return bus_publish(cli->bus, data, size, 100);
}
cbhomeclient.h
@@ -10,17 +10,52 @@
extern "C"{
#endif
/*
    配合 message.h[cpp] 使用
*/
/*
    初始化 bus client,srvid 表明当前机器的 srvid 用于远程通信,目前没用
    rinfo 是注册进程的消息,单进程多次调用,需要使用不同的rinfo
*/
void* bus_client_init(const char* srvid, const size_t sidsize, const creg* rinfo);
/*
    释放 bus client
*/
void bus_client_free(void* handle);
/*
    获取订阅的消息,订阅消息通过线程不停读取,此处从缓存中读取
    可通过 message.h 对应的 get_submsg_db get_submsg_proclist 获取对应的消息
*/
struct csubmsg* bus_client_get_submsg(void* handle);
/*
    发布消息,data 是 MsgPublish protobuffer序列化后的数据
*/
int bus_client_pubmsg(void* handle, const char* data, const size_t size);
/*
    发布消息,topic 是发布主题, data 是发布的消息体
*/
int bus_client_publish(void* handle, const char* topic, const size_t topicl, const char* data, const size_t size);
/*
    获取 request 消息,通过线程读取,此处从缓存中读取
    可通过 message.h 的 get_reqmsg_stackerr get_reqmsg_stack 获取对应的消息
    src 是哪一个进程请求的标识符
    可以响应多个request发送的消息,同时需要满足异步响应,使用 src 进行区分
    类似与tcp/ip的socket,标识一个连接
*/
struct creqmsg* bus_client_get_reqmsg(void* handle, void** src);
int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg);
/*
    响应消息回复,src是连接标识符,msg是需要回复的消息
    通过 message.h 的 make_reply_msg 创建
*/
int bus_client_reply_msg(void* handle, void* src, const struct crepmsg* msg);
// test
int bus_client_pubmsg(void* handle, void* data, const size_t size);
/*
    同步的request请求,发送 creqmsg 获取 crepmsg 回复
    通过 message.h 的 make_req_msg 创建 request 消息
    获取的 crepmsg 回复消息,需要使用 free_reply_msg 释放
*/
int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg);
#ifdef __cplusplus
}
exported_symbols
@@ -3,11 +3,36 @@
    bus_client_init;
    bus_client_free;
    bus_client_get_submsg;
    bus_client_pubmsg;
    bus_client_publish;
    bus_client_get_reqmsg;
    bus_client_reply_msg;
    bus_client_request;
    cstr_new;
    cstr_free;
    cstr_arr_new;
    cstr_arr_add;
    cstr_arr_free;
    make_proc_info;
    clone_proc_info;
    free_proc_info;
    free_creg;
    to_submsg;
    free_submsg;
    get_submsg_db;
    free_submsg_db;
    get_submsg_proclist;
    free_submsg_proclist;
    to_reqmsg;
    make_req_msg;
    free_reqmsg;
    get_reqmsg_stackerr;
    free_reqmsg_stackerr;
    get_reqmsg_stack;
    free_reqmsg_stack;
    make_reply_msg_json;
    make_reply_msg;
    free_reply_msg;
local:
    *;
main.cpp
@@ -46,13 +46,14 @@
    while (true) {
        for(auto && i : topics){
            auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++);
            MsgPublish pbmsg;
            pbmsg.set_topic(i);
            pbmsg.set_data(msg);
            auto data = pbmsg.SerializeAsString();
            // TestPub(i.c_str(), i.length(), data.c_str(), data.length());
            int pubres = bus_client_pubmsg(handle, (void*)data.data(), data.size());
            printf("======>> bus_client_pubmsg [%s]\n", msg.c_str());
            // MsgPublish pbmsg;
            // pbmsg.set_topic(i);
            // pbmsg.set_data(msg);
            // auto data = pbmsg.SerializeAsString();
            // int ret = bus_client_pubmsg(handle, data.data(), data.size());
            int ret = bus_client_publish(handle, i.data(), i.size(), msg.data(), msg.size());
            printf("======>> bus_client_pubmsg [%s] ret %d\n", msg.c_str(), ret);
            this_thread::sleep_for(chrono::seconds{2});
        }
    }
message.cpp
@@ -22,19 +22,22 @@
template <class T> T* ptrT(const size_t l=1){ return (T*)calloc(l, sizeof(T)); }
static struct cstr cstr_clone(const struct cstr old){
    return cstr_new(old.str, old.size);
}
static struct cstr cstr_ref(char* str, const size_t len){
static inline struct cstr null_cstr(){
    struct cstr cs;
    memset(&cs, 0, sizeof(cs));
    return cs;
}
static inline struct cstr cstr_ref(char* str, const size_t len){
    struct cstr cs = null_cstr();
    cs.size = len;
    cs.str = str;
    return cs;
}
static inline struct cstr cstr_clone(const struct cstr old){
    return cstr_new(old.str, old.size);
}
struct cstr cstr_new(const char* str, const size_t len){
    struct cstr cs;
    memset(&cs, 0, sizeof(cs));
    struct cstr cs = null_cstr();
    cs.size = len;
    cs.str = ptrT<char>(len);
    memcpy(cs.str, str, len);
@@ -43,9 +46,13 @@
void cstr_free(struct cstr str){
    if (str.str && str.size) free(str.str);
}
struct cstrarr cstr_arr_new(const size_t count){
static inline struct cstrarr null_cstr_arr(){
    struct cstrarr arr;
    memset(&arr, 0, sizeof(arr));
    return arr;
}
struct cstrarr cstr_arr_new(const size_t count){
    struct cstrarr arr = null_cstr_arr();
    arr.arr = (struct cstr*)calloc(count, sizeof(struct cstr));
    arr.count = count;
    return arr;
@@ -63,21 +70,16 @@
    free(arr.arr);
}
void free_creg(struct creg* reg){
    if (reg){
        if (reg->pinfo) free_proc_info(reg->pinfo);
        cstr_arr_free(reg->channel);
        cstr_arr_free(reg->topic_pub);
        cstr_arr_free(reg->topic_sub);
        cstr_arr_free(reg->topic_sub_net);
        free(reg);
    }
struct cproc* make_proc_info(const struct cstr name, const struct cstr id, const struct cstr info){
    auto proc = ptrT<struct cproc>();
    proc->name = cstr_clone(name);
    proc->id = cstr_clone(id);
    // proc->info = cstr_clone(info);
    return proc;
}
struct cproc* clone_proc_info(struct cproc* pi){
struct cproc* clone_proc_info(const struct cproc* pi){
    if (!pi) return NULL;
    auto newpi = ptrT<struct cproc>();
    *newpi = *pi;
    newpi->name = cstr_clone(newpi->name);
    newpi->id = cstr_clone(newpi->id);
    // todo: ignore info
@@ -91,6 +93,17 @@
        cstr_free(pi->id);
        cstr_free(pi->info);
        free(pi);
    }
}
void free_creg(struct creg* reg){
    if (reg){
        if (reg->pinfo) free_proc_info(reg->pinfo);
        cstr_arr_free(reg->channel);
        cstr_arr_free(reg->topic_pub);
        cstr_arr_free(reg->topic_sub);
        cstr_arr_free(reg->topic_sub_net);
        free(reg);
    }
}
@@ -183,9 +196,7 @@
    auto assign_arr = [&obj](yyjson_val* v){
        const size_t count = yyjson_arr_size(v);
        struct cstrarr arr;
        memset(&arr, 0, sizeof(arr));
        arr.count = count;
        struct cstrarr arr = cstr_arr_new(count);
        for(size_t i = 0; i < count; i++){
            auto sv = yyjson_arr_get(v, i);
            char* entry = NULL;
@@ -355,11 +366,11 @@
    const size_t size = msg->msg.size;
    auto pl = ptrT<struct cproclist>();
    pl->cli = json2cclients(data, size, &pl->clientsize);
    pl->cli = json2cclients(data, size, &pl->count);
    return pl;
}
void free_submsg_proclist(struct cproclist* pl){
    if (pl) if (pl->cli) free_cclients(pl->cli, pl->clientsize);
    if (pl) if (pl->cli) free_cclients(pl->cli, pl->count);
}
//////////////////////////////////////////////////
@@ -388,12 +399,12 @@
    return msg;
}
struct creqmsg* make_req_msg(const char* topic, const size_t topics,
struct creqmsg* make_req_msg(const char* topic, const size_t topicl,
                                const char* data, const size_t datal)
{
    auto msg = ptrT<struct creqmsg>();
    MsgRequestTopic msgRT;
    msgRT.set_topic(topic, topics);
    msgRT.set_topic(topic, topicl);
    msgRT.set_data(data, datal);
    auto pbstr = msgRT.SerializeAsString();
@@ -411,9 +422,8 @@
}
struct cstackmsgerr* get_reqmsg_stackerr(struct creqmsg* msg){
    struct cstr path, body;
    memset(&path, 0, sizeof(path));
    memset(&body, 0, sizeof(body));
    struct cstr path = null_cstr();
    struct cstr body = null_cstr();
    tie(path, body) = json2reqmsg(msg->msg);
    if (body.size == 0) return NULL;
@@ -443,9 +453,8 @@
}
// decode success msg
struct cstackmsg* get_reqmsg_stack(struct creqmsg* msg){
    struct cstr path, body;
    memset(&path, 0, sizeof(path));
    memset(&body, 0, sizeof(body));
    struct cstr path = null_cstr();
    struct cstr body = null_cstr();
    tie(path, body) = json2reqmsg(msg->msg);
    if (body.size == 0) return NULL;
message.h
@@ -4,38 +4,38 @@
#include <stddef.h>
struct cstr{
    char*  str;
    size_t size;
    char*  str;         // 字符串内容
    size_t size;        // 字符串长度
};
// 进程信息
struct cproc{
    struct cstr name;
    struct cstr id;
    struct cstr info;
    struct cstr name;   // 进程名字
    struct cstr id;     // 进程id
    struct cstr info;   // 进程其他信息,目前没用
};
struct cstrarr{
    struct cstr* arr;
    size_t count;
    struct cstr* arr;   // 字符串数组
    size_t count;       // 字符串数组长度
};
// 进程注册信息
struct creg{
    struct cproc*  pinfo;
    struct cproc*  pinfo;       // 需要注册的进程信息
    cstrarr channel;
    cstrarr topic_pub;
    cstrarr topic_sub;
    cstrarr topic_sub_net;
    cstrarr channel;            // 进程提供的请求响应服务的主题
    cstrarr topic_pub;          // 进程提供的发布订阅的发布主题
    cstrarr topic_sub;          // 进程需要订阅的主题
    cstrarr topic_sub_net;      // 进程需要订阅的网络主题,目前没用
};
// 其他进程信息
struct cclient{
    struct creg*  rinfo;
    struct creg*  rinfo;        // 代表其他进程的进程信息
    int replykey;
    int hbcnt;
    int dcnt;
    int status;
    int replykey;               // 没用,上一个版本用共享内存,此为key
    int hbcnt;                  // 心跳次数?可能没用
    int dcnt;                   // deadcount,可能没用?
    int status;                 // 进程状态,可能没用?
};
//TableChanged enum
@@ -59,118 +59,206 @@
    T_Voice = 16,       //报警声音发生变化
};
enum DbAction {
    Insert = 0,     //Insert
    Update = 1,     //Update
    Delete = 2,     //Delete
    Insert = 0,     //Insert db
    Update = 1,     //Update db
    Delete = 2,     //Delete db
};
// 数据库变化信息
struct DbChangeMsg{
    TableChanged table;     //变化的表
    struct cstr   id;      //变化数据id
    DbAction action;        //action
    DbAction action;        //action/ DbAction[Insert/Update/Delete]
    struct cstr   info;    //变化内容
};
// 订阅消息
struct csubmsg{
    struct cstr topic;
    struct cstr topic;      // 收到的订阅消息的主题,区分那种订阅消息
    // private
    // enum MsgT {NONE=0, DB, PROCLIST} type;
    struct cstr msg;
    struct cstr msg;        // 收到的订阅消息的body
};
// 其他注册进程列表,如数据库进程需要启动再运行pollcontrol逻辑
struct cproclist{
    struct  cclient* cli;
    size_t  clientsize;
    struct  cclient* cli;   // 其他进程的列表
    size_t  count;          // 其他进程的列表的count
};
//////////////////////////////////////////
// request msg
/*
    此消息既作为接收的request msg的结构,也作为发送request 消息的结构
    接收request消息会带有发出request消息的进程的id,procid
*/
struct creqmsg{
    struct cstr procid;
    struct cstr msg;
    struct cstr procid;     // 发送request消息的进程id
    struct cstr msg;        // request消息体
};
// decode stack err msg
/*
    pollcontrol会接收数据栈解码发送的解码失败消息,用于置网页状态
*/
struct cstackmsgerr{
    struct cstr stackid;
    struct cstr fileid;
    struct cstr stackid;    // 解码失败的数据栈id
    struct cstr fileid;     // 解码失败的文件id
};
// stack file
/*
    数据栈的文件列表,对应 protomsg.FileAnalysis 但是此结构体非常长
    看代码应该只需要解析出的几个
*/
struct cstackfile{
    struct cstr id;
    struct cstr name;
    struct cstr path;
    int type;
    void* noused;
    struct cstr id;         // 文件id
    struct cstr name;       // 文件name
    struct cstr path;       // 文件路径
    int type;               // 文件类型 1:video,2:picture
    void* noused;           // 未使用
};
// decode stack success msg
/*
    pollcontrol会接收数据栈解码完成的消息,然后下发新的数据栈任务
*/
struct cstackmsg{
    int procnum;
    struct cstr stackid;
    struct cstr stackname;
    int type;
    int shmkey;
    int width;
    int procnum;                // decoder 启动的进程号,数据栈可能会有数个decoder同时运行
    struct cstr stackid;        // 数据栈 id
    struct cstr stackname;      // 数据栈 name
    int type;                   // 数据栈类型 video picture
    int shmkey;                 // 数据栈使用的共享内存key
    int width;                  // 分辨率
    int height;
    int fps;
    struct cstackfile* files;
    int fps;                    // fps 采样 fps/s
    struct cstackfile* files;   // 数据栈文件列表
    size_t filescnt;
};
// reply msg
// 对应 bhome_msg.MsgRequestTopicReply
struct crepmsg{
    int errcode;
    struct cstr errmsg;
    struct cstr data;
    int errcode;                // 相应request请求的消息,错误码
    struct cstr errmsg;         // 错误消息
    struct cstr data;           // 消息体
};
#ifdef __cplusplus
extern "C"{
#endif
/*
    封装了C接口的string
    cstr_new 创建一个string,包括内存地址和长度,会拷贝参数
    必须使用cstr_free释放
*/
struct cstr cstr_new(const char* str, const size_t len);
void cstr_free(struct cstr str);
/*
    封装字符串数组,其中是一个struct cstr数组,包括指向数组的指针和count
    通过cstr_arr_add添加字符串,内部会拷贝字符串
    必须使用cstr_arr_free释放
*/
struct cstrarr cstr_arr_new(const size_t count);
void cstr_arr_add(struct cstrarr* arr, const char* data, const size_t len, const size_t idx);
void cstr_arr_free(struct cstrarr arr);
void free_creg(struct creg* reg);
struct cproc* clone_proc_info(struct cproc* pi);
/*
    创建struct cproc 结构,对应procinfo,保存proc的name,id,info[当前没有使用]
    必须使用free_proc_info释放
*/
struct cproc* make_proc_info(const struct cstr name, const struct cstr id, const struct cstr info);
/*
    从已存在的proc克隆,会拷贝,使用free_proc_info释放
*/
struct cproc* clone_proc_info(const struct cproc* pi);
void free_proc_info(struct cproc* pi);
/*
    释放creg结构指针
    creg结构可以使用上述make_proc_info、cstr_arr_new、cstr_new函数创建
*/
void free_creg(struct creg* reg);
// 订阅消息相关,订阅数据库db消息和进程列表proclist消息
/*
    cbhomeclient.cpp中使用,将接收到的submsg解包成csubmsg
    包括topic和msg数据,msg数据并未反序列化
    必须使用free_submsg释放
*/
struct csubmsg* to_submsg(const char* data, const size_t size);
void free_submsg(struct csubmsg* msg);
// db msg
/*
    接收到的submsg在pollcontrol中可能为数据库更新的通知
    将csubmsg中未反序列化的数据解包成db的数据格式
    必须用free_submsg_db释放
*/
struct DbChangeMsg* get_submsg_db(struct csubmsg* msg);
void free_submsg_db(struct DbChangeMsg* msg);
// proclist msg
/*
    接收到的submsg在pollcontrol中可能为关心的其他进程的列表
    将csubmsg中未反序列化的数据解包成进程的列表
    必须用free_submsg_proclist释放
*/
struct cproclist* get_submsg_proclist(struct csubmsg* msg);
void free_submsg_proclist(struct cproclist* ppl);
// request msg
/*
    cbhomeclient.cpp中使用,将接收到的request请求数据解包成creqmsg
    包括request进程的procid和msg数据,msg数据并未反序列化
    必须使用free_reqmsg释放
*/
struct creqmsg* to_reqmsg(const char* pid,const size_t pids,const char* data,const size_t size);
struct creqmsg* make_req_msg(const char* topic, const size_t topics,
void free_reqmsg(struct creqmsg* msg);
/*
    本进程向其他进程请求数据构建
    包括请求的主题topic和请求的数据body[data]
    必须使用free_reqmsg释放
*/
struct creqmsg* make_req_msg(const char* topic, const size_t topicl,
                                const char* data, const size_t datal);
void free_reqmsg(struct creqmsg* msg);
// decode err msg
/*
    接收到的creqmsg在pollcontrol中可能为数据栈解码错误信息
    将creqmsg中未反序列化的数据解包成cstackmsgerr,包括数据栈id stackid和文件id fileid
    必须用free_reqmsg_stackerr释放
*/
struct cstackmsgerr* get_reqmsg_stackerr(struct creqmsg* msg);
void free_reqmsg_stackerr(struct cstackmsgerr* msg);
// decode success msg
/*
    接收到的creqmsg在pollcontrol中可能为数据栈解码完成信息
    将creqmsg中未反序列化的数据解包成cstackmsg
    包括
    int procnum;    // 启动的decoder进程num
    struct cstr stackid;
    struct cstr stackname;
    int type;   // video picture
    int shmkey;
    int width;
    int height;
    int fps;
    struct cstackfile* files; // 文件列表
    size_t filescnt; // 文件数量
    必须用free_reqmsg_stackerr释放
*/
struct cstackmsg* get_reqmsg_stack(struct creqmsg* msg);
void free_reqmsg_stack(struct cstackmsg* msg);
// reply msg
/*
    no use 将reply消息序列化为json,目前没有使用
    使用 cstr_free 释放
*/
struct cstr make_reply_msg_json(const int success, const char* msg, const size_t msgl,
                                const char* data, const size_t datal);
/*
    创建 creqmsg 包括errcode、errmsg和消息体data
    使用 free_reply_msg 释放
*/
struct crepmsg* make_reply_msg(const int errcode, const char* errmsg, const size_t emsgl,
                                const char* data, const size_t datal);
void free_reply_msg(struct crepmsg* msg);
#ifdef __cplusplus