#include "message.h"

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <string>
#include <tuple>

#include "google/protobuf/descriptor.h"
#include "google/protobuf/util/json_util.h"
#include "google/protobuf/util/type_resolver.h"
#include "google/protobuf/util/type_resolver_util.h"
#include "google/protobuf/message.h"
#include "google/protobuf/descriptor.h"
#include "google/protobuf/descriptor.pb.h"
#include "google/protobuf/dynamic_message.h"
#include "google/protobuf/compiler/importer.h"

#include "3dparty/yyjson/yyjson.h"
#include "bhome_msg_api.pb.h"

using namespace bhome_msg;
using namespace std;

// Allocate a zero-initialised array of `l` objects of type T with calloc.
// The result must be released with free().
template <class T>
T* ptrT(const size_t l=1){
    return static_cast<T*>(calloc(l, sizeof(T)));
}

// Return a cstr with every byte set to zero (NULL pointer, size 0).
static inline struct cstr null_cstr(){
    struct cstr cs;
    memset(&cs, 0, sizeof(cs));
    return cs;
}

// Wrap an existing buffer without copying it. Whoever later calls cstr_free
// on the result releases `str` with free().
static inline struct cstr cstr_ref(char* str, const size_t len){
    struct cstr cs = null_cstr();
    cs.size = len;
    cs.str = str;
    return cs;
}

// Deep copy of `old`.
static inline struct cstr cstr_clone(const struct cstr old){
    return cstr_new(old.str, old.size);
}

// Copy `len` bytes of `str` into a freshly allocated buffer.
// The buffer holds len+1 bytes so the copy is always NUL-terminated.
// A NULL `str` is treated as an empty string instead of being passed to memcpy.
struct cstr cstr_new(const char* str, const size_t len){
    struct cstr cs = null_cstr();
    const size_t n = str ? len : 0;
    cs.str = ptrT<char>(n + 1);
    if (!cs.str) return cs;             // allocation failed: stay {NULL, 0}
    if (n) memcpy(cs.str, str, n);
    cs.size = n;
    return cs;
}

// Release the buffer held by `str`. Zero-length strings created by cstr_new
// still own a buffer, so the pointer alone decides whether to free.
void cstr_free(struct cstr str){
    if (str.str) free(str.str);
}

// Return a cstrarr with every byte set to zero.
static inline struct cstrarr null_cstr_arr(){
    struct cstrarr arr;
    memset(&arr, 0, sizeof(arr));
    return arr;
}

// Allocate an array of `count` zeroed cstr slots.
struct cstrarr cstr_arr_new(const size_t count){
    struct cstrarr arr = null_cstr_arr();
    arr.arr = ptrT<struct cstr>(count);
    arr.count = arr.arr ? count : 0;    // never report slots that do not exist
    return arr;
}

// Store a copy of data[0..len) in slot `idx`; out-of-range indices are ignored.
// Any string already held by the slot is released first.
void cstr_arr_add(struct cstrarr* arr, const char* data, const size_t len, const size_t idx){
    if (!arr || !arr->arr || idx >= arr->count) return;
    cstr_free(arr->arr[idx]);
    arr->arr[idx] = cstr_new(data, len);
}

// Release every string in the array, then the array itself.
void cstr_arr_free(struct cstrarr arr){
    if (arr.arr){
        for(size_t i = 0; i < arr.count; i++){
            cstr_free(arr.arr[i]);
        }
    }
    free(arr.arr);
}

// Build a cproc holding copies of `name` and `id`.
// `info` is currently not copied; the field is left zeroed.
struct cproc* make_proc_info(const struct cstr name, const struct cstr id, const struct cstr info){
    (void)info;
    auto proc = ptrT<struct cproc>();
    proc->name = cstr_clone(name);
    proc->id = cstr_clone(id);
    // proc->info = cstr_clone(info);
    return proc;
}

// Deep-copy the name and id of `pi`. Returns NULL when `pi` is NULL.
struct cproc* clone_proc_info(const struct cproc* pi){
    if (!pi) return NULL;
    auto newpi = ptrT<struct cproc>();
    // Copy from the source object, not from the freshly zeroed clone.
    newpi->name = cstr_clone(pi->name);
    newpi->id = cstr_clone(pi->id);
    // todo: ignore info
    return newpi;
}

// Release a cproc and the strings it holds. NULL is accepted.
void free_proc_info(struct cproc* pi){
    if (pi){
        cstr_free(pi->name);
        cstr_free(pi->id);
        cstr_free(pi->info);
        free(pi);
    }
}

