#include "message.h" #include #include "google/protobuf/descriptor.h" #include "google/protobuf/util/json_util.h" #include "google/protobuf/util/type_resolver.h" #include "google/protobuf/util/type_resolver_util.h" #include "google/protobuf/message.h" #include "google/protobuf/descriptor.h" #include "google/protobuf/descriptor.pb.h" #include "google/protobuf/dynamic_message.h" #include "google/protobuf/compiler/importer.h" #include "3rdparty/yyjson/yyjson.h" #include "bhome_msg_api.pb.h" using namespace bhome_msg; using namespace std; struct cstr{ char* str; // 字符串内容 size_t size; // 字符串长度 }; // 进程信息 struct cproc{ struct cstr name; // 进程名字 struct cstr id; // 进程id struct cstr info; // 进程其他信息,目前没用 }; struct creg{ cproc* proc; // 需要注册的进程信息 char** channel; // 进程提供的请求响应服务的主题 size_t channel_count; char** pub; // 进程提供的发布订阅的发布主题 size_t pub_count; char** sub; // 进程需要订阅的主题 size_t sub_count; char** sub_net; // 进程需要订阅的网络主题,目前没用 size_t sub_net_count; }; template T* ptrT(const size_t l=1){ return (T*)calloc(l, sizeof(T)); } template struct rmC : remove_const{}; template struct rmC{ using type = typename rmC::type*; }; template struct rmC{ using type = typename rmC::type**; }; template ::value>::type* = nullptr> typename rmC::type rmConst(T t){ return const_cast::type>(t); } static inline struct cstr cstr_ref(const char* str, const size_t len){ struct cstr cs; memset(&cs, 0, sizeof(cs)); cs.size = len; cs.str = rmConst(str); return cs; } static inline struct cstr cstr_clone(const struct cstr* str){ struct cstr cs; cs.size = str->size; cs.str = ptrT(cs.size); memcpy(cs.str, str, cs.size); return cs; } cproc* internal_clone_cproc(const cproc* proc){ auto nproc = ptrT(); nproc->name = cstr_clone(&proc->name); nproc->id = cstr_clone(&proc->id); return nproc; } void internal_cproc_free(cproc* proc){ if (proc){ free(proc->name.str); free(proc->id.str); free(proc->info.str); free(proc); } } cproc* make_cproc(const char* name, const char* id){ auto proc = ptrT(); proc->name = cstr_ref(name, strlen(name)); proc->id = cstr_ref(id, strlen(id)); return proc; } char* cproc_name(const cproc* proc){ if (!proc) return NULL; return proc->name.str; } char* cproc_id(const cproc* proc){ if (!proc) return NULL; return proc->id.str; } void cproc_free(cproc* proc){ free(proc); } creg* make_creg(const cproc* proc, const char** rep, const size_t repcnt, const char** pub, const size_t pubcnt, const char** sub, const size_t subcnt, const char** subnet, const size_t subnetcnt) { auto reg = ptrT(); reg->proc = rmConst(proc); reg->channel = rmConst(rep); reg->channel_count = repcnt; reg->pub = rmConst(pub); reg->pub_count = pubcnt; reg->sub = rmConst(sub); reg->sub_count = subcnt; reg->sub_net = rmConst(subnet); reg->sub_net_count = subnetcnt; return reg; } creg* make_creg_from_cproc(const cproc* proc){ auto reg = ptrT(); reg->proc = rmConst(proc); return reg; } static inline void creg_add_topic(char*** dst, size_t* dstC, const char** src, const size_t srcC){ *dst = rmConst(src); *dstC = srcC; } void creg_add_topic_reply(creg* reg, const char** topic, const size_t count){ creg_add_topic(®->channel, ®->channel_count, topic, count); } void creg_add_topic_pub(creg* reg, const char** topic, const size_t count){ creg_add_topic(®->pub, ®->pub_count, topic, count); } void creg_add_topic_sub(creg* reg, const char** topic, const size_t count){ creg_add_topic(®->sub, ®->sub_count, topic, count); } void creg_add_topic_subnet(creg* reg, const char** topic, const size_t count){ creg_add_topic(®->sub_net, ®->sub_net_count, topic, count); } const cproc* creg_proc(const creg* reg){ if (!reg) return NULL; return reg->proc; } char** creg_reply_topic(const creg* reg, size_t* count){ if (!reg) return NULL; *count = reg->channel_count; return reg->channel; } char** creg_pub_topic(const creg* reg, size_t* count){ if (!reg) return NULL; *count = reg->pub_count; return reg->pub; } char** creg_sub_topic(const creg* reg, size_t* count){ if (!reg) return NULL; *count = reg->sub_count; return reg->sub; } char** creg_subnet_topic(const creg* reg, size_t* count){ if (!reg) return NULL; *count = reg->sub_net_count; return reg->sub_net; } void creg_free(creg* reg){ free(reg); } ///////////////////////////////////////////////////////////////////// // suppress -unused-function warning template void ignoreT(F&&){} // MUST COMPILE *.pb.cc FILE static string toJSON(const char* data, const size_t size, const string& type){ ignoreT(toJSON); using namespace google::protobuf; using namespace google::protobuf::util; const DescriptorPool* pool = DescriptorPool::generated_pool(); string url = "basic.com"; // string url = "type.googleapis.com"; std::string message_type = url + "/protomsg." + type; util::TypeResolver* resolver = util::NewTypeResolverForDescriptorPool(url, pool); util::JsonOptions opt; string in{(const char*)data, size}; string out{}; Status result = BinaryToJsonString(resolver, message_type, in, &out, opt); if (!result.ok()) printf("error msg %s\n", result.message().data()); delete resolver; // printf("json to image %s\n", out.c_str()); return out; } static struct DbChangeMsg* json2DbChangeMsg(const string& json){ ignoreT(json2DbChangeMsg); auto msg = ptrT(); yyjson_doc* doc = yyjson_read(json.data(), json.size(), 0); yyjson_val* root = yyjson_doc_get_root(doc); auto obj = [root](const char* name){return yyjson_obj_get(root, name);}; auto assign = [&obj](const char* name, char** d, size_t* l){ auto v = obj(name); *l = yyjson_get_len(v); *d = ptrT(*l+1); memcpy(*d, yyjson_get_str(v), *l); }; msg->table = (TableChanged)yyjson_get_int(obj("table")); msg->action = (DbAction)yyjson_get_int(obj("action")); assign("id", &msg->id, &msg->idl); assign("info", &msg->info, &msg->infol); yyjson_doc_free(doc); return msg; } static inline void freeDbChangeMsg(struct DbChangeMsg* msg){ if (msg){ free(msg->id); free(msg->info); free(msg); } } /////////////////////////////////////////////////////////////// static inline void json2str(yyjson_val* v, char** d, size_t* l){ *l = yyjson_get_len(v); *d = ptrT(*l+1); memcpy(*d, yyjson_get_str(v), *l); } static struct creg* json2creg(yyjson_val* v){ auto obj = [](yyjson_val* v, const char* name){return yyjson_obj_get(v, name);}; auto rinfo = ptrT(); auto pval = obj(v, "proc"); auto pinfo = ptrT(); json2str(obj(pval, "name"), &pinfo->name.str, &pinfo->name.size); json2str(obj(pval, "id"), &pinfo->id.str, &pinfo->id.size); json2str(obj(pval, "info"), &pinfo->info.str, &pinfo->info.size); rinfo->proc = pinfo; auto assign_arr = [&obj](yyjson_val* v){ const size_t count = yyjson_arr_size(v); char** arr = ptrT(count); for(size_t i = 0; i < count; i++){ auto sv = yyjson_arr_get(v, i); char* entry = NULL; size_t entry_size = 0; json2str(sv, &entry, &entry_size); arr[i] = entry; } return make_tuple(arr, count); }; tie(rinfo->channel, rinfo->channel_count) = assign_arr(obj(v, "channel")); tie(rinfo->pub, rinfo->pub_count) = assign_arr(obj(v, "pubTopic")); tie(rinfo->sub, rinfo->sub_count) = assign_arr(obj(v, "subTopic")); tie(rinfo->sub_net, rinfo->sub_net_count) = assign_arr(obj(v, "subNetTopic")); return rinfo; } static inline void free_cclients(struct cclient* cli, const size_t count){ for(size_t i = 0; i < count; i++) { if (cli[i].rinfo) { auto tmp = cli[i].rinfo; free(tmp->proc); auto free_arr = [](char** arr, const size_t count){ for(size_t i = 0; i < count; i++){ free(arr[i]); } free(arr); }; free_arr(tmp->channel, tmp->channel_count); free_arr(tmp->pub, tmp->pub_count); free_arr(tmp->sub, tmp->sub_count); free_arr(tmp->sub_net, tmp->sub_net_count); }; } free(cli); } static struct cclient* json2cclients(const char* data, const size_t size, size_t* len){ yyjson_doc* doc = yyjson_read(data, size, 0); yyjson_val* root = yyjson_doc_get_root(doc); *len = yyjson_arr_size(root); auto cli = ptrT(*len); auto obj = [](yyjson_val* v, const char* name){return yyjson_obj_get(v, name);}; for(size_t i = 0; i < *len; i++){ cclient rc; yyjson_val* one = yyjson_arr_get(root, i); rc.replykey = yyjson_get_int(obj(one, "replyKey")); rc.hbcnt = yyjson_get_int(obj(one, "heartbeatCount")); rc.dcnt = yyjson_get_int(obj(one, "deadCount")); rc.status = yyjson_get_int(obj(one, "status")); rc.rinfo = json2creg(obj(one, "info")); cli[i] = rc; } yyjson_doc_free(doc); return cli; } //////////////////////////////////////////////////////////////// static struct DbChangeMsg* pb2DbChangeMsg(const char* data, const size_t len){ string pbstr = R"( syntax = "proto3"; package protomsg; //TableChanged enum enum TableChanged { T_Camera = 0;//摄像机变化 T_CameraRule = 1;//摄像机任务参数变化 T_Sdk = 2;//sdk变化 T_CameraPolygon = 3;//摄像机多边形变化 T_TimeRule = 4;//时间规则变化 T_Server = 5;//服务器信息变化 T_PollConfig = 6;//轮询配置变化 T_File = 7;//本地文件变化 T_FileSetting = 8;//本地文件分析设置 T_SdkChanSet = 9;//算法通道设置变化 T_FileStack = 10;//数据栈变化 T_ResourceConfig = 11;//对外服务配置变化 T_CalculationPower = 12;//需要重新计算算力占用情况 T_EventPush = 13; //事件推送有变化 T_Cluster = 14; //集群创建、加入或退出 T_CameraPolygonRelation = 15; //摄像机区域的关联关系 T_Voice = 16;//报警声音发生变化 } enum DbAction { Insert = 0;//Insert Update = 1;//Update Delete = 2;//Delete } //publish db change message message DbChangeMessage { TableChanged table = 1;//变化的表 string id = 2;//变化数据id DbAction action = 3;//action string info = 4;//变化内容 } )"; string filename = to_string(random()) + "_" + to_string(random()) + "db.proto"; FILE* fp = fopen(filename.c_str(), "wb"); size_t pos = 0; while(pos != pbstr.size()){ pos += fwrite(pbstr.data()+pos, 1, pbstr.size()-pos, fp); } fclose(fp); using namespace google::protobuf; compiler::DiskSourceTree ds; ds.MapPath("", "."); compiler::Importer imp(&ds, NULL); imp.Import(filename); remove(filename.c_str()); auto desc = imp.pool()->FindMessageTypeByName("protomsg.DbChangeMessage"); DynamicMessageFactory factory; auto message = factory.GetPrototype(desc); auto msg = message->New(); msg->ParseFromArray(data, len); // printf("debug string %s\n", msg->DebugString().c_str()); auto db = ptrT(); auto reflection = msg->GetReflection(); db->table = (TableChanged)reflection->GetEnumValue(*msg, desc->FindFieldByName("table")); db->action = (DbAction)reflection->GetEnumValue(*msg, desc->FindFieldByName("action")); auto assign = [reflection,desc,msg](const char* name, char** d, size_t* l){ *l = 0; *d = NULL; auto s = reflection->GetString(*msg, desc->FindFieldByName(name)); if (s.empty()) return; *l = s.size(); *d = ptrT(s.size()+1); memcpy(*d, s.data(), *l); }; assign("id", &db->id, &db->idl); assign("info", &db->info, &db->infol); // printf("table %d id %s act %d info %s\n", tbl, id.c_str(), act, info.c_str()); // string json; // util::MessageToJsonString(*msg, &json); // printf("json string %s\n", json.c_str()); delete msg; return db; } static tuple copymemory(const char* src, const size_t size){ char* dst = ptrT(size + 1); memcpy(dst, src, size); return make_tuple(dst, size); } static tuple copymemory(const string& src){ char* dst = ptrT(src.size() + 1); memcpy(dst, src.data(), src.size()); return make_tuple(dst, src.size()); } struct csubmsg* to_submsg(const char* data, const size_t size){ MsgPublish msg; if (!msg.ParseFromArray(data, size)) return NULL; auto smsg = ptrT(); tie(smsg->topic, smsg->topicl) = copymemory(msg.topic()); tie(smsg->msg, smsg->msgl) = copymemory(msg.data()); return smsg; } void free_submsg(struct csubmsg* msg){ if (msg){ free(msg->topic); free(msg->msg); free(msg); } } struct DbChangeMsg* get_submsg_db(struct csubmsg* msg){ // string json = toJSON((const char*)msg->data, msg->size, "DbChangeMessage"); // return json2DbChangeMsg(json); return pb2DbChangeMsg((const char*)msg->msg, msg->msgl); } void free_submsg_db(struct DbChangeMsg* msg){ if (msg) freeDbChangeMsg(msg); } struct cproclist* get_submsg_proclist(struct csubmsg* msg){ const char* data = msg->msg; const size_t size = msg->msgl; auto pl = ptrT(); pl->cli = json2cclients(data, size, &pl->count); return pl; } void free_submsg_proclist(struct cproclist* pl){ if (pl) if (pl->cli) free_cclients(pl->cli, pl->count); free(pl); } ////////////////////////////////////////////////// struct creqmsg* to_reqmsg(const char* pid, const size_t pids, const char* data, const size_t size) { auto msg = ptrT(); tie(msg->procid, msg->procidl) = copymemory(pid, pids); MsgRequestTopic msgRT; if (!msgRT.ParseFromArray(data, size)) { free(msg->procid); free(msg); return NULL; } tie(msg->msg, msg->msgl) = copymemory(msgRT.data()); return msg; } struct creqmsg* make_req_msg(const char* topic, const size_t topicl, const char* data, const size_t datal) { auto msg = ptrT(); MsgRequestTopic msgRT; msgRT.set_topic(topic, topicl); msgRT.set_data(data, datal); const auto& pbstr = msgRT.SerializeAsString(); tie(msg->msg, msg->msgl) = copymemory(pbstr); return msg; } void free_reqmsg(struct creqmsg* msg){ if (msg){ free(msg->procid); free(msg->msg); free(msg); } } static tuple json2body(yyjson_doc* doc){ yyjson_val* root = yyjson_doc_get_root(doc); auto obj = [](yyjson_val* v, const char* name){return yyjson_obj_get(v, name);}; auto jp = obj(root, "path"); if (!jp) return make_tuple((const char*)NULL, 0); auto jb = obj(root, "body"); if (!jb) return make_tuple((const char*)NULL, 0); return make_tuple(yyjson_get_str(jb), yyjson_get_len(jp)); } struct cstackmsgerr* get_reqmsg_stackerr(struct creqmsg* msg){ auto doc0 = yyjson_read(msg->msg, msg->msgl, 0); auto tpbody = json2body(doc0); if (!get<0>(tpbody)) return NULL; auto doc = yyjson_read(get<0>(tpbody), get<1>(tpbody), 0); auto root = yyjson_doc_get_root(doc); auto obj = [](yyjson_val* v, const char* name){return yyjson_obj_get(v, name);}; auto jsid = obj(root, "stackId"); auto jfid = obj(root, "fileId"); auto smsg = ptrT(); tie(smsg->stackid, smsg->stackidl) = copymemory(yyjson_get_str(jsid), yyjson_get_len(jsid)); tie(smsg->fileid, smsg->fileidl) = copymemory(yyjson_get_str(jfid), yyjson_get_len(jfid)); yyjson_doc_free(doc); yyjson_doc_free(doc0); return smsg; } void free_reqmsg_stackerr(struct cstackmsgerr* msg){ if (msg){ free(msg->stackid); free(msg->fileid); free(msg); } } // decode success msg struct cstackmsg* get_reqmsg_stack(struct creqmsg* msg){ auto doc0 = yyjson_read(msg->msg, msg->msgl, 0); auto tpbody = json2body(doc0); if (!get<0>(tpbody)) return NULL; auto doc = yyjson_read(get<0>(tpbody), get<1>(tpbody), 0); auto root = yyjson_doc_get_root(doc); auto obj = [](yyjson_val* v, const char* name){return yyjson_obj_get(v, name);}; auto assign = [&obj](yyjson_val* v, const char* name){ auto tmp(obj(v, name)); return copymemory(yyjson_get_str(tmp), yyjson_get_len(tmp)); }; auto smsg = ptrT(); smsg->procnum = yyjson_get_int(obj(root, "procNum")); tie(smsg->stackid, smsg->stackidl) = assign(root, "stackId"); tie(smsg->stackname, smsg->stacknamel) = assign(root, "stackName"); smsg->type = yyjson_get_int(obj(root, "type")); smsg->width = yyjson_get_int(obj(root, "resolution_width")); smsg->width = yyjson_get_int(obj(root, "resolution_height")); auto arr = obj(root, "fileArr"); smsg->filescnt = yyjson_arr_size(arr); smsg->files = ptrT(smsg->filescnt); for(size_t i = 0; i < smsg->filescnt; i++){ struct cstackfile file; memset(&file, 0, sizeof(file)); yyjson_val* one = yyjson_arr_get(arr, i); tie(file.id, file.idl) = assign(one, "id"); tie(file.name, file.namel) = assign(one, "name"); tie(file.path, file.pathl) = assign(one, "path"); file.type = yyjson_get_int(obj(one, "type")); smsg->files[i] = file; } yyjson_doc_free(doc); yyjson_doc_free(doc0); return smsg; } void free_reqmsg_stack(struct cstackmsg* msg){ if (msg){ free(msg->stackid); free(msg->stackname); for(size_t i = 0; i < msg->filescnt; i++){ free(msg->files[i].id); free(msg->files[i].name); free(msg->files[i].path); } free(msg); } } struct crepmsg* make_reply_msg(const int errcode, const char* errmsg, const size_t emsgl, const char* data, const size_t datal) { auto msg = ptrT(); msg->errcode = errcode; tie(msg->errmsg, msg->errmsgl) = copymemory(errmsg, emsgl); tie(msg->data, msg->datal) = copymemory(data, datal); return msg; } void free_reply_msg(struct crepmsg* msg){ if (msg){ free(msg->errmsg); free(msg->data); free(msg); } } template void ignoref(F&& f){} static struct cstr make_reply_msg_json(const int success, const char* msg, const size_t msgl, const char* data, const size_t datal) { ignoref(make_reply_msg_json); auto doc = yyjson_mut_doc_new(NULL); auto root = yyjson_mut_obj(doc); yyjson_mut_obj_add_bool(doc, root, "success", !!success); yyjson_mut_obj_add_strn(doc, root, "msg", msg, msgl); yyjson_mut_obj_add_strn(doc, root, "data", data, datal); size_t jsonl = 0; char* json = yyjson_mut_val_write(root, 0, &jsonl); yyjson_mut_doc_free(doc); return cstr_ref(json, jsonl); } void free_query_procs(struct cqueryprocs* procs, const size_t count){ for(size_t i = 0; i < count; i++){ free(procs[i].id); } free(procs); }