#include "message.h"
|
|
#include <tuple>
|
|
#include "google/protobuf/descriptor.h"
|
#include "google/protobuf/util/json_util.h"
|
#include "google/protobuf/util/type_resolver.h"
|
#include "google/protobuf/util/type_resolver_util.h"
|
|
#include "google/protobuf/message.h"
|
#include "google/protobuf/descriptor.h"
|
#include "google/protobuf/descriptor.pb.h"
|
#include "google/protobuf/dynamic_message.h"
|
#include "google/protobuf/compiler/importer.h"
|
|
#include "3dparty/yyjson/yyjson.h"
|
|
#include "bhome_msg_api.pb.h"
|
using namespace bhome_msg;
|
|
using namespace std;
|
|
// Allocates a zero-filled array of `l` objects of type T with calloc.
// The result is released with free(); it is NULL if calloc fails.
template <class T>
T* ptrT(const size_t l = 1) {
    void* zeroed = calloc(l, sizeof(T));
    return static_cast<T*>(zeroed);
}
|
|
static struct cstr cstr_clone(const struct cstr old){
|
return cstr_new(old.str, old.size);
|
}
|
static struct cstr cstr_ref(char* str, const size_t len){
|
struct cstr cs;
|
memset(&cs, 0, sizeof(cs));
|
cs.size = len;
|
cs.str = str;
|
return cs;
|
}
|
struct cstr cstr_new(const char* str, const size_t len){
|
struct cstr cs;
|
memset(&cs, 0, sizeof(cs));
|
cs.size = len;
|
cs.str = ptrT<char>(len);
|
memcpy(cs.str, str, len);
|
return cs;
|
}
|
// Releases the buffer of `str` with free().
// Nothing is released when the pointer is null or the recorded size is zero.
void cstr_free(struct cstr str) {
    const bool has_buffer = static_cast<bool>(str.str);
    const bool has_length = static_cast<bool>(str.size);
    if (!has_buffer || !has_length) return;
    free(str.str);
}
|
struct cstrarr cstr_arr_new(const size_t count){
|
struct cstrarr arr;
|
memset(&arr, 0, sizeof(arr));
|
arr.arr = (struct cstr*)calloc(count, sizeof(struct cstr));
|
arr.count = count;
|
return arr;
|
}
|
void cstr_arr_add(struct cstrarr* arr, const char* data, const size_t len, const size_t idx){
|
if (arr->arr && arr->count && idx < arr->count){
|
arr->arr[idx] = cstr_new(data, len);
|
}
|
}
|
void cstr_arr_free(struct cstrarr arr){
|
for(size_t i = 0; i < arr.count; i++){
|
auto & str = arr.arr[i];
|
cstr_free(str);
|
}
|
free(arr.arr);
|
}
|
|
void free_creg(struct creg* reg){
|
if (reg){
|
if (reg->pinfo) free_proc_info(reg->pinfo);
|
cstr_arr_free(reg->channel);
|
cstr_arr_free(reg->topic_pub);
|
cstr_arr_free(reg->topic_sub);
|
cstr_arr_free(reg->topic_sub_net);
|
free(reg);
|
}
|
}
|
|
struct cproc* clone_proc_info(struct cproc* pi){
|
if (!pi) return NULL;
|
auto newpi = ptrT<struct cproc>();
|
*newpi = *pi;
|
newpi->name = cstr_clone(newpi->name);
|
newpi->id = cstr_clone(newpi->id);
|
// todo: ignore info
|
|
return newpi;
|
}
|
|
void free_proc_info(struct cproc* pi){
|
if (pi){
|
cstr_free(pi->name);
|
cstr_free(pi->id);
|
cstr_free(pi->info);
|
free(pi);
|
}
|
}
|
|
/////////////////////////////////////////////////////////////////////
|
|
// Accepts any argument and does nothing with it. Static helpers pass
// themselves to this function so that they count as "used" and the compiler
// does not emit an unused-function warning for them.
template <class F>
void ignoreT(F&&) {
}
|
// MUST COMPILE *.pb.cc FILE
|
static string toJSON(const char* data, const size_t size, const string& type){
|
ignoreT(toJSON);
|
|
using namespace google::protobuf;
|
using namespace google::protobuf::util;
|
const DescriptorPool* pool = DescriptorPool::generated_pool();
|
string url = "basic.com";
|
// string url = "type.googleapis.com";
|
std::string message_type = url + "/protomsg." + type;
|
util::TypeResolver* resolver =
|
util::NewTypeResolverForDescriptorPool(url, pool);
|
|
util::JsonOptions opt;
|
string in{(const char*)data, size};
|
string out{};
|
Status result =
|
BinaryToJsonString(resolver, message_type, in, &out, opt);
|
|
if (!result.ok())
|
printf("error msg %s\n", result.message().data());
|
delete resolver;
|
|
// printf("json to image %s\n", out.c_str());
|
return out;
|
}
|
static struct DbChangeMsg* json2DbChangeMsg(const string& json){
|
ignoreT(json2DbChangeMsg);
|
|
auto msg = ptrT<struct DbChangeMsg>();
|
|
yyjson_doc* doc = yyjson_read(json.data(), json.size(), 0);
|
yyjson_val* root = yyjson_doc_get_root(doc);
|
auto obj = [root](const char* name){return yyjson_obj_get(root, name);};
|
|
auto assign = [&obj](const char* name, char** d, size_t* l){
|
auto v = obj(name);
|
*l = yyjson_get_len(v);
|
*d = ptrT<char>(*l+1);
|
memcpy(*d, yyjson_get_str(v), *l);
|
};
|
msg->table = (TableChanged)yyjson_get_int(obj("table"));
|
msg->action = (DbAction)yyjson_get_int(obj("action"));
|
assign("id", &msg->id.str, &msg->id.size);
|
assign("info", &msg->info.str, &msg->info.size);
|
yyjson_doc_free(doc);
|
|
return msg;
|
}
|
static inline void freeDbChangeMsg(struct DbChangeMsg* msg){
|
if (msg){
|
cstr_free(msg->id);
|
cstr_free(msg->info);
|
free(msg);
|
}
|
}
|
|
///////////////////////////////////////////////////////////////
|
|
// Parses `data` into `msgp` and returns a calloc'ed, NUL-terminated copy of
// the message topic together with its length.
// Returns (nullptr, 0) when the payload cannot be parsed.
static inline tuple<char*, size_t> parseTopic(MsgPublish &msgp, const char* data, const size_t size) {
    const bool parsed = msgp.ParseFromArray(data, size);
    if (!parsed) return make_tuple(nullptr, 0);

    const size_t topic_len = msgp.topic().size();
    char* copy = ptrT<char>(msgp.topic().length() + 1);  // +1 keeps a trailing zero byte
    memcpy(copy, msgp.topic().data(), topic_len);
    return make_tuple(copy, topic_len);
}
|
|
// Copies the string held by JSON value `v` into a newly allocated,
// NUL-terminated buffer.
//
// v: value to read; may be NULL or of a non-string type.
// d: receives the buffer (NULL only if the allocation fails).
// l: receives the string length, excluding the terminator.
//
// A NULL or non-string value produces an empty string of length 0.
// yyjson_get_str() returns NULL for such values while yyjson_get_len() may
// still report a non-zero element count, so the length is only trusted when
// there is a string to copy from.
static inline void json2str(yyjson_val* v, char** d, size_t* l) {
    const char* s = yyjson_get_str(v);
    const size_t n = s ? yyjson_get_len(v) : 0;

    char* buf = ptrT<char>(n + 1);
    if (buf && n) memcpy(buf, s, n);

    *d = buf;
    *l = buf ? n : 0;
}
|
|
// Builds a creg from a JSON object.
//
// Reads the nested "proc" object (keys "name", "id", "info") into a new cproc
// and the string arrays "channel", "pubTopic", "subTopic" and "subNetTopic"
// into the matching cstrarr members. Returns a heap-allocated creg to be
// released with free_creg(), or NULL if an allocation fails.
static struct creg* jsonval2creg(yyjson_val* v) {
    auto obj = [](yyjson_val* v, const char* name) { return yyjson_obj_get(v, name); };

