| | |
| | | template <class F>
|
| | | MsgCR to_topic(client* cli, F&& f, const struct cstrarr topic){
|
| | | MsgCR msg(dummy());
|
| | | if (topic.arr && topic.size){
|
| | | if (topic.arr && topic.count){
|
| | | MsgTopicList tlist;
|
| | | for(size_t i = 0; i < topic.size; i++)
|
| | | for(size_t i = 0; i < topic.count; i++)
|
| | | tlist.add_topic_list(topic.arr[i].str, topic.arr[i].size);
|
| | | const auto& tpc = tlist.SerializeAsString();
|
| | | void* replymsg = NULL;
|
| | |
| | | cli->thrd_readreq.reset(new thread([cli]{ thread_readreq(cli); }));
|
| | | }
|
| | |
|
| | | // request/reply和pub topic一起处理
|
| | | auto tmparr = cstr_arr_new(rinfo->channel.count + rinfo->topic_pub.count);
|
| | | auto addarr = [&tmparr](size_t& start, const struct cstrarr* arr){
|
| | | for(size_t i = 0; i < arr->count; i++){
|
| | | cstr_arr_add(&tmparr, arr->arr[i].str, arr->arr[i].size, start+i);
|
| | | }
|
| | | start += arr->count;
|
| | | };
|
| | | size_t s = 0;
|
| | | addarr(s, &rinfo->channel);
|
| | | addarr(s, &rinfo->topic_pub);
|
| | | auto tpcmsg = to_topic(cli, bus_register_topics, tmparr);
|
| | | cstr_arr_free(tmparr);
|
| | | // auto channelmsg = to_topic(cli, bus_register_topics, rinfo->channel);
|
| | | // auto pubmsg = to_topic(cli, bus_register_topics, rinfo->topic_pub);
|
| | | // if topic pub/sub[net] exist, register topics
|
| | | auto pubmsg = to_topic(cli, bus_register_topics, rinfo->topic_pub);
|
| | | auto submsg = to_topic(cli, bus_subscribe_topics, rinfo->topic_sub);
|
| | | auto subnetmsg = to_topic(cli, bus_subscribe_topics_net, rinfo->topic_sub_net);
|
| | |
|
| | |
| | | return pmsg;
|
| | | }
|
| | |
|
| | | struct creqmsg* bus_client_get_reqmsg(void* handle){
|
| | | struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){
|
| | | client* cli = ptr(handle);
|
| | | Msg msg = std::move(cli->readreq_q->pop());
|
| | | if (msg.empty()) return NULL;
|
| | |
|
| | | void* procid = NULL, *data = NULL, *src = NULL;
|
| | | void* procid = NULL, *data = NULL;
|
| | | int pids = 0, size = 0;
|
| | |
|
| | | tie(procid, pids) = msg.at(0);
|
| | | tie(data, size) = msg.at(1);
|
| | | tie(src, ignore) = msg.at(2);
|
| | | tie(*src, ignore) = msg.at(2);
|
| | |
|
| | | auto pmsg = to_reqmsg((const char*)procid, pids, (const char *)data, size, src);
|
| | | auto pmsg = to_reqmsg((const char*)procid, pids, (const char *)data, size);
|
| | | bus_free(procid, pids);
|
| | | bus_free(data, size);
|
| | |
|
| | | return pmsg;
|
| | | }
|
| | |
|
| | | int bus_client_reply_reqmsg(void* handle, struct crepmsg* msg){
|
| | | int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg){
|
| | |
|
| | | void* procid = NULL, *reply = NULL;
|
| | | int pids = 0, replys = 0;
|
| | | auto vmsg = std::move(to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0,
|
| | | msg->msg.str, msg->msg.size, &procid, &pids,
|
| | | &reply, &replys, sndto));
|
| | | if (!vmsg.empty()){
|
| | | void* procid = NULL, *data = NULL;
|
| | | int pids = 0, size = 0;
|
| | | tie(procid, pids) = vmsg.at(0);
|
| | | bus_free(procid, pids);
|
| | | tie(data, size) = vmsg.at(1);
|
| | | MsgRequestTopicReply msgRT;
|
| | | auto pb = msgRT.ParseFromArray(reply, replys);
|
| | | bus_free(reply, replys);
|
| | | if (!pb) return false;
|
| | |
|
| | | *repmsg = make_reply_msg(msgRT.errmsg().errcode(),
|
| | | msgRT.errmsg().errstring().data(), msgRT.errmsg().errstring().size(),
|
| | | msgRT.data().data(), msgRT.data().size());
|
| | | return true;
|
| | | }
|
| | | return false;
|
| | | }
|
| | |
|
| | | int bus_client_reply_msg(void* handle, void* src, const struct crepmsg* msg){
|
| | |
|
| | | MsgRequestTopicReply msgR;
|
| | | auto err = msgR.mutable_errmsg();
|
| | | err->set_errcode((ErrorCode)msg->errcode);
|
| | | err->set_errstring(msg->errmsg.str, msg->errmsg.size);
|
| | |
|
| | | msgR.set_data(msg->data.str, msg->data.size);
|
| | | auto pbstr = msgR.SerializeAsString();
|
| | |
|
| | | auto cli = ptr(handle);
|
| | | return bus_send_reply(cli->bus, src, pbstr.data(), pbstr.size());
|
| | | }
|
| | |
|
| | |
|
| | |
| | |
|
| | | struct csubmsg* bus_client_get_submsg(void* handle);
|
| | |
|
| | | struct creqmsg* bus_client_get_reqmsg(void* handle);
|
| | | int bus_client_reply_reqmsg(void* handle, struct crepmsg* msg);
|
| | | struct creqmsg* bus_client_get_reqmsg(void* handle, void** src);
|
| | | int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg);
|
| | | int bus_client_reply_msg(void* handle, void* src, const struct crepmsg* msg);
|
| | |
|
| | | // test
|
| | | int bus_client_pubmsg(void* handle, void* data, const size_t size);
|
| | |
| | | return pinfo; |
| | | } |
| | | |
| | | static unique_ptr<thread> pub(const vector<string>& topics){ |
| | | template <class F> void ignoref(F&& f){} |
| | | |
| | | auto ptr = unique_ptr<thread>(new thread([&]{ |
| | | creg reg; |
| | | memset(®, 0, sizeof(reg)); |
| | | reg.pinfo = make_proc("pub", "pubid"); |
| | | reg.topic_pub = cstr_arr_new(topics.size()); |
| | | size_t i = 0; |
| | | for(; i < topics.size(); i++){ |
| | | cstr_arr_add(®.topic_pub, topics.at(0).data(), topics.at(0).size(), i); |
| | | static void pub(const vector<string>& topics){ |
| | | ignoref(pub); |
| | | |
| | | creg reg; |
| | | memset(®, 0, sizeof(reg)); |
| | | reg.pinfo = make_proc("pub", "pubid"); |
| | | reg.topic_pub = cstr_arr_new(topics.size()); |
| | | size_t i = 0; |
| | | for(; i < topics.size(); i++){ |
| | | cstr_arr_add(®.topic_pub, topics.at(0).data(), topics.at(0).size(), i); |
| | | } |
| | | void* handle = bus_client_init(NULL, 0, ®); |
| | | size_t count = 0; |
| | | string base_msg("test_pub_sub=="); |
| | | this_thread::sleep_for(chrono::seconds(3)); |
| | | while (true) { |
| | | for(auto && i : topics){ |
| | | auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++); |
| | | MsgPublish pbmsg; |
| | | pbmsg.set_topic(i); |
| | | pbmsg.set_data(msg); |
| | | auto data = pbmsg.SerializeAsString(); |
| | | // TestPub(i.c_str(), i.length(), data.c_str(), data.length()); |
| | | int pubres = bus_client_pubmsg(handle, (void*)data.data(), data.size()); |
| | | printf("======>> bus_client_pubmsg [%s]\n", msg.c_str()); |
| | | this_thread::sleep_for(chrono::seconds{2}); |
| | | } |
| | | void* handle = bus_client_init(NULL, 0, ®); |
| | | |
| | | size_t count = 0; |
| | | string base_msg("test_pub_sub=="); |
| | | this_thread::sleep_for(chrono::seconds(3)); |
| | | while (true) { |
| | | for(auto && i : topics){ |
| | | auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++); |
| | | MsgPublish pbmsg; |
| | | pbmsg.set_topic(i); |
| | | pbmsg.set_data(msg); |
| | | auto data = pbmsg.SerializeAsString(); |
| | | // TestPub(i.c_str(), i.length(), data.c_str(), data.length()); |
| | | int pubres = bus_client_pubmsg(handle, (void*)data.data(), data.size()); |
| | | printf("======>> bus_client_pubmsg [%s]\n", msg.c_str()); |
| | | |
| | | this_thread::sleep_for(chrono::seconds{2}); |
| | | } |
| | | } |
| | | })); |
| | | ptr->detach(); |
| | | return ptr; |
| | | } |
| | | } |
| | | |
| | | int main(int argc, char const *argv[]) |
| | | { |
| | | vector<string> topics{ |
| | | "cbhomeclient_test_pubsub" |
| | | }; |
| | | auto p = pub(topics); |
| | | // while (true) this_thread::sleep_for(chrono::seconds{1}); |
| | | static void sub(const vector<string>& topics){ |
| | | ignoref(sub); |
| | | |
| | | creg reg; |
| | | memset(®, 0, sizeof(reg)); |
| | |
| | | |
| | | while (true) { |
| | | auto msg = bus_client_get_submsg(handle); |
| | | printf("msg topic [%s] data [%s]\n", msg->topic.str, msg->msg.str); |
| | | printf("SUB msg topic [%s] data [%s]\n", msg->topic.str, msg->msg.str); |
| | | } |
| | | |
| | | bus_client_free(handle); |
| | | } |
| | | |
| | | static void req(const char* topic){ |
| | | ignoref(req); |
| | | |
| | | string strtpc(topic); |
| | | creg reg; |
| | | memset(®, 0, sizeof(reg)); |
| | | reg.pinfo = make_proc("request", "requestid"); |
| | | // reg.channel = cstr_arr_new(1); |
| | | // size_t i = 0; |
| | | // for(; i < 1; i++){ |
| | | // cstr_arr_add(®.topic_pub, topic, strlen(topic), i); |
| | | // } |
| | | void* handle = bus_client_init(NULL, 0, ®); |
| | | size_t count = 0; |
| | | string base_msg("test_request=="); |
| | | this_thread::sleep_for(chrono::seconds(3)); |
| | | while (true) { |
| | | auto msg = base_msg + "request message -> msg-"+to_string(count++); |
| | | auto reqmsg = make_req_msg(strtpc.data(), strtpc.size(), msg.data(), msg.size()); |
| | | crepmsg* repmsg = NULL; |
| | | if (bus_client_request(handle, reqmsg, &repmsg)){ |
| | | printf("======>> bus_client_reqest [%s] get [%s]\n", msg.c_str(), repmsg->data.str); |
| | | } |
| | | free_reqmsg(reqmsg); |
| | | free_reply_msg(repmsg); |
| | | this_thread::sleep_for(chrono::seconds{2}); |
| | | } |
| | | } |
| | | |
| | | static void reply(const char* topic){ |
| | | ignoref(reply); |
| | | |
| | | creg reg; |
| | | memset(®, 0, sizeof(reg)); |
| | | reg.pinfo = make_proc("reply", "replyid"); |
| | | reg.channel = cstr_arr_new(1); |
| | | size_t i = 0; |
| | | for(; i < 1; i++){ |
| | | cstr_arr_add(®.channel, topic, strlen(topic), i); |
| | | } |
| | | void* handle = bus_client_init(NULL, 0, ®); |
| | | size_t count = 0; |
| | | this_thread::sleep_for(chrono::seconds(3)); |
| | | while (true) { |
| | | void* src = NULL; |
| | | auto msg = bus_client_get_reqmsg(handle, &src); |
| | | auto repmsg = make_reply_msg(0, NULL, 0, "recv request", 12); |
| | | bus_client_reply_msg(handle, src, repmsg); |
| | | free_reply_msg(repmsg); |
| | | printf("REPREQ msg [%s] \n", msg->msg.str); |
| | | |
| | | free_reqmsg(msg); |
| | | this_thread::sleep_for(chrono::seconds{2}); |
| | | } |
| | | } |
| | | |
| | | int main(int argc, char const *argv[]) |
| | | { |
| | | vector<string> topics{ |
| | | "cbhomeclient_test_pubsub" |
| | | }; |
| | | thread([&]{ pub(topics); }).detach(); |
| | | thread([&]{ sub(topics); }).detach(); |
| | | // sub(topics); |
| | | |
| | | printf("start RR\n"); |
| | | const char* rrtopic = "cbhomeclient_req_rep"; |
| | | thread([&]{ req(rrtopic); }).detach(); |
| | | reply(rrtopic); |
| | | |
| | | return 0; |
| | | } |
| | |
| | | static struct cstr cstr_clone(const struct cstr old){ |
| | | return cstr_new(old.str, old.size); |
| | | } |
| | | |
| | | static struct cstr cstr_ref(char* str, const size_t len){ |
| | | struct cstr cs; |
| | | memset(&cs, 0, sizeof(cs)); |
| | | cs.size = len; |
| | | cs.str = str; |
| | | return cs; |
| | | } |
| | | struct cstr cstr_new(const char* str, const size_t len){ |
| | | struct cstr cs; |
| | | memset(&cs, 0, sizeof(cs)); |
| | |
| | | void cstr_free(struct cstr str){ |
| | | if (str.str && str.size) free(str.str); |
| | | } |
| | | struct cstrarr cstr_arr_new(const size_t len){ |
| | | struct cstrarr cstr_arr_new(const size_t count){ |
| | | struct cstrarr arr; |
| | | memset(&arr, 0, sizeof(arr)); |
| | | arr.arr = (struct cstr*)calloc(len, sizeof(struct cstr)); |
| | | arr.size = len; |
| | | arr.arr = (struct cstr*)calloc(count, sizeof(struct cstr)); |
| | | arr.count = count; |
| | | return arr; |
| | | } |
| | | void cstr_arr_add(struct cstrarr* arr, const char* data, const size_t len, const size_t idx){ |
| | | if (arr->arr && arr->size && idx < arr->size){ |
| | | if (arr->arr && arr->count && idx < arr->count){ |
| | | arr->arr[idx] = cstr_new(data, len); |
| | | } |
| | | } |
| | | void cstr_arr_free(struct cstrarr arr){ |
| | | for(size_t i = 0; i < arr.size; i++){ |
| | | for(size_t i = 0; i < arr.count; i++){ |
| | | auto & str = arr.arr[i]; |
| | | cstr_free(str); |
| | | } |
| | |
| | | rinfo->pinfo = pinfo; |
| | | |
| | | auto assign_arr = [&obj](yyjson_val* v){ |
| | | const size_t size = yyjson_arr_size(v); |
| | | const size_t count = yyjson_arr_size(v); |
| | | struct cstrarr arr; |
| | | memset(&arr, 0, sizeof(arr)); |
| | | arr.size = size; |
| | | for(size_t i = 0; i < size; i++){ |
| | | arr.count = count; |
| | | for(size_t i = 0; i < count; i++){ |
| | | auto sv = yyjson_arr_get(v, i); |
| | | char* entry = NULL; |
| | | size_t entry_size = 0; |
| | |
| | | } |
| | | |
| | | ////////////////////////////////////////////////// |
| | | struct creq json2creq(const char* data, const size_t size){ |
| | | yyjson_doc* doc = yyjson_read(data, size, 0); |
| | | static tuple<struct cstr, struct cstr> json2reqmsg(const struct cstr msg){ |
| | | yyjson_doc* doc = yyjson_read(msg.str, msg.size, 0); |
| | | yyjson_val* root = yyjson_doc_get_root(doc); |
| | | |
| | | auto obj = [](yyjson_val* v, const char* name){return yyjson_obj_get(v, name);}; |
| | | |
| | | struct creq req; |
| | | memset(&req, 0, sizeof(req)); |
| | | |
| | | auto jp = obj(root, "path"); |
| | | auto jb = obj(root, "body"); |
| | | req.path = cstr_new(yyjson_get_str(jp), yyjson_get_len(jp)); |
| | | req.body = cstr_new(yyjson_get_raw(jb), yyjson_get_len(jb)); |
| | | |
| | | auto tp = make_tuple(cstr_new(yyjson_get_str(jp), yyjson_get_len(jp)), |
| | | cstr_new(yyjson_get_raw(jb), yyjson_get_len(jb))); |
| | | yyjson_doc_free(doc); |
| | | return req; |
| | | return tp; |
| | | } |
| | | |
| | | struct creqmsg* to_reqmsg(const char* pid, const size_t pids, |
| | | const char* data, const size_t size, void* src) |
| | | struct creqmsg* to_reqmsg(const char* pid, const size_t pids, const char* data, const size_t size) |
| | | { |
| | | auto msg = ptrT<struct creqmsg>(); |
| | | msg->procid = cstr_new(pid, pids); |
| | | MsgRequestTopic msgRT; |
| | | if (!msgRT.ParseFromArray(data, size)) return NULL; |
| | | msg->msg = cstr_new(msgRT.data().data(), msgRT.data().size()); |
| | | |
| | | // creg = msgRT.data(); |
| | | msg->msg = json2creq(data, size); |
| | | msg->src = src; |
| | | return msg; |
| | | } |
| | | |
| | | struct creqmsg* make_req_msg(const char* topic, const size_t topics, |
| | | const char* data, const size_t datal) |
| | | { |
| | | auto msg = ptrT<struct creqmsg>(); |
| | | MsgRequestTopic msgRT; |
| | | msgRT.set_topic(topic, topics); |
| | | msgRT.set_data(data, datal); |
| | | |
| | | auto pbstr = msgRT.SerializeAsString(); |
| | | msg->msg = cstr_new(pbstr.data(), pbstr.size()); |
| | | |
| | | return msg; |
| | | } |
| | | |
| | | void free_reqmsg(struct creqmsg* msg){ |
| | | if (msg){ |
| | | cstr_free(msg->procid); |
| | | cstr_free(msg->msg.path); |
| | | cstr_free(msg->msg.body); |
| | | cstr_free(msg->msg); |
| | | free(msg); |
| | | } |
| | | } |
| | | |
| | | struct cstackmsgerr* get_reqmsg_stackerr(struct creqmsg* msg){ |
| | | auto &body = msg->msg.body; |
| | | struct cstr path, body; |
| | | memset(&path, 0, sizeof(path)); |
| | | memset(&body, 0, sizeof(body)); |
| | | tie(path, body) = json2reqmsg(msg->msg); |
| | | if (body.size == 0) return NULL; |
| | | |
| | | auto doc = yyjson_read(body.str, body.size, 0); |
| | | auto root = yyjson_doc_get_root(doc); |
| | | |
| | |
| | | stkmsg->stackid = cstr_new(yyjson_get_str(jsid), yyjson_get_len(jsid)); |
| | | stkmsg->fileid = cstr_new(yyjson_get_str(jfid), yyjson_get_len(jfid)); |
| | | yyjson_doc_free(doc); |
| | | |
| | | cstr_free(path); |
| | | cstr_free(body); |
| | | |
| | | return stkmsg; |
| | | } |
| | | void free_reqmsg_stackerr(struct cstackmsgerr* msg){ |
| | |
| | | } |
| | | // decode success msg |
| | | struct cstackmsg* get_reqmsg_stack(struct creqmsg* msg){ |
| | | auto &body = msg->msg.body; |
| | | struct cstr path, body; |
| | | memset(&path, 0, sizeof(path)); |
| | | memset(&body, 0, sizeof(body)); |
| | | tie(path, body) = json2reqmsg(msg->msg); |
| | | if (body.size == 0) return NULL; |
| | | |
| | | auto doc = yyjson_read(body.str, body.size, 0); |
| | | auto root = yyjson_doc_get_root(doc); |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | struct crepmsg* make_reply_msg(const int success, const char* msg, const size_t msgl, |
| | | struct crepmsg* make_reply_msg(const int errcode, const char* errmsg, const size_t emsgl, |
| | | const char* data, const size_t datal) |
| | | { |
| | | auto rmsg = ptrT<struct crepmsg>(); |
| | | // rmsg->success = success; |
| | | // rmsg->msg = cstr_new(msg, msgl); |
| | | // rmsg->data = cstr_new(msg, msgl); |
| | | auto msg = ptrT<struct crepmsg>(); |
| | | msg->errcode = errcode; |
| | | msg->errmsg = cstr_new(errmsg, emsgl); |
| | | msg->data = cstr_new(data, datal); |
| | | return msg; |
| | | } |
| | | |
| | | struct cstr make_reply_msg_json(const int success, const char* msg, const size_t msgl, |
| | | const char* data, const size_t datal) |
| | | { |
| | | auto doc = yyjson_mut_doc_new(NULL); |
| | | auto root = yyjson_mut_obj(doc); |
| | | yyjson_mut_obj_add_bool(doc, root, "success", !!success); |
| | | yyjson_mut_obj_add_strn(doc, root, "msg", msg, msgl); |
| | | yyjson_mut_obj_add_strn(doc, root, "data", data, datal); |
| | | |
| | | return rmsg; |
| | | size_t jsonl = 0; |
| | | char* json = yyjson_mut_val_write(root, 0, &jsonl); |
| | | yyjson_mut_doc_free(doc); |
| | | return cstr_ref(json, jsonl); |
| | | } |
| | | |
| | | void free_reply_msg(struct crepmsg* msg){ |
| | | if (msg){ |
| | | cstr_free(msg->msg); |
| | | cstr_free(msg->errmsg); |
| | | cstr_free(msg->data); |
| | | free(msg); |
| | | } |
| | |
| | | |
| | | struct cstrarr{ |
| | | struct cstr* arr; |
| | | size_t size; |
| | | size_t count; |
| | | }; |
| | | // 进程注册信息 |
| | | struct creg{ |
| | |
| | | }; |
| | | |
| | | ////////////////////////////////////////// |
| | | // request body |
| | | struct creq{ |
| | | struct cstr path; |
| | | struct cstr body; |
| | | }; |
| | | // request msg |
| | | struct creqmsg{ |
| | | struct cstr procid; |
| | | struct creq msg; |
| | | void* src; |
| | | struct cstr msg; |
| | | }; |
| | | // decode stack err msg |
| | | struct cstackmsgerr{ |
| | |
| | | |
| | | // reply msg |
| | | struct crepmsg{ |
| | | struct cstr json; |
| | | |
| | | int success; |
| | | struct cstr msg; |
| | | int errcode; |
| | | struct cstr errmsg; |
| | | struct cstr data; |
| | | }; |
| | | |
| | |
| | | |
| | | struct cstr cstr_new(const char* str, const size_t len); |
| | | void cstr_free(struct cstr str); |
| | | struct cstrarr cstr_arr_new(const size_t len); |
| | | struct cstrarr cstr_arr_new(const size_t count); |
| | | void cstr_arr_add(struct cstrarr* arr, const char* data, const size_t len, const size_t idx); |
| | | void cstr_arr_free(struct cstrarr arr); |
| | | |
| | |
| | | void free_submsg_proclist(struct cproclist* ppl); |
| | | |
| | | // request msg |
| | | struct creqmsg* to_reqmsg(const char* pid, const size_t pids, |
| | | const char* data, const size_t size, void* src); |
| | | struct creqmsg* to_reqmsg(const char* pid,const size_t pids,const char* data,const size_t size); |
| | | struct creqmsg* make_req_msg(const char* topic, const size_t topics, |
| | | const char* data, const size_t datal); |
| | | |
| | | void free_reqmsg(struct creqmsg* msg); |
| | | // decode err msg |
| | | struct cstackmsgerr* get_reqmsg_stackerr(struct creqmsg* msg); |
| | |
| | | void free_reqmsg_stack(struct cstackmsg* msg); |
| | | |
| | | // reply msg |
| | | struct crepmsg* make_reply_msg(const int success, const char* msg, const size_t msgl, |
| | | struct cstr make_reply_msg_json(const int success, const char* msg, const size_t msgl, |
| | | const char* data, const size_t datal); |
| | | struct crepmsg* make_reply_msg(const int errcode, const char* errmsg, const size_t emsgl, |
| | | const char* data, const size_t datal); |
| | | |
| | | void free_reply_msg(struct crepmsg* msg); |
| | | |
| | | #ifdef __cplusplus |