From 66904a369b51e4be25e81ecc1f6bfd8818d6baa6 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 12 十二月 2022 16:56:35 +0800
Subject: [PATCH] use release
---
main.cpp | 142 ++++++++++++++++++++++++++++++++++-------------
1 files changed, 103 insertions(+), 39 deletions(-)
diff --git a/main.cpp b/main.cpp
index 85d4166..1e57df9 100644
--- a/main.cpp
+++ b/main.cpp
@@ -26,48 +26,40 @@
return pinfo;
}
-static unique_ptr<thread> pub(const vector<string>& topics){
+template <class F> void ignoref(F&& f){}
- auto ptr = unique_ptr<thread>(new thread([&]{
- creg reg;
- memset(®, 0, sizeof(reg));
- reg.pinfo = make_proc("pub", "pubid");
- reg.topic_pub = cstr_arr_new(topics.size());
- size_t i = 0;
- for(; i < topics.size(); i++){
- cstr_arr_add(®.topic_pub, topics.at(0).data(), topics.at(0).size(), i);
+static void pub(const vector<string>& topics){
+ ignoref(pub);
+
+ creg reg;
+ memset(®, 0, sizeof(reg));
+ reg.pinfo = make_proc("pub", "pubid");
+ reg.topic_pub = cstr_arr_new(topics.size());
+ size_t i = 0;
+ for(; i < topics.size(); i++){
+ cstr_arr_add(®.topic_pub, topics.at(0).data(), topics.at(0).size(), i);
+ }
+ void* handle = bus_client_init(NULL, 0, ®);
+ size_t count = 0;
+ string base_msg("test_pub_sub==");
+ this_thread::sleep_for(chrono::seconds(3));
+ while (true) {
+ for(auto && i : topics){
+ auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++);
+ MsgPublish pbmsg;
+ pbmsg.set_topic(i);
+ pbmsg.set_data(msg);
+ auto data = pbmsg.SerializeAsString();
+ // TestPub(i.c_str(), i.length(), data.c_str(), data.length());
+ int pubres = bus_client_pubmsg(handle, (void*)data.data(), data.size());
+ printf("======>> bus_client_pubmsg [%s]\n", msg.c_str());
+ this_thread::sleep_for(chrono::seconds{2});
}
- void* handle = bus_client_init(NULL, 0, ®);
-
- size_t count = 0;
- string base_msg("test_pub_sub==");
- this_thread::sleep_for(chrono::seconds(3));
- while (true) {
- for(auto && i : topics){
- auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++);
- MsgPublish pbmsg;
- pbmsg.set_topic(i);
- pbmsg.set_data(msg);
- auto data = pbmsg.SerializeAsString();
- // TestPub(i.c_str(), i.length(), data.c_str(), data.length());
- int pubres = bus_client_pubmsg(handle, (void*)data.data(), data.size());
- printf("======>> bus_client_pubmsg [%s]\n", msg.c_str());
-
- this_thread::sleep_for(chrono::seconds{2});
- }
- }
- }));
- ptr->detach();
- return ptr;
+ }
}
-int main(int argc, char const *argv[])
-{
- vector<string> topics{
- "cbhomeclient_test_pubsub"
- };
- auto p = pub(topics);
- // while (true) this_thread::sleep_for(chrono::seconds{1});
+static void sub(const vector<string>& topics){
+ ignoref(sub);
creg reg;
memset(®, 0, sizeof(reg));
@@ -83,9 +75,81 @@
while (true) {
auto msg = bus_client_get_submsg(handle);
- printf("msg topic [%s] data [%s]\n", msg->topic.str, msg->msg.str);
+ printf("SUB msg topic [%s] data [%s]\n", msg->topic.str, msg->msg.str);
}
bus_client_free(handle);
+}
+
+static void req(const char* topic){
+ ignoref(req);
+
+ string strtpc(topic);
+ creg reg;
+ memset(®, 0, sizeof(reg));
+ reg.pinfo = make_proc("request", "requestid");
+ // reg.channel = cstr_arr_new(1);
+ // size_t i = 0;
+ // for(; i < 1; i++){
+ // cstr_arr_add(®.topic_pub, topic, strlen(topic), i);
+ // }
+ void* handle = bus_client_init(NULL, 0, ®);
+ size_t count = 0;
+ string base_msg("test_request==");
+ this_thread::sleep_for(chrono::seconds(3));
+ while (true) {
+ auto msg = base_msg + "request message -> msg-"+to_string(count++);
+ auto reqmsg = make_req_msg(strtpc.data(), strtpc.size(), msg.data(), msg.size());
+ crepmsg* repmsg = NULL;
+ if (bus_client_request(handle, reqmsg, &repmsg)){
+ printf("======>> bus_client_reqest [%s] get [%s]\n", msg.c_str(), repmsg->data.str);
+ }
+ free_reqmsg(reqmsg);
+ free_reply_msg(repmsg);
+ this_thread::sleep_for(chrono::seconds{2});
+ }
+}
+
+static void reply(const char* topic){
+ ignoref(reply);
+
+ creg reg;
+ memset(®, 0, sizeof(reg));
+ reg.pinfo = make_proc("reply", "replyid");
+ reg.channel = cstr_arr_new(1);
+ size_t i = 0;
+ for(; i < 1; i++){
+ cstr_arr_add(®.channel, topic, strlen(topic), i);
+ }
+ void* handle = bus_client_init(NULL, 0, ®);
+ size_t count = 0;
+ this_thread::sleep_for(chrono::seconds(3));
+ while (true) {
+ void* src = NULL;
+ auto msg = bus_client_get_reqmsg(handle, &src);
+ auto repmsg = make_reply_msg(0, NULL, 0, "recv request", 12);
+ bus_client_reply_msg(handle, src, repmsg);
+ free_reply_msg(repmsg);
+ printf("REPREQ msg [%s] \n", msg->msg.str);
+
+ free_reqmsg(msg);
+ this_thread::sleep_for(chrono::seconds{2});
+ }
+}
+
+int main(int argc, char const *argv[])
+{
+ vector<string> topics{
+ "cbhomeclient_test_pubsub"
+ };
+ thread([&]{ pub(topics); }).detach();
+ thread([&]{ sub(topics); }).detach();
+ // sub(topics);
+
+ printf("start RR\n");
+ const char* rrtopic = "cbhomeclient_req_rep";
+ thread([&]{ req(rrtopic); }).detach();
+ reply(rrtopic);
+
return 0;
}
\ No newline at end of file
--
Gitblit v1.8.0