// Release a creg, its process info and its four string arrays. NULL is accepted.
void free_creg(struct creg* reg){
    if (reg){
        if (reg->pinfo) free_proc_info(reg->pinfo);
        cstr_arr_free(reg->channel);
        cstr_arr_free(reg->topic_pub);
        cstr_arr_free(reg->topic_sub);
        cstr_arr_free(reg->topic_sub_net);
        free(reg);
    }
}

/////////////////////////////////////////////////////////////////////
// suppress -unused-function warning
template <class F>
void ignoreT(F&&){}

// Copy the string held by JSON value `v` into a new NUL-terminated buffer.
// A NULL or non-string value yields an empty string.
static inline void json2str(yyjson_val* v, char** d, size_t* l){
    const char* s = yyjson_get_str(v);
    *l = s ? yyjson_get_len(v) : 0;
    *d = ptrT<char>(*l + 1);
    if (*d && *l) memcpy(*d, s, *l);
}

// Convert a JSON value into an owned cstr holding JSON text or string content:
//  - string  -> the string content
//  - raw     -> the raw text
//  - obj/arr -> the value serialised back to JSON text
//  - other   -> empty cstr
static struct cstr jsonval2cstr(yyjson_val* v){
    if (yyjson_is_str(v)) return cstr_new(yyjson_get_str(v), yyjson_get_len(v));
    if (yyjson_is_raw(v)) return cstr_new(yyjson_get_raw(v), yyjson_get_len(v));
    if (yyjson_is_obj(v) || yyjson_is_arr(v)){
        size_t len = 0;
        char* text = yyjson_val_write(v, 0, &len);  // malloc'ed by yyjson
        if (text) return cstr_ref(text, len);
    }
    return null_cstr();
}

// MUST COMPILE *.pb.cc FILE
// Convert a serialised "protomsg.<type>" message to JSON text using the
// generated descriptor pool. On failure an error is printed and whatever was
// produced (possibly empty) is returned.
static string toJSON(const char* data, const size_t size, const string& type){
    ignoreT(toJSON);

    using namespace google::protobuf;
    using namespace google::protobuf::util;

    const DescriptorPool* pool = DescriptorPool::generated_pool();

    string url = "basic.com";
    // string url = "type.googleapis.com";
    std::string message_type = url + "/protomsg." + type;
    util::TypeResolver* resolver = util::NewTypeResolverForDescriptorPool(url, pool);

    util::JsonOptions opt;
    const string in(data, size);
    string out{};
    Status result = BinaryToJsonString(resolver, message_type, in, &out, opt);
    if (!result.ok()) printf("error msg %s\n", result.message().data());
    delete resolver;

    // printf("json to image %s\n", out.c_str());
    return out;
}

// Build a DbChangeMsg from the JSON keys "table", "action", "id" and "info".
// Missing or non-string "id"/"info" become empty strings.
static struct DbChangeMsg* json2DbChangeMsg(const string& json){
    ignoreT(json2DbChangeMsg);

    auto msg = ptrT<struct DbChangeMsg>();

    yyjson_doc* doc = yyjson_read(json.data(), json.size(), 0);
    yyjson_val* root = yyjson_doc_get_root(doc);

    msg->table = static_cast<TableChanged>(yyjson_get_int(yyjson_obj_get(root, "table")));
    msg->action = static_cast<DbAction>(yyjson_get_int(yyjson_obj_get(root, "action")));
    json2str(yyjson_obj_get(root, "id"), &msg->id.str, &msg->id.size);
    json2str(yyjson_obj_get(root, "info"), &msg->info.str, &msg->info.size);

    yyjson_doc_free(doc);
    return msg;
}

// Release a DbChangeMsg and its strings. NULL is accepted.
static inline void freeDbChangeMsg(struct DbChangeMsg* msg){
    if (msg){
        cstr_free(msg->id);
        cstr_free(msg->info);
        free(msg);
    }
}

///////////////////////////////////////////////////////////////
// Parse `data` into `msgp` and return a malloc'ed, NUL-terminated copy of its
// topic together with the topic length. Returns (NULL, 0) when parsing fails.
// The caller owns the returned buffer.
static inline tuple<char*, size_t> parseTopic(MsgPublish &msgp, const char* data, const size_t size){
    if (!msgp.ParseFromArray(data, static_cast<int>(size)))
        return make_tuple(static_cast<char*>(nullptr), size_t{0});

    const string& topic = msgp.topic();
    auto tpc = ptrT<char>(topic.size() + 1);
    if (tpc) memcpy(tpc, topic.data(), topic.size());
    return make_tuple(tpc, tpc ? topic.size() : size_t{0});
}

