From ecf23f882ca1b8aaf0863980fc4781c515da1695 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期一, 12 十二月 2022 16:49:03 +0800 Subject: [PATCH] add req rep --- 3dparty/bus_nng/x86_64/libbus_nng.so | 0 main.cpp | 142 +++++++++++++++++------ message.cpp | 106 ++++++++++++----- message.h | 29 ++-- cbhomeclient.cpp | 64 +++++++++- cbhomeclient.h | 5 6 files changed, 247 insertions(+), 99 deletions(-) diff --git a/3dparty/bus_nng/x86_64/libbus_nng.so b/3dparty/bus_nng/x86_64/libbus_nng.so index d52d466..d01dcbd 100755 --- a/3dparty/bus_nng/x86_64/libbus_nng.so +++ b/3dparty/bus_nng/x86_64/libbus_nng.so Binary files differ diff --git a/cbhomeclient.cpp b/cbhomeclient.cpp index 1d213bb..9c8701e 100644 --- a/cbhomeclient.cpp +++ b/cbhomeclient.cpp @@ -121,9 +121,9 @@ template <class F> MsgCR to_topic(client* cli, F&& f, const struct cstrarr topic){ MsgCR msg(dummy()); - if (topic.arr && topic.size){ + if (topic.arr && topic.count){ MsgTopicList tlist; - for(size_t i = 0; i < topic.size; i++) + for(size_t i = 0; i < topic.count; i++) tlist.add_topic_list(topic.arr[i].str, topic.arr[i].size); const auto& tpc = tlist.SerializeAsString(); void* replymsg = NULL; @@ -189,8 +189,22 @@ cli->thrd_readreq.reset(new thread([cli]{ thread_readreq(cli); })); } + // request/reply鍜宲ub topic涓�璧峰鐞� + auto tmparr = cstr_arr_new(rinfo->channel.count + rinfo->topic_pub.count); + auto addarr = [&tmparr](size_t& start, const struct cstrarr* arr){ + for(size_t i = 0; i < arr->count; i++){ + cstr_arr_add(&tmparr, arr->arr[i].str, arr->arr[i].size, start+i); + } + start += arr->count; + }; + size_t s = 0; + addarr(s, &rinfo->channel); + addarr(s, &rinfo->topic_pub); + auto tpcmsg = to_topic(cli, bus_register_topics, tmparr); + cstr_arr_free(tmparr); + // auto channelmsg = to_topic(cli, bus_register_topics, rinfo->channel); + // auto pubmsg = to_topic(cli, bus_register_topics, rinfo->topic_pub); // if topic pub/sub[net] exist, register topics - auto pubmsg = to_topic(cli, bus_register_topics, rinfo->topic_pub); auto submsg = to_topic(cli, bus_subscribe_topics, rinfo->topic_sub); auto subnetmsg = to_topic(cli, bus_subscribe_topics_net, rinfo->topic_sub_net); @@ -251,29 +265,63 @@ return pmsg; } -struct creqmsg* bus_client_get_reqmsg(void* handle){ +struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){ client* cli = ptr(handle); Msg msg = std::move(cli->readreq_q->pop()); if (msg.empty()) return NULL; - void* procid = NULL, *data = NULL, *src = NULL; + void* procid = NULL, *data = NULL; int pids = 0, size = 0; tie(procid, pids) = msg.at(0); tie(data, size) = msg.at(1); - tie(src, ignore) = msg.at(2); + tie(*src, ignore) = msg.at(2); - auto pmsg = to_reqmsg((const char*)procid, pids, (const char *)data, size, src); + auto pmsg = to_reqmsg((const char*)procid, pids, (const char *)data, size); bus_free(procid, pids); bus_free(data, size); return pmsg; } -int bus_client_reply_reqmsg(void* handle, struct crepmsg* msg){ +int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg){ + void* procid = NULL, *reply = NULL; + int pids = 0, replys = 0; + auto vmsg = std::move(to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0, + msg->msg.str, msg->msg.size, &procid, &pids, + &reply, &replys, sndto)); + if (!vmsg.empty()){ + void* procid = NULL, *data = NULL; + int pids = 0, size = 0; + tie(procid, pids) = vmsg.at(0); + bus_free(procid, pids); + tie(data, size) = vmsg.at(1); + MsgRequestTopicReply msgRT; + auto pb = msgRT.ParseFromArray(reply, replys); + bus_free(reply, replys); + if (!pb) return false; + + *repmsg = make_reply_msg(msgRT.errmsg().errcode(), + msgRT.errmsg().errstring().data(), msgRT.errmsg().errstring().size(), + msgRT.data().data(), msgRT.data().size()); + return true; + } + return false; +} + +int bus_client_reply_msg(void* handle, void* src, const struct crepmsg* msg){ MsgRequestTopicReply msgR; + auto err = msgR.mutable_errmsg(); + err->set_errcode((ErrorCode)msg->errcode); + err->set_errstring(msg->errmsg.str, msg->errmsg.size); + + msgR.set_data(msg->data.str, msg->data.size); + auto pbstr = msgR.SerializeAsString(); + + auto cli = ptr(handle); + return bus_send_reply(cli->bus, src, pbstr.data(), pbstr.size()); } diff --git a/cbhomeclient.h b/cbhomeclient.h index a884dea..3056b04 100644 --- a/cbhomeclient.h +++ b/cbhomeclient.h @@ -15,8 +15,9 @@ struct csubmsg* bus_client_get_submsg(void* handle); -struct creqmsg* bus_client_get_reqmsg(void* handle); -int bus_client_reply_reqmsg(void* handle, struct crepmsg* msg); +struct creqmsg* bus_client_get_reqmsg(void* handle, void** src); +int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg); +int bus_client_reply_msg(void* handle, void* src, const struct crepmsg* msg); // test int bus_client_pubmsg(void* handle, void* data, const size_t size); diff --git a/main.cpp b/main.cpp index 85d4166..1e57df9 100644 --- a/main.cpp +++ b/main.cpp @@ -26,48 +26,40 @@ return pinfo; } -static unique_ptr<thread> pub(const vector<string>& topics){ +template <class F> void ignoref(F&& f){} - auto ptr = unique_ptr<thread>(new thread([&]{ - creg reg; - memset(®, 0, sizeof(reg)); - reg.pinfo = make_proc("pub", "pubid"); - reg.topic_pub = cstr_arr_new(topics.size()); - size_t i = 0; - for(; i < topics.size(); i++){ - cstr_arr_add(®.topic_pub, topics.at(0).data(), topics.at(0).size(), i); +static void pub(const vector<string>& topics){ + ignoref(pub); + + creg reg; + memset(®, 0, sizeof(reg)); + reg.pinfo = make_proc("pub", "pubid"); + reg.topic_pub = cstr_arr_new(topics.size()); + size_t i = 0; + for(; i < topics.size(); i++){ + cstr_arr_add(®.topic_pub, topics.at(0).data(), topics.at(0).size(), i); + } + void* handle = bus_client_init(NULL, 0, ®); + size_t count = 0; + string base_msg("test_pub_sub=="); + this_thread::sleep_for(chrono::seconds(3)); + while (true) { + for(auto && i : topics){ + auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++); + MsgPublish pbmsg; + pbmsg.set_topic(i); + pbmsg.set_data(msg); + auto data = pbmsg.SerializeAsString(); + // TestPub(i.c_str(), i.length(), data.c_str(), data.length()); + int pubres = bus_client_pubmsg(handle, (void*)data.data(), data.size()); + printf("======>> bus_client_pubmsg [%s]\n", msg.c_str()); + this_thread::sleep_for(chrono::seconds{2}); } - void* handle = bus_client_init(NULL, 0, ®); - - size_t count = 0; - string base_msg("test_pub_sub=="); - this_thread::sleep_for(chrono::seconds(3)); - while (true) { - for(auto && i : topics){ - auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++); - MsgPublish pbmsg; - pbmsg.set_topic(i); - pbmsg.set_data(msg); - auto data = pbmsg.SerializeAsString(); - // TestPub(i.c_str(), i.length(), data.c_str(), data.length()); - int pubres = bus_client_pubmsg(handle, (void*)data.data(), data.size()); - printf("======>> bus_client_pubmsg [%s]\n", msg.c_str()); - - this_thread::sleep_for(chrono::seconds{2}); - } - } - })); - ptr->detach(); - return ptr; + } } -int main(int argc, char const *argv[]) -{ - vector<string> topics{ - "cbhomeclient_test_pubsub" - }; - auto p = pub(topics); - // while (true) this_thread::sleep_for(chrono::seconds{1}); +static void sub(const vector<string>& topics){ + ignoref(sub); creg reg; memset(®, 0, sizeof(reg)); @@ -83,9 +75,81 @@ while (true) { auto msg = bus_client_get_submsg(handle); - printf("msg topic [%s] data [%s]\n", msg->topic.str, msg->msg.str); + printf("SUB msg topic [%s] data [%s]\n", msg->topic.str, msg->msg.str); } bus_client_free(handle); +} + +static void req(const char* topic){ + ignoref(req); + + string strtpc(topic); + creg reg; + memset(®, 0, sizeof(reg)); + reg.pinfo = make_proc("request", "requestid"); + // reg.channel = cstr_arr_new(1); + // size_t i = 0; + // for(; i < 1; i++){ + // cstr_arr_add(®.topic_pub, topic, strlen(topic), i); + // } + void* handle = bus_client_init(NULL, 0, ®); + size_t count = 0; + string base_msg("test_request=="); + this_thread::sleep_for(chrono::seconds(3)); + while (true) { + auto msg = base_msg + "request message -> msg-"+to_string(count++); + auto reqmsg = make_req_msg(strtpc.data(), strtpc.size(), msg.data(), msg.size()); + crepmsg* repmsg = NULL; + if (bus_client_request(handle, reqmsg, &repmsg)){ + printf("======>> bus_client_reqest [%s] get [%s]\n", msg.c_str(), repmsg->data.str); + } + free_reqmsg(reqmsg); + free_reply_msg(repmsg); + this_thread::sleep_for(chrono::seconds{2}); + } +} + +static void reply(const char* topic){ + ignoref(reply); + + creg reg; + memset(®, 0, sizeof(reg)); + reg.pinfo = make_proc("reply", "replyid"); + reg.channel = cstr_arr_new(1); + size_t i = 0; + for(; i < 1; i++){ + cstr_arr_add(®.channel, topic, strlen(topic), i); + } + void* handle = bus_client_init(NULL, 0, ®); + size_t count = 0; + this_thread::sleep_for(chrono::seconds(3)); + while (true) { + void* src = NULL; + auto msg = bus_client_get_reqmsg(handle, &src); + auto repmsg = make_reply_msg(0, NULL, 0, "recv request", 12); + bus_client_reply_msg(handle, src, repmsg); + free_reply_msg(repmsg); + printf("REPREQ msg [%s] \n", msg->msg.str); + + free_reqmsg(msg); + this_thread::sleep_for(chrono::seconds{2}); + } +} + +int main(int argc, char const *argv[]) +{ + vector<string> topics{ + "cbhomeclient_test_pubsub" + }; + thread([&]{ pub(topics); }).detach(); + thread([&]{ sub(topics); }).detach(); + // sub(topics); + + printf("start RR\n"); + const char* rrtopic = "cbhomeclient_req_rep"; + thread([&]{ req(rrtopic); }).detach(); + reply(rrtopic); + return 0; } \ No newline at end of file diff --git a/message.cpp b/message.cpp index 591b50a..394e1d4 100644 --- a/message.cpp +++ b/message.cpp @@ -25,7 +25,13 @@ static struct cstr cstr_clone(const struct cstr old){ return cstr_new(old.str, old.size); } - +static struct cstr cstr_ref(char* str, const size_t len){ + struct cstr cs; + memset(&cs, 0, sizeof(cs)); + cs.size = len; + cs.str = str; + return cs; +} struct cstr cstr_new(const char* str, const size_t len){ struct cstr cs; memset(&cs, 0, sizeof(cs)); @@ -37,20 +43,20 @@ void cstr_free(struct cstr str){ if (str.str && str.size) free(str.str); } -struct cstrarr cstr_arr_new(const size_t len){ +struct cstrarr cstr_arr_new(const size_t count){ struct cstrarr arr; memset(&arr, 0, sizeof(arr)); - arr.arr = (struct cstr*)calloc(len, sizeof(struct cstr)); - arr.size = len; + arr.arr = (struct cstr*)calloc(count, sizeof(struct cstr)); + arr.count = count; return arr; } void cstr_arr_add(struct cstrarr* arr, const char* data, const size_t len, const size_t idx){ - if (arr->arr && arr->size && idx < arr->size){ + if (arr->arr && arr->count && idx < arr->count){ arr->arr[idx] = cstr_new(data, len); } } void cstr_arr_free(struct cstrarr arr){ - for(size_t i = 0; i < arr.size; i++){ + for(size_t i = 0; i < arr.count; i++){ auto & str = arr.arr[i]; cstr_free(str); } @@ -176,11 +182,11 @@ rinfo->pinfo = pinfo; auto assign_arr = [&obj](yyjson_val* v){ - const size_t size = yyjson_arr_size(v); + const size_t count = yyjson_arr_size(v); struct cstrarr arr; memset(&arr, 0, sizeof(arr)); - arr.size = size; - for(size_t i = 0; i < size; i++){ + arr.count = count; + for(size_t i = 0; i < count; i++){ auto sv = yyjson_arr_get(v, i); char* entry = NULL; size_t entry_size = 0; @@ -357,49 +363,60 @@ } ////////////////////////////////////////////////// -struct creq json2creq(const char* data, const size_t size){ - yyjson_doc* doc = yyjson_read(data, size, 0); +static tuple<struct cstr, struct cstr> json2reqmsg(const struct cstr msg){ + yyjson_doc* doc = yyjson_read(msg.str, msg.size, 0); yyjson_val* root = yyjson_doc_get_root(doc); auto obj = [](yyjson_val* v, const char* name){return yyjson_obj_get(v, name);}; - struct creq req; - memset(&req, 0, sizeof(req)); - auto jp = obj(root, "path"); auto jb = obj(root, "body"); - req.path = cstr_new(yyjson_get_str(jp), yyjson_get_len(jp)); - req.body = cstr_new(yyjson_get_raw(jb), yyjson_get_len(jb)); - + auto tp = make_tuple(cstr_new(yyjson_get_str(jp), yyjson_get_len(jp)), + cstr_new(yyjson_get_raw(jb), yyjson_get_len(jb))); yyjson_doc_free(doc); - return req; + return tp; } -struct creqmsg* to_reqmsg(const char* pid, const size_t pids, - const char* data, const size_t size, void* src) +struct creqmsg* to_reqmsg(const char* pid, const size_t pids, const char* data, const size_t size) { auto msg = ptrT<struct creqmsg>(); msg->procid = cstr_new(pid, pids); MsgRequestTopic msgRT; if (!msgRT.ParseFromArray(data, size)) return NULL; + msg->msg = cstr_new(msgRT.data().data(), msgRT.data().size()); - // creg = msgRT.data(); - msg->msg = json2creq(data, size); - msg->src = src; + return msg; +} + +struct creqmsg* make_req_msg(const char* topic, const size_t topics, + const char* data, const size_t datal) +{ + auto msg = ptrT<struct creqmsg>(); + MsgRequestTopic msgRT; + msgRT.set_topic(topic, topics); + msgRT.set_data(data, datal); + + auto pbstr = msgRT.SerializeAsString(); + msg->msg = cstr_new(pbstr.data(), pbstr.size()); + return msg; } void free_reqmsg(struct creqmsg* msg){ if (msg){ cstr_free(msg->procid); - cstr_free(msg->msg.path); - cstr_free(msg->msg.body); + cstr_free(msg->msg); free(msg); } } struct cstackmsgerr* get_reqmsg_stackerr(struct creqmsg* msg){ - auto &body = msg->msg.body; + struct cstr path, body; + memset(&path, 0, sizeof(path)); + memset(&body, 0, sizeof(body)); + tie(path, body) = json2reqmsg(msg->msg); + if (body.size == 0) return NULL; + auto doc = yyjson_read(body.str, body.size, 0); auto root = yyjson_doc_get_root(doc); @@ -411,6 +428,10 @@ stkmsg->stackid = cstr_new(yyjson_get_str(jsid), yyjson_get_len(jsid)); stkmsg->fileid = cstr_new(yyjson_get_str(jfid), yyjson_get_len(jfid)); yyjson_doc_free(doc); + + cstr_free(path); + cstr_free(body); + return stkmsg; } void free_reqmsg_stackerr(struct cstackmsgerr* msg){ @@ -422,7 +443,12 @@ } // decode success msg struct cstackmsg* get_reqmsg_stack(struct creqmsg* msg){ - auto &body = msg->msg.body; + struct cstr path, body; + memset(&path, 0, sizeof(path)); + memset(&body, 0, sizeof(body)); + tie(path, body) = json2reqmsg(msg->msg); + if (body.size == 0) return NULL; + auto doc = yyjson_read(body.str, body.size, 0); auto root = yyjson_doc_get_root(doc); @@ -471,22 +497,34 @@ } } -struct crepmsg* make_reply_msg(const int success, const char* msg, const size_t msgl, +struct crepmsg* make_reply_msg(const int errcode, const char* errmsg, const size_t emsgl, const char* data, const size_t datal) { - auto rmsg = ptrT<struct crepmsg>(); - // rmsg->success = success; - // rmsg->msg = cstr_new(msg, msgl); - // rmsg->data = cstr_new(msg, msgl); + auto msg = ptrT<struct crepmsg>(); + msg->errcode = errcode; + msg->errmsg = cstr_new(errmsg, emsgl); + msg->data = cstr_new(data, datal); + return msg; +} +struct cstr make_reply_msg_json(const int success, const char* msg, const size_t msgl, + const char* data, const size_t datal) +{ auto doc = yyjson_mut_doc_new(NULL); + auto root = yyjson_mut_obj(doc); + yyjson_mut_obj_add_bool(doc, root, "success", !!success); + yyjson_mut_obj_add_strn(doc, root, "msg", msg, msgl); + yyjson_mut_obj_add_strn(doc, root, "data", data, datal); - return rmsg; + size_t jsonl = 0; + char* json = yyjson_mut_val_write(root, 0, &jsonl); + yyjson_mut_doc_free(doc); + return cstr_ref(json, jsonl); } void free_reply_msg(struct crepmsg* msg){ if (msg){ - cstr_free(msg->msg); + cstr_free(msg->errmsg); cstr_free(msg->data); free(msg); } diff --git a/message.h b/message.h index dc289df..6024a7b 100644 --- a/message.h +++ b/message.h @@ -16,7 +16,7 @@ struct cstrarr{ struct cstr* arr; - size_t size; + size_t count; }; // 杩涚▼娉ㄥ唽淇℃伅 struct creg{ @@ -88,16 +88,10 @@ }; ////////////////////////////////////////// -// request body -struct creq{ - struct cstr path; - struct cstr body; -}; // request msg struct creqmsg{ struct cstr procid; - struct creq msg; - void* src; + struct cstr msg; }; // decode stack err msg struct cstackmsgerr{ @@ -128,10 +122,8 @@ // reply msg struct crepmsg{ - struct cstr json; - - int success; - struct cstr msg; + int errcode; + struct cstr errmsg; struct cstr data; }; @@ -141,7 +133,7 @@ struct cstr cstr_new(const char* str, const size_t len); void cstr_free(struct cstr str); -struct cstrarr cstr_arr_new(const size_t len); +struct cstrarr cstr_arr_new(const size_t count); void cstr_arr_add(struct cstrarr* arr, const char* data, const size_t len, const size_t idx); void cstr_arr_free(struct cstrarr arr); @@ -161,8 +153,10 @@ void free_submsg_proclist(struct cproclist* ppl); // request msg -struct creqmsg* to_reqmsg(const char* pid, const size_t pids, - const char* data, const size_t size, void* src); +struct creqmsg* to_reqmsg(const char* pid,const size_t pids,const char* data,const size_t size); +struct creqmsg* make_req_msg(const char* topic, const size_t topics, + const char* data, const size_t datal); + void free_reqmsg(struct creqmsg* msg); // decode err msg struct cstackmsgerr* get_reqmsg_stackerr(struct creqmsg* msg); @@ -172,8 +166,11 @@ void free_reqmsg_stack(struct cstackmsg* msg); // reply msg -struct crepmsg* make_reply_msg(const int success, const char* msg, const size_t msgl, +struct cstr make_reply_msg_json(const int success, const char* msg, const size_t msgl, const char* data, const size_t datal); +struct crepmsg* make_reply_msg(const int errcode, const char* errmsg, const size_t emsgl, + const char* data, const size_t datal); + void free_reply_msg(struct crepmsg* msg); #ifdef __cplusplus -- Gitblit v1.8.0