From ecf23f882ca1b8aaf0863980fc4781c515da1695 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期一, 12 十二月 2022 16:49:03 +0800 Subject: [PATCH] add req rep --- main.cpp | 142 ++++++++++++++++++++++++++++++++++------------- 1 files changed, 103 insertions(+), 39 deletions(-) diff --git a/main.cpp b/main.cpp index 85d4166..1e57df9 100644 --- a/main.cpp +++ b/main.cpp @@ -26,48 +26,40 @@ return pinfo; } -static unique_ptr<thread> pub(const vector<string>& topics){ +template <class F> void ignoref(F&& f){} - auto ptr = unique_ptr<thread>(new thread([&]{ - creg reg; - memset(®, 0, sizeof(reg)); - reg.pinfo = make_proc("pub", "pubid"); - reg.topic_pub = cstr_arr_new(topics.size()); - size_t i = 0; - for(; i < topics.size(); i++){ - cstr_arr_add(®.topic_pub, topics.at(0).data(), topics.at(0).size(), i); +static void pub(const vector<string>& topics){ + ignoref(pub); + + creg reg; + memset(®, 0, sizeof(reg)); + reg.pinfo = make_proc("pub", "pubid"); + reg.topic_pub = cstr_arr_new(topics.size()); + size_t i = 0; + for(; i < topics.size(); i++){ + cstr_arr_add(®.topic_pub, topics.at(0).data(), topics.at(0).size(), i); + } + void* handle = bus_client_init(NULL, 0, ®); + size_t count = 0; + string base_msg("test_pub_sub=="); + this_thread::sleep_for(chrono::seconds(3)); + while (true) { + for(auto && i : topics){ + auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++); + MsgPublish pbmsg; + pbmsg.set_topic(i); + pbmsg.set_data(msg); + auto data = pbmsg.SerializeAsString(); + // TestPub(i.c_str(), i.length(), data.c_str(), data.length()); + int pubres = bus_client_pubmsg(handle, (void*)data.data(), data.size()); + printf("======>> bus_client_pubmsg [%s]\n", msg.c_str()); + this_thread::sleep_for(chrono::seconds{2}); } - void* handle = bus_client_init(NULL, 0, ®); - - size_t count = 0; - string base_msg("test_pub_sub=="); - this_thread::sleep_for(chrono::seconds(3)); - while (true) { - for(auto && i : topics){ - auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++); - MsgPublish pbmsg; - pbmsg.set_topic(i); - pbmsg.set_data(msg); - auto data = pbmsg.SerializeAsString(); - // TestPub(i.c_str(), i.length(), data.c_str(), data.length()); - int pubres = bus_client_pubmsg(handle, (void*)data.data(), data.size()); - printf("======>> bus_client_pubmsg [%s]\n", msg.c_str()); - - this_thread::sleep_for(chrono::seconds{2}); - } - } - })); - ptr->detach(); - return ptr; + } } -int main(int argc, char const *argv[]) -{ - vector<string> topics{ - "cbhomeclient_test_pubsub" - }; - auto p = pub(topics); - // while (true) this_thread::sleep_for(chrono::seconds{1}); +static void sub(const vector<string>& topics){ + ignoref(sub); creg reg; memset(®, 0, sizeof(reg)); @@ -83,9 +75,81 @@ while (true) { auto msg = bus_client_get_submsg(handle); - printf("msg topic [%s] data [%s]\n", msg->topic.str, msg->msg.str); + printf("SUB msg topic [%s] data [%s]\n", msg->topic.str, msg->msg.str); } bus_client_free(handle); +} + +static void req(const char* topic){ + ignoref(req); + + string strtpc(topic); + creg reg; + memset(®, 0, sizeof(reg)); + reg.pinfo = make_proc("request", "requestid"); + // reg.channel = cstr_arr_new(1); + // size_t i = 0; + // for(; i < 1; i++){ + // cstr_arr_add(®.topic_pub, topic, strlen(topic), i); + // } + void* handle = bus_client_init(NULL, 0, ®); + size_t count = 0; + string base_msg("test_request=="); + this_thread::sleep_for(chrono::seconds(3)); + while (true) { + auto msg = base_msg + "request message -> msg-"+to_string(count++); + auto reqmsg = make_req_msg(strtpc.data(), strtpc.size(), msg.data(), msg.size()); + crepmsg* repmsg = NULL; + if (bus_client_request(handle, reqmsg, &repmsg)){ + printf("======>> bus_client_reqest [%s] get [%s]\n", msg.c_str(), repmsg->data.str); + } + free_reqmsg(reqmsg); + free_reply_msg(repmsg); + this_thread::sleep_for(chrono::seconds{2}); + } +} + +static void reply(const char* topic){ + ignoref(reply); + + creg reg; + memset(®, 0, sizeof(reg)); + reg.pinfo = make_proc("reply", "replyid"); + reg.channel = cstr_arr_new(1); + size_t i = 0; + for(; i < 1; i++){ + cstr_arr_add(®.channel, topic, strlen(topic), i); + } + void* handle = bus_client_init(NULL, 0, ®); + size_t count = 0; + this_thread::sleep_for(chrono::seconds(3)); + while (true) { + void* src = NULL; + auto msg = bus_client_get_reqmsg(handle, &src); + auto repmsg = make_reply_msg(0, NULL, 0, "recv request", 12); + bus_client_reply_msg(handle, src, repmsg); + free_reply_msg(repmsg); + printf("REPREQ msg [%s] \n", msg->msg.str); + + free_reqmsg(msg); + this_thread::sleep_for(chrono::seconds{2}); + } +} + +int main(int argc, char const *argv[]) +{ + vector<string> topics{ + "cbhomeclient_test_pubsub" + }; + thread([&]{ pub(topics); }).detach(); + thread([&]{ sub(topics); }).detach(); + // sub(topics); + + printf("start RR\n"); + const char* rrtopic = "cbhomeclient_req_rep"; + thread([&]{ req(rrtopic); }).detach(); + reply(rrtopic); + return 0; } \ No newline at end of file -- Gitblit v1.8.0