// Build a creg from a JSON object with the keys "proc" (name/id/info),
// "channel", "pubTopic", "subTopic" and "subNetTopic".
static struct creg* jsonval2creg(yyjson_val* v){
    auto rinfo = ptrT<struct creg>();

    yyjson_val* pval = yyjson_obj_get(v, "proc");
    auto pinfo = ptrT<struct cproc>();
    json2str(yyjson_obj_get(pval, "name"), &pinfo->name.str, &pinfo->name.size);
    json2str(yyjson_obj_get(pval, "id"), &pinfo->id.str, &pinfo->id.size);
    json2str(yyjson_obj_get(pval, "info"), &pinfo->info.str, &pinfo->info.size);
    rinfo->pinfo = pinfo;

    // Convert a JSON array of strings; each element is written straight into
    // its slot so no temporary copy is left behind.
    auto assign_arr = [](yyjson_val* arrv){
        struct cstrarr arr = cstr_arr_new(yyjson_arr_size(arrv));
        size_t idx, max;
        yyjson_val* sv;
        yyjson_arr_foreach(arrv, idx, max, sv){
            if (idx >= arr.count) break;
            json2str(sv, &arr.arr[idx].str, &arr.arr[idx].size);
        }
        return arr;
    };

    rinfo->channel = assign_arr(yyjson_obj_get(v, "channel"));
    rinfo->topic_pub = assign_arr(yyjson_obj_get(v, "pubTopic"));
    rinfo->topic_sub = assign_arr(yyjson_obj_get(v, "subTopic"));
    rinfo->topic_sub_net = assign_arr(yyjson_obj_get(v, "subNetTopic"));

    return rinfo;
}

// Release an array of `size` clients, including each client's creg.
static inline void free_cclients(struct cclient* cli, const size_t size){
    if (cli){
        for(size_t i = 0; i < size; i++)
            if (cli[i].rinfo) free_creg(cli[i].rinfo);
    }
    free(cli);
}

// Parse a JSON array of client objects. `*len` receives the element count.
// Invalid JSON or a non-array root yields a count of 0.
static struct cclient* json2cclients(const char* data, const size_t size, size_t* len){
    yyjson_doc* doc = yyjson_read(data, size, 0);
    yyjson_val* root = yyjson_doc_get_root(doc);

    *len = yyjson_arr_size(root);
    auto cli = ptrT<struct cclient>(*len);

    size_t idx, max;
    yyjson_val* one;
    yyjson_arr_foreach(root, idx, max, one){
        // Fill the calloc'ed slot directly so unassigned fields stay zeroed.
        struct cclient& rc = cli[idx];
        rc.replykey = yyjson_get_int(yyjson_obj_get(one, "replyKey"));
        rc.hbcnt = yyjson_get_int(yyjson_obj_get(one, "heartbeatCount"));
        rc.dcnt = yyjson_get_int(yyjson_obj_get(one, "deadCount"));
        rc.status = yyjson_get_int(yyjson_obj_get(one, "status"));
        rc.rinfo = jsonval2creg(yyjson_obj_get(one, "info"));
    }

    yyjson_doc_free(doc);
    return cli;
}