    auto rinfo = ptrT<struct creg>();
    if (!rinfo) return NULL;

    auto pinfo = ptrT<struct cproc>();
    if (!pinfo) {
        free(rinfo);
        return NULL;
    }

    auto pval = obj(v, "proc");
    json2str(obj(pval, "name"), &pinfo->name.str, &pinfo->name.size);
    json2str(obj(pval, "id"), &pinfo->id.str, &pinfo->id.size);
    json2str(obj(pval, "info"), &pinfo->info.str, &pinfo->info.size);
    rinfo->pinfo = pinfo;

    // Converts a JSON array of strings into a cstrarr. The slot storage comes
    // from cstr_arr_new(); previously it was never allocated, so writing the
    // first element dereferenced a NULL pointer. Elements are copied straight
    // from the JSON value, which also removes the temporary buffer that used
    // to be leaked for every element. Non-string elements leave their slot
    // empty.
    auto assign_arr = [](yyjson_val* list) {
        struct cstrarr arr = cstr_arr_new(yyjson_arr_size(list));
        for (size_t i = 0; i < arr.count; i++) {
            auto sv = yyjson_arr_get(list, i);
            const char* s = yyjson_get_str(sv);
            if (!s) continue;
            cstr_arr_add(&arr, s, yyjson_get_len(sv), i);
        }
        return arr;
    };

