From cf0a3209b51babf72469d962914db0dac2e5f52c Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 27 十二月 2022 14:13:30 +0800 Subject: [PATCH] add get msg timeout --- main.cpp | 170 ++++++++++++++++++++++++++++++++++++-------------------- 1 files changed, 110 insertions(+), 60 deletions(-) diff --git a/main.cpp b/main.cpp index 85d4166..9f400b8 100644 --- a/main.cpp +++ b/main.cpp @@ -9,56 +9,118 @@ #include "cbhomeclient.h" #include "message.h" -// #include "3dparty/bus_nng/bn_api.h" +// #include "3rdparty/bus_nng/bn_api.h" #include "bhome_msg_api.pb.h" using namespace bhome_msg; -static cproc* make_proc(const char* name, const char* id){ - cproc* pinfo = (cproc*)calloc(1,sizeof(cproc)); - auto assign = [](char** d, size_t* l, const char* tmp){ - *l = strlen(tmp); - *d = (char*)malloc(*l); - memcpy(*d, tmp, *l); - }; - assign(&pinfo->name.str, &pinfo->name.size, name); - assign(&pinfo->id.str, &pinfo->id.size, id); +template <class F> void ignoref(F&& f){} - return pinfo; +static void pub(const vector<string>& topics){ + ignoref(pub); + + vector<const char*> tpc; + for(auto& t : topics) tpc.push_back(t.c_str()); + + creg* reg = make_creg(make_cproc("pub", "pubid"), + NULL, 0, &tpc[0], tpc.size(), NULL, 0, NULL, 0); + + void* handle = bus_client_init(NULL, 0, reg); + creg_free(reg); + + size_t count = 0; + string base_msg("test_pub_sub=="); + this_thread::sleep_for(chrono::seconds(3)); + while (true) { + for(auto && i : topics){ + auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++); + // MsgPublish pbmsg; + // pbmsg.set_topic(i); + // pbmsg.set_data(msg); + // auto data = pbmsg.SerializeAsString(); + // int ret = bus_client_pubmsg(handle, data.data(), data.size()); + + int ret = bus_client_publish(handle, i.data(), i.size(), msg.data(), msg.size()); + printf("======>> bus_client_pubmsg [%s] ret %d\n", msg.c_str(), ret); + this_thread::sleep_for(chrono::seconds{1}); + } + } } -static unique_ptr<thread> pub(const vector<string>& topics){ +static void sub(const vector<string>& topics){ + ignoref(sub); - auto ptr = unique_ptr<thread>(new thread([&]{ - creg reg; - memset(®, 0, sizeof(reg)); - reg.pinfo = make_proc("pub", "pubid"); - reg.topic_pub = cstr_arr_new(topics.size()); - size_t i = 0; - for(; i < topics.size(); i++){ - cstr_arr_add(®.topic_pub, topics.at(0).data(), topics.at(0).size(), i); + vector<const char*> tpc; + for(auto& t : topics) tpc.push_back(t.c_str()); + + creg* reg = make_creg(make_cproc("sub", "subid"), + NULL, 0, NULL, 0, &tpc[0], tpc.size(), NULL, 0); + + void* handle = bus_client_init(NULL, 0, reg); + creg_free(reg); + + while (true) { + auto msg = bus_client_get_submsg(handle); + printf("SUB msg topic [%s] data [%s]\n", msg->topic, msg->msg); + free_submsg(msg); + } + + bus_client_free(handle); +} + +static void req(const char* topic){ + ignoref(req); + + const auto topicl = strlen(topic); + creg* reg = make_creg_from_cproc(make_cproc("request", "requestid")); + + void* handle = bus_client_init(NULL, 0, reg); + creg_free(reg); + + size_t count = 0; + string base_msg("test_request==request message -> msg-"); + this_thread::sleep_for(chrono::seconds(3)); + while (true) { + auto msg = base_msg + to_string(count++); + auto reqmsg = make_req_msg(topic, topicl, msg.data(), msg.size()); + crepmsg* repmsg = NULL; + if (bus_client_request(handle, reqmsg, &repmsg)){ + printf("======>> bus_client_reqest [%s] get [%s]\n", msg.c_str(), repmsg->data); } - void* handle = bus_client_init(NULL, 0, ®); + free_reqmsg(reqmsg); + free_reply_msg(repmsg); + this_thread::sleep_for(chrono::seconds{2}); + } +} - size_t count = 0; - string base_msg("test_pub_sub=="); - this_thread::sleep_for(chrono::seconds(3)); - while (true) { - for(auto && i : topics){ - auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++); - MsgPublish pbmsg; - pbmsg.set_topic(i); - pbmsg.set_data(msg); - auto data = pbmsg.SerializeAsString(); - // TestPub(i.c_str(), i.length(), data.c_str(), data.length()); - int pubres = bus_client_pubmsg(handle, (void*)data.data(), data.size()); - printf("======>> bus_client_pubmsg [%s]\n", msg.c_str()); +static void reply(const char* topic){ + ignoref(reply); - this_thread::sleep_for(chrono::seconds{2}); - } - } - })); - ptr->detach(); - return ptr; + const auto topicl = strlen(topic); + vector<const char*> tpc{topic}; + + // creg* reg = make_creg(make_cproc("reply", "replyid"), + // &tpc[0], tpc.size(), &tpc[0], tpc.size(), NULL, 0, NULL, 0); + + creg* reg = make_creg_from_cproc(make_cproc("reply", "replyid")); + creg_add_topic_reply(reg, tpc.data(), tpc.size()); + creg_add_topic_pub(reg, tpc.data(), tpc.size()); + + void* handle = bus_client_init(NULL, 0, reg); + creg_free(reg); + + size_t count = 0; + this_thread::sleep_for(chrono::seconds(3)); + while (true) { + void* src = NULL; + auto msg = bus_client_get_reqmsg(handle, &src); + auto repmsg = make_reply_msg(0, NULL, 0, "recv request", 12); + bus_client_reply_msg(handle, src, repmsg); + free_reply_msg(repmsg); + printf("REPREQ msg [%s] \n", msg->msg); + + free_reqmsg(msg); + // this_thread::sleep_for(chrono::seconds{2}); + } } int main(int argc, char const *argv[]) @@ -66,26 +128,14 @@ vector<string> topics{ "cbhomeclient_test_pubsub" }; - auto p = pub(topics); - // while (true) this_thread::sleep_for(chrono::seconds{1}); + thread([&]{ pub(topics); }).detach(); + thread([&]{ sub(topics); }).detach(); + // sub(topics); - creg reg; - memset(®, 0, sizeof(reg)); - reg.pinfo = make_proc("sub", "subid"); + printf("start RR\n"); + const char* rrtopic = "cbhomeclient_req_rep"; + thread([&]{ req(rrtopic); }).detach(); + reply(rrtopic); - reg.topic_sub = cstr_arr_new(topics.size()); - size_t i = 0; - for(; i < topics.size(); i++){ - cstr_arr_add(®.topic_sub, topics.at(0).data(), topics.at(0).size(), i); - } - - void* handle = bus_client_init(NULL, 0, ®); - - while (true) { - auto msg = bus_client_get_submsg(handle); - printf("msg topic [%s] data [%s]\n", msg->topic.str, msg->msg.str); - } - - bus_client_free(handle); return 0; -} \ No newline at end of file +} -- Gitblit v1.8.0