////////////////////////////////////////////////////////////////
// Decode a serialised protomsg.DbChangeMessage without generated code: the
// schema below is written to a temporary .proto file in the current directory,
// imported, and the payload is read through a DynamicMessage.
// Returns NULL when the temporary file cannot be written or the schema cannot
// be imported. A payload that fails to parse still yields a message holding
// whatever fields protobuf left set.
static struct DbChangeMsg* pb2DbChangeMsg(const char* data, const size_t len){
    const string pbstr = R"(
syntax = "proto3";
package protomsg;

//TableChanged enum
enum TableChanged {
    T_Camera = 0;//摄像机变化
    T_CameraRule = 1;//摄像机任务参数变化
    T_Sdk = 2;//sdk变化
    T_CameraPolygon = 3;//摄像机多边形变化
    T_TimeRule = 4;//时间规则变化
    T_Server = 5;//服务器信息变化
    T_PollConfig = 6;//轮询配置变化
    T_File = 7;//本地文件变化
    T_FileSetting = 8;//本地文件分析设置
    T_SdkChanSet = 9;//算法通道设置变化
    T_FileStack = 10;//数据栈变化
    T_ResourceConfig = 11;//对外服务配置变化
    T_CalculationPower = 12;//需要重新计算算力占用情况
    T_EventPush = 13; //事件推送有变化
    T_Cluster = 14; //集群创建、加入或退出
    T_CameraPolygonRelation = 15; //摄像机区域的关联关系
    T_Voice = 16;//报警声音发生变化
}

enum DbAction {
    Insert = 0;//Insert
    Update = 1;//Update
    Delete = 2;//Delete
}

//publish db change message
message DbChangeMessage {
    TableChanged table = 1;//变化的表
    string id = 2;//变化数据id
    DbAction action = 3;//action
    string info = 4;//变化内容
}
)";

    // NOTE(review): random() is never seeded in this file, so concurrent
    // processes sharing a working directory may pick the same file name.
    const string filename = to_string(random()) + "_" + to_string(random()) + "db.proto";

    FILE* fp = fopen(filename.c_str(), "wb");
    if (!fp) return NULL;
    const size_t written = fwrite(pbstr.data(), 1, pbstr.size(), fp);
    fclose(fp);
    if (written != pbstr.size()){
        // A short write means an I/O error; retrying could loop forever.
        remove(filename.c_str());
        return NULL;
    }

    using namespace google::protobuf;

    compiler::DiskSourceTree ds;
    ds.MapPath("", ".");
    compiler::Importer imp(&ds, NULL);
    const FileDescriptor* filedesc = imp.Import(filename);
    remove(filename.c_str());
    if (!filedesc) return NULL;

    auto desc = imp.pool()->FindMessageTypeByName("protomsg.DbChangeMessage");
    if (!desc) return NULL;

    // `factory` must outlive `msg`; msg is deleted before this function returns.
    DynamicMessageFactory factory;
    auto message = factory.GetPrototype(desc);
    auto msg = message->New();
    msg->ParseFromArray(data, static_cast<int>(len));
    // printf("debug string %s\n", msg->DebugString().c_str());

    auto db = ptrT<struct DbChangeMsg>();
    auto reflection = msg->GetReflection();
    db->table = static_cast<TableChanged>(reflection->GetEnumValue(*msg, desc->FindFieldByName("table")));
    db->action = static_cast<DbAction>(reflection->GetEnumValue(*msg, desc->FindFieldByName("action")));

    // Copy a string field; an empty field is stored as {NULL, 0}.
    auto assign = [reflection, desc, msg](const char* name, char** d, size_t* l){
        *l = 0;
        *d = NULL;
        const string s = reflection->GetString(*msg, desc->FindFieldByName(name));
        if (s.empty()) return;
        *d = ptrT<char>(s.size() + 1);
        if (!*d) return;
        memcpy(*d, s.data(), s.size());
        *l = s.size();
    };
    assign("id", &db->id.str, &db->id.size);
    assign("info", &db->info.str, &db->info.size);

    // printf("table %d id %s act %d info %s\n", tbl, id.c_str(), act, info.c_str());
    // string json;
    // util::MessageToJsonString(*msg, &json);
    // printf("json string %s\n", json.c_str());

    delete msg;
    return db;
}

// Decode a serialised MsgPublish into a csubmsg holding copies of its topic
// and data. If parsing fails the topic is empty.
struct csubmsg* to_submsg(const char* data, const size_t size){
    MsgPublish msg;
    char* topic = NULL;
    size_t topic_len = 0;
    tie(topic, topic_len) = parseTopic(msg, data, size);

    auto pmsg = ptrT<struct csubmsg>();
    pmsg->topic = cstr_new(topic, topic_len);
    free(topic);    // cstr_new made its own copy
    pmsg->msg = cstr_new(msg.data().data(), msg.data().size());
    return pmsg;
}

// Release a csubmsg and its strings. NULL is accepted.
void free_submsg(struct csubmsg* msg){
    if (msg){
        cstr_free(msg->topic);
        cstr_free(msg->msg);
        free(msg);
    }
}

// Decode the payload of `msg` as a DbChangeMessage.
// Returns NULL when `msg` is NULL or decoding could not be set up.
struct DbChangeMsg* get_submsg_db(struct csubmsg* msg){
    if (!msg) return NULL;
    // string json = toJSON((const char*)msg->data, msg->size, "DbChangeMessage");
    // return json2DbChangeMsg(json);
    return pb2DbChangeMsg(msg->msg.str, msg->msg.size);
}