    rinfo->channel = assign_arr(obj(v, "channel"));
    rinfo->topic_pub = assign_arr(obj(v, "pubTopic"));
    rinfo->topic_sub = assign_arr(obj(v, "subTopic"));
    rinfo->topic_sub_net = assign_arr(obj(v, "subNetTopic"));

    return rinfo;
}
|
|
static inline void free_cclients(struct cclient* cli, const size_t size){
|
for(size_t i = 0; i < size; i++) if (cli[i].rinfo) free_creg(cli[i].rinfo);
|
free(cli);
|
}
|
|
static struct cclient* json2cclients(const char* data, const size_t size, size_t* len){
|
yyjson_doc* doc = yyjson_read(data, size, 0);
|
yyjson_val* root = yyjson_doc_get_root(doc);
|
*len = yyjson_arr_size(root);
|
auto cli = ptrT<struct cclient>(*len);
|
|
auto obj = [](yyjson_val* v, const char* name){return yyjson_obj_get(v, name);};
|
|
for(size_t i = 0; i < *len; i++){
|
cclient rc;
|
yyjson_val* one = yyjson_arr_get(root, i);
|
rc.replykey = yyjson_get_int(obj(one, "replyKey"));
|
rc.hbcnt = yyjson_get_int(obj(one, "heartbeatCount"));
|
rc.dcnt = yyjson_get_int(obj(one, "deadCount"));
|
rc.status = yyjson_get_int(obj(one, "status"));
|
rc.rinfo = jsonval2creg(obj(one, "info"));
|
|
cli[i] = rc;
|
}
|
yyjson_doc_free(doc);
|
return cli;
|
}
|
|
////////////////////////////////////////////////////////////////
|
|
static struct DbChangeMsg* pb2DbChangeMsg(const char* data, const size_t len){
|
string pbstr = R"(
|
syntax = "proto3";
|
package protomsg;
|
//TableChanged enum
|
enum TableChanged {
|
T_Camera = 0;//摄像机变化
|
T_CameraRule = 1;//摄像机任务参数变化
|
T_Sdk = 2;//sdk变化
|
T_CameraPolygon = 3;//摄像机多边形变化
|
T_TimeRule = 4;//时间规则变化
|
T_Server = 5;//服务器信息变化
|
T_PollConfig = 6;//轮询配置变化
|
T_File = 7;//本地文件变化
|
T_FileSetting = 8;//本地文件分析设置
|
T_SdkChanSet = 9;//算法通道设置变化
|
T_FileStack = 10;//数据栈变化
|
T_ResourceConfig = 11;//对外服务配置变化
|
T_CalculationPower = 12;//需要重新计算算力占用情况
|
T_EventPush = 13; //事件推送有变化
|
T_Cluster = 14; //集群创建、加入或退出
|
T_CameraPolygonRelation = 15; //摄像机区域的关联关系
|
T_Voice = 16;//报警声音发生变化
|
}
|
enum DbAction {
|
Insert = 0;//Insert
|
Update = 1;//Update
|
Delete = 2;//Delete
|
}
|
//publish db change message
|
message DbChangeMessage {
|
TableChanged table = 1;//变化的表
|
string id = 2;//变化数据id
|
DbAction action = 3;//action
|
string info = 4;//变化内容
|
}
|
)";
|
|
string filename = to_string(random()) + "_" + to_string(random()) + "db.proto";
|
FILE* fp = fopen(filename.c_str(), "wb");
|
size_t pos = 0;
|
while(pos != pbstr.size()){
|
pos += fwrite(pbstr.data()+pos, 1, pbstr.size()-pos, fp);
|
}
|
fclose(fp);
|
|
using namespace google::protobuf;
|
|
compiler::DiskSourceTree ds;
|
ds.MapPath("", ".");
|
compiler::Importer imp(&ds, NULL);
|
imp.Import(filename);
|
remove(filename.c_str());
|
auto desc = imp.pool()->FindMessageTypeByName("protomsg.DbChangeMessage");
|
DynamicMessageFactory factory;
|
auto message = factory.GetPrototype(desc);
|
|
auto msg = message->New();
|
|
msg->ParseFromArray(data, len);
|
// printf("debug string %s\n", msg->DebugString().c_str());
|
|
auto db = ptrT<struct DbChangeMsg>();
|
auto reflection = msg->GetReflection();
|
db->table = (TableChanged)reflection->GetEnumValue(*msg, desc->FindFieldByName("table"));
|
db->action = (DbAction)reflection->GetEnumValue(*msg, desc->FindFieldByName("action"));
|
|
auto assign = [reflection,desc,msg](const char* name, char** d, size_t* l){
|
*l = 0; *d = NULL;
|
auto s = reflection->GetString(*msg, desc->FindFieldByName(name));
|
if (s.empty()) return;
|
*l = s.size();
|
*d = ptrT<char>(s.size()+1);
|
memcpy(*d, s.data(), *l);
|
};
|
assign("id", &db->id.str, &db->id.size);
|
assign("info", &db->info.str, &db->info.size);
|
// printf("table %d id %s act %d info %s\n", tbl, id.c_str(), act, info.c_str());
|
|
// string json;
|
// util::MessageToJsonString(*msg, &json);
|
// printf("json string %s\n", json.c_str());
|
delete msg;
|
|
return db;
|
}
|
|
struct csubmsg* to_submsg(const char* data, const size_t size){
|
MsgPublish msg;
|
auto tp = parseTopic(msg, data, size);
|
|
auto pmsg = ptrT<struct csubmsg>();
|
pmsg->topic = cstr_new(get<0>(tp), get<1>(tp));
|
|
pmsg->msg = cstr_new(msg.data().data(), msg.data().size());
|
|
return pmsg;
|
}
|
|
void free_submsg(struct csubmsg* msg){
|
if (msg){
|
cstr_free(msg->topic);
|
cstr_free(msg->msg);
|
free(msg);
|
}
|
}
|
|
struct DbChangeMsg* get_submsg_db(struct csubmsg* msg){
|
// string json = toJSON((const char*)msg->data, msg->size, "DbChangeMessage");
|
// return json2DbChangeMsg(json);
|
return pb2DbChangeMsg((const char*)msg->msg.str, msg->msg.size);
|
}
|
void free_submsg_db(struct DbChangeMsg* msg){
|
if (msg) freeDbChangeMsg(msg);
|
}
|
|
struct cproclist* get_submsg_proclist(struct csubmsg* msg){
|
const char* data = msg->msg.str;
|
const size_t size = msg->msg.size;
|
|
auto pl = ptrT<struct cproclist>();
|
pl->cli = json2cclients(data, size, &pl->clientsize);
|
return pl;
|
}
|
void free_submsg_proclist(struct cproclist* pl){
|
if (pl) if (pl->cli) free_cclients(pl->cli, pl->clientsize);
|
}
|
|
//////////////////////////////////////////////////
|
// Splits a JSON request envelope into its "path" and "body" members.
//
// Returns (path, body) as independently owned cstr values; each must be
// released with cstr_free(). A member that is missing, or of an unsupported
// type, comes back empty (str == NULL, size == 0).
//
// The body is returned as text in every supported case:
//   - a JSON string: its contents are copied;
//   - a raw value:   its raw text is copied;
//   - an object or array: it is serialised back to JSON text.
// Previously only yyjson_get_raw() was consulted. That returns NULL for any
// non-raw value while yyjson_get_len() still reports a non-zero length, which
// made cstr_new() copy from a NULL pointer.
static tuple<struct cstr, struct cstr> json2reqmsg(const struct cstr msg) {
    struct cstr path, body;
    memset(&path, 0, sizeof(path));
    memset(&body, 0, sizeof(body));

    yyjson_doc* doc = yyjson_read(msg.str, msg.size, 0);
    yyjson_val* root = yyjson_doc_get_root(doc);

    yyjson_val* jp = yyjson_obj_get(root, "path");
    yyjson_val* jb = yyjson_obj_get(root, "body");

    if (yyjson_is_str(jp)) {
        path = cstr_new(yyjson_get_str(jp), yyjson_get_len(jp));
    }

    if (yyjson_is_str(jb)) {
        body = cstr_new(yyjson_get_str(jb), yyjson_get_len(jb));
    } else if (yyjson_is_raw(jb)) {
        body = cstr_new(yyjson_get_raw(jb), yyjson_get_len(jb));
    } else if (yyjson_is_obj(jb) || yyjson_is_arr(jb)) {
        size_t text_len = 0;
        char* text = yyjson_val_write(jb, 0, &text_len);
        if (text) {
            body = cstr_new(text, text_len);
            free(text);  // written with yyjson's default (malloc-based) allocator
        }
    }

    yyjson_doc_free(doc);
    return make_tuple(path, body);
}
|
|
struct creqmsg* to_reqmsg(const char* pid, const size_t pids, const char* data, const size_t size)
|
{
|
auto msg = ptrT<struct creqmsg>();
|
msg->procid = cstr_new(pid, pids);
|
MsgRequestTopic msgRT;
|
if (!msgRT.ParseFromArray(data, size)) return NULL;
|
msg->msg = cstr_new(msgRT.data().data(), msgRT.data().size());
|
|
return msg;
|
}
|
|
struct creqmsg* make_req_msg(const char* topic, const size_t topics,
|
const char* data, const size_t datal)
|
{
|
auto msg = ptrT<struct creqmsg>();
|
MsgRequestTopic msgRT;
|
msgRT.set_topic(topic, topics);
|
msgRT.set_data(data, datal);
|
|
auto pbstr = msgRT.SerializeAsString();
|
msg->msg = cstr_new(pbstr.data(), pbstr.size());
|
|
return msg;
|
}
|
|
void free_reqmsg(struct creqmsg* msg){
|
if (msg){
|
cstr_free(msg->procid);
|
cstr_free(msg->msg);
|
free(msg);
|
}
|
}
|
|
struct cstackmsgerr* get_reqmsg_stackerr(struct creqmsg* msg){
|
struct cstr path, body;
|
memset(&path, 0, sizeof(path));
|
memset(&body, 0, sizeof(body));
|
tie(path, body) = json2reqmsg(msg->msg);
|
if (body.size == 0) return NULL;
|
|
auto doc = yyjson_read(body.str, body.size, 0);
|
auto root = yyjson_doc_get_root(doc);
|
|
auto obj = [](yyjson_val* v, const char* name){return yyjson_obj_get(v, name);};
|
|
auto jsid = obj(root, "stackId");
|
auto jfid = obj(root, "fileId");
|
auto stkmsg = ptrT<struct cstackmsgerr>();
|
stkmsg->stackid = cstr_new(yyjson_get_str(jsid), yyjson_get_len(jsid));
|
stkmsg->fileid = cstr_new(yyjson_get_str(jfid), yyjson_get_len(jfid));
|
yyjson_doc_free(doc);
|
|
cstr_free(path);
|
cstr_free(body);
|
|
return stkmsg;
|
}
|
void free_reqmsg_stackerr(struct cstackmsgerr* msg){
|
if (msg){
|
cstr_free(msg->stackid);
|
cstr_free(msg->fileid);
|
free(msg);
|
}
|
}
|
// decode success msg
|
struct cstackmsg* get_reqmsg_stack(struct creqmsg* msg){
|
struct cstr path, body;
|
memset(&path, 0, sizeof(path));
|
memset(&body, 0, sizeof(body));
|
tie(path, body) = json2reqmsg(msg->msg);
|
if (body.size == 0) return NULL;
|
|
auto doc = yyjson_read(body.str, body.size, 0);
|
auto root = yyjson_doc_get_root(doc);
|
|
auto obj = [](yyjson_val* v, const char* name){return yyjson_obj_get(v, name);};
|
auto assign = [&obj](yyjson_val* v, const char* name){
|
auto tmp(obj(v, name));
|
return cstr_new(yyjson_get_str(tmp), yyjson_get_len(tmp));
|
};
|
auto smsg = ptrT<struct cstackmsg>();
|
smsg->procnum = yyjson_get_int(obj(root, "procNum"));
|
smsg->stackid = assign(root, "stackId");
|
smsg->stackname = assign(root, "stackName");
|
smsg->type = yyjson_get_int(obj(root, "type"));
|
smsg->width = yyjson_get_int(obj(root, "resolution_width"));
|
smsg->width = yyjson_get_int(obj(root, "resolution_height"));
|
|
auto arr = obj(root, "fileArr");
|
smsg->filescnt = yyjson_arr_size(arr);
|
smsg->files = ptrT<struct cstackfile>(smsg->filescnt);
|
|
for(size_t i = 0; i < smsg->filescnt; i++){
|
struct cstackfile file;
|
memset(&file, 0, sizeof(file));
|
yyjson_val* one = yyjson_arr_get(arr, i);
|
file.id = assign(one, "id");
|
file.name = assign(one, "name");
|
file.path = assign(one, "path");
|
file.type = yyjson_get_int(obj(one, "type"));
|
|
smsg->files[i] = file;
|
}
|
yyjson_doc_free(doc);
|
|
return smsg;
|
}
|
void free_reqmsg_stack(struct cstackmsg* msg){
|
if (msg){
|
cstr_free(msg->stackid);
|
cstr_free(msg->stackname);
|
for(size_t i = 0; i < msg->filescnt; i++){
|
cstr_free(msg->files[i].id);
|
cstr_free(msg->files[i].name);
|
cstr_free(msg->files[i].path);
|
}
|
free(msg);
|
}
|
}
|
|
struct crepmsg* make_reply_msg(const int errcode, const char* errmsg, const size_t emsgl,
|
const char* data, const size_t datal)
|
{
|
auto msg = ptrT<struct crepmsg>();
|
msg->errcode = errcode;
|
msg->errmsg = cstr_new(errmsg, emsgl);
|
msg->data = cstr_new(data, datal);
|
return msg;
|
}
|
|
struct cstr make_reply_msg_json(const int success, const char* msg, const size_t msgl,
|
const char* data, const size_t datal)
|
{
|
auto doc = yyjson_mut_doc_new(NULL);
|
auto root = yyjson_mut_obj(doc);
|
yyjson_mut_obj_add_bool(doc, root, "success", !!success);
|
yyjson_mut_obj_add_strn(doc, root, "msg", msg, msgl);
|
yyjson_mut_obj_add_strn(doc, root, "data", data, datal);
|
|
size_t jsonl = 0;
|
char* json = yyjson_mut_val_write(root, 0, &jsonl);
|
yyjson_mut_doc_free(doc);
|
return cstr_ref(json, jsonl);
|
}
|
|
void free_reply_msg(struct crepmsg* msg){
|
if (msg){
|
cstr_free(msg->errmsg);
|
cstr_free(msg->data);
|
free(msg);
|
}
|
}
|