// Release a DbChangeMsg returned by get_submsg_db. NULL is accepted.
void free_submsg_db(struct DbChangeMsg* msg){
    if (msg) freeDbChangeMsg(msg);
}

// Decode the payload of `msg` as a JSON array of clients.
// Returns NULL when `msg` is NULL.
struct cproclist* get_submsg_proclist(struct csubmsg* msg){
    if (!msg) return NULL;
    auto pl = ptrT<struct cproclist>();
    pl->cli = json2cclients(msg->msg.str, msg->msg.size, &pl->count);
    return pl;
}

// Release a cproclist: its clients and the list object itself. NULL is accepted.
void free_submsg_proclist(struct cproclist* pl){
    if (pl){
        if (pl->cli) free_cclients(pl->cli, pl->count);
        free(pl);
    }
}

//////////////////////////////////////////////////
// Split a JSON request into (path, body). `body` may be a JSON string, an
// object or an array; objects and arrays are returned as JSON text so the
// caller can parse them again. Both results are owned by the caller.
static tuple<struct cstr, struct cstr> json2reqmsg(const struct cstr msg){
    yyjson_doc* doc = yyjson_read(msg.str, msg.size, 0);
    yyjson_val* root = yyjson_doc_get_root(doc);

    yyjson_val* jp = yyjson_obj_get(root, "path");
    yyjson_val* jb = yyjson_obj_get(root, "body");

    auto tp = make_tuple(jsonval2cstr(jp), jsonval2cstr(jb));

    yyjson_doc_free(doc);
    return tp;
}

// Decode a serialised MsgRequestTopic sent by process `pid`.
// Returns NULL when the payload cannot be parsed.
struct creqmsg* to_reqmsg(const char* pid, const size_t pids, const char* data, const size_t size)
{
    // Parse before allocating so a failed parse has nothing to leak.
    MsgRequestTopic msgRT;
    if (!msgRT.ParseFromArray(data, static_cast<int>(size))) return NULL;

    auto msg = ptrT<struct creqmsg>();
    msg->procid = cstr_new(pid, pids);
    msg->msg = cstr_new(msgRT.data().data(), msgRT.data().size());
    return msg;
}

// Serialise (topic, data) as a MsgRequestTopic and wrap it in a creqmsg.
// `procid` is left empty.
struct creqmsg* make_req_msg(const char* topic, const size_t topicl, const char* data, const size_t datal)
{
    MsgRequestTopic msgRT;
    msgRT.set_topic(topic, topicl);
    msgRT.set_data(data, datal);
    const string pbstr = msgRT.SerializeAsString();

    auto msg = ptrT<struct creqmsg>();
    msg->msg = cstr_new(pbstr.data(), pbstr.size());
    return msg;
}

// Release a creqmsg and its strings. NULL is accepted.
void free_reqmsg(struct creqmsg* msg){
    if (msg){
        cstr_free(msg->procid);
        cstr_free(msg->msg);
        free(msg);
    }
}

// Extract "stackId" and "fileId" from the request body.
// Returns NULL when `msg` is NULL, the body is empty or it is not valid JSON.
struct cstackmsgerr* get_reqmsg_stackerr(struct creqmsg* msg){
    if (!msg) return NULL;

    struct cstr path = null_cstr();
    struct cstr body = null_cstr();
    tie(path, body) = json2reqmsg(msg->msg);
    cstr_free(path);    // the path is not needed here

    struct cstackmsgerr* stkmsg = NULL;
    yyjson_doc* doc = body.size ? yyjson_read(body.str, body.size, 0) : NULL;
    if (doc){
        yyjson_val* root = yyjson_doc_get_root(doc);
        yyjson_val* jsid = yyjson_obj_get(root, "stackId");
        yyjson_val* jfid = yyjson_obj_get(root, "fileId");

        stkmsg = ptrT<struct cstackmsgerr>();
        stkmsg->stackid = cstr_new(yyjson_get_str(jsid), yyjson_get_len(jsid));
        stkmsg->fileid = cstr_new(yyjson_get_str(jfid), yyjson_get_len(jfid));
        yyjson_doc_free(doc);
    }

    cstr_free(body);
    return stkmsg;
}

// Release a cstackmsgerr and its strings. NULL is accepted.
void free_reqmsg_stackerr(struct cstackmsgerr* msg){
    if (msg){
        cstr_free(msg->stackid);
        cstr_free(msg->fileid);
        free(msg);
    }
}

// decode success msg
// Extract the stack description and its "fileArr" entries from the request
// body. Returns NULL when `msg` is NULL, the body is empty or it is not valid
// JSON.
struct cstackmsg* get_reqmsg_stack(struct creqmsg* msg){
    if (!msg) return NULL;

    struct cstr path = null_cstr();
    struct cstr body = null_cstr();
    tie(path, body) = json2reqmsg(msg->msg);
    cstr_free(path);    // the path is not needed here

    yyjson_doc* doc = body.size ? yyjson_read(body.str, body.size, 0) : NULL;
    if (!doc){
        cstr_free(body);
        return NULL;
    }
    yyjson_val* root = yyjson_doc_get_root(doc);

    // Copy the string stored under `name`; missing keys give an empty string.
    auto str_field = [](yyjson_val* v, const char* name){
        yyjson_val* tmp = yyjson_obj_get(v, name);
        return cstr_new(yyjson_get_str(tmp), yyjson_get_len(tmp));
    };

    auto smsg = ptrT<struct cstackmsg>();
    smsg->procnum = yyjson_get_int(yyjson_obj_get(root, "procNum"));
    smsg->stackid = str_field(root, "stackId");
    smsg->stackname = str_field(root, "stackName");
    smsg->type = yyjson_get_int(yyjson_obj_get(root, "type"));
    smsg->width = yyjson_get_int(yyjson_obj_get(root, "resolution_width"));
    // NOTE(review): this overwrites `width` with the height value. It looks
    // like a typo for a `height` field; confirm against struct cstackmsg.
    smsg->width = yyjson_get_int(yyjson_obj_get(root, "resolution_height"));

    yyjson_val* arr = yyjson_obj_get(root, "fileArr");
    smsg->filescnt = yyjson_arr_size(arr);
    smsg->files = ptrT<struct cstackfile>(smsg->filescnt);

    size_t idx, max;
    yyjson_val* one;
    yyjson_arr_foreach(arr, idx, max, one){
        struct cstackfile& file = smsg->files[idx];   // already zeroed by calloc
        file.id = str_field(one, "id");
        file.name = str_field(one, "name");
        file.path = str_field(one, "path");
        file.type = yyjson_get_int(yyjson_obj_get(one, "type"));
    }

    yyjson_doc_free(doc);
    cstr_free(body);
    return smsg;
}

// Release a cstackmsg, its strings, every file entry and the file array.
// NULL is accepted.
void free_reqmsg_stack(struct cstackmsg* msg){
    if (msg){
        cstr_free(msg->stackid);
        cstr_free(msg->stackname);
        if (msg->files){
            for(size_t i = 0; i < msg->filescnt; i++){
                cstr_free(msg->files[i].id);
                cstr_free(msg->files[i].name);
                cstr_free(msg->files[i].path);
            }
        }
        free(msg->files);
        free(msg);
    }
}

// Build a crepmsg holding copies of the error message and the data.
struct crepmsg* make_reply_msg(const int errcode, const char* errmsg, const size_t emsgl, const char* data, const size_t datal)
{
    auto msg = ptrT<struct crepmsg>();
    msg->errcode = errcode;
    msg->errmsg = cstr_new(errmsg, emsgl);
    msg->data = cstr_new(data, datal);
    return msg;
}

// Serialise {"success": bool, "msg": string, "data": string} to JSON text.
// The returned cstr owns the text; release it with cstr_free.
// If serialisation fails the result is {NULL, 0}.
struct cstr make_reply_msg_json(const int success, const char* msg, const size_t msgl, const char* data, const size_t datal)
{
    auto doc = yyjson_mut_doc_new(NULL);
    auto root = yyjson_mut_obj(doc);

    yyjson_mut_obj_add_bool(doc, root, "success", !!success);
    yyjson_mut_obj_add_strn(doc, root, "msg", msg, msgl);
    yyjson_mut_obj_add_strn(doc, root, "data", data, datal);

    // Write before freeing the document: the values only reference msg/data.
    size_t jsonl = 0;
    char* json = yyjson_mut_val_write(root, 0, &jsonl);
    yyjson_mut_doc_free(doc);

    return cstr_ref(json, json ? jsonl : 0);
}

// Release a crepmsg and its strings. NULL is accepted.
void free_reply_msg(struct crepmsg* msg){
    if (msg){
        cstr_free(msg->errmsg);
        cstr_free(msg->data);
        free(msg);
    }
}