From ecf23f882ca1b8aaf0863980fc4781c515da1695 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 12 十二月 2022 16:49:03 +0800
Subject: [PATCH] add req rep

---
 main.cpp |  142 ++++++++++++++++++++++++++++++++++-------------
 1 files changed, 103 insertions(+), 39 deletions(-)

diff --git a/main.cpp b/main.cpp
index 85d4166..1e57df9 100644
--- a/main.cpp
+++ b/main.cpp
@@ -26,48 +26,40 @@
     return pinfo;
 }
 
-static unique_ptr<thread> pub(const vector<string>& topics){
+template <class F> void ignoref(F&& f){}
 
-    auto ptr = unique_ptr<thread>(new thread([&]{
-        creg reg;
-        memset(&reg, 0, sizeof(reg));
-        reg.pinfo = make_proc("pub", "pubid");
-        reg.topic_pub = cstr_arr_new(topics.size());
-        size_t i = 0;
-        for(; i < topics.size(); i++){
-            cstr_arr_add(&reg.topic_pub, topics.at(0).data(), topics.at(0).size(), i);
+static void pub(const vector<string>& topics){
+    ignoref(pub);
+
+    creg reg;
+    memset(&reg, 0, sizeof(reg));
+    reg.pinfo = make_proc("pub", "pubid");
+    reg.topic_pub = cstr_arr_new(topics.size());
+    size_t i = 0;
+    for(; i < topics.size(); i++){
+        cstr_arr_add(&reg.topic_pub, topics.at(0).data(), topics.at(0).size(), i);
+    }
+    void* handle = bus_client_init(NULL, 0, &reg);
+    size_t count = 0;
+    string base_msg("test_pub_sub==");
+    this_thread::sleep_for(chrono::seconds(3));
+    while (true) {
+        for(auto && i : topics){
+            auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++);
+            MsgPublish pbmsg;
+            pbmsg.set_topic(i);
+            pbmsg.set_data(msg);
+            auto data = pbmsg.SerializeAsString();
+            // TestPub(i.c_str(), i.length(), data.c_str(), data.length());
+            int pubres = bus_client_pubmsg(handle, (void*)data.data(), data.size());
+            printf("======>> bus_client_pubmsg [%s]\n", msg.c_str());
+            this_thread::sleep_for(chrono::seconds{2});
         }
-        void* handle = bus_client_init(NULL, 0, &reg);
-
-        size_t count = 0;
-        string base_msg("test_pub_sub==");
-        this_thread::sleep_for(chrono::seconds(3));
-        while (true) {
-            for(auto && i : topics){
-                auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++);
-                MsgPublish pbmsg;
-                pbmsg.set_topic(i);
-                pbmsg.set_data(msg);
-                auto data = pbmsg.SerializeAsString();
-                // TestPub(i.c_str(), i.length(), data.c_str(), data.length());
-                int pubres = bus_client_pubmsg(handle, (void*)data.data(), data.size());
-                printf("======>> bus_client_pubmsg [%s]\n", msg.c_str());
-
-                this_thread::sleep_for(chrono::seconds{2});
-            }
-        }
-    }));
-    ptr->detach();
-    return ptr;
+    }
 }
 
-int main(int argc, char const *argv[])
-{
-    vector<string> topics{
-        "cbhomeclient_test_pubsub"
-    };
-    auto p = pub(topics);
-    // while (true) this_thread::sleep_for(chrono::seconds{1});
+static void sub(const vector<string>& topics){
+    ignoref(sub);
 
     creg reg;
     memset(&reg, 0, sizeof(reg));
@@ -83,9 +75,81 @@
 
     while (true) {
         auto msg = bus_client_get_submsg(handle);
-        printf("msg topic [%s] data [%s]\n", msg->topic.str, msg->msg.str);
+        printf("SUB msg topic [%s] data [%s]\n", msg->topic.str, msg->msg.str);
     }
 
     bus_client_free(handle);
+}
+
+static void req(const char* topic){
+    ignoref(req);
+
+    string strtpc(topic);
+    creg reg;
+    memset(&reg, 0, sizeof(reg));
+    reg.pinfo = make_proc("request", "requestid");
+    // reg.channel = cstr_arr_new(1);
+    // size_t i = 0;
+    // for(; i < 1; i++){
+    //     cstr_arr_add(&reg.topic_pub, topic, strlen(topic), i);
+    // }
+    void* handle = bus_client_init(NULL, 0, &reg);
+    size_t count = 0;
+    string base_msg("test_request==");
+    this_thread::sleep_for(chrono::seconds(3));
+    while (true) {
+        auto msg = base_msg + "request message -> msg-"+to_string(count++);
+        auto reqmsg = make_req_msg(strtpc.data(), strtpc.size(), msg.data(), msg.size());
+        crepmsg* repmsg = NULL;
+        if (bus_client_request(handle, reqmsg, &repmsg)){
+            printf("======>> bus_client_reqest [%s] get [%s]\n", msg.c_str(), repmsg->data.str);
+        }
+        free_reqmsg(reqmsg);
+        free_reply_msg(repmsg);
+        this_thread::sleep_for(chrono::seconds{2});
+    }
+}
+
+static void reply(const char* topic){
+    ignoref(reply);
+
+    creg reg;
+    memset(&reg, 0, sizeof(reg));
+    reg.pinfo = make_proc("reply", "replyid");
+    reg.channel = cstr_arr_new(1);
+    size_t i = 0;
+    for(; i < 1; i++){
+        cstr_arr_add(&reg.channel, topic, strlen(topic), i);
+    }
+    void* handle = bus_client_init(NULL, 0, &reg);
+    size_t count = 0;
+    this_thread::sleep_for(chrono::seconds(3));
+    while (true) {
+        void* src = NULL;
+        auto msg = bus_client_get_reqmsg(handle, &src);
+        auto repmsg = make_reply_msg(0, NULL, 0, "recv request", 12);
+        bus_client_reply_msg(handle, src, repmsg);
+        free_reply_msg(repmsg);
+        printf("REPREQ msg [%s] \n", msg->msg.str);
+
+        free_reqmsg(msg);
+        this_thread::sleep_for(chrono::seconds{2});
+    }
+}
+
+int main(int argc, char const *argv[])
+{
+    vector<string> topics{
+        "cbhomeclient_test_pubsub"
+    };
+    thread([&]{ pub(topics); }).detach();
+    thread([&]{ sub(topics); }).detach();
+    // sub(topics);
+
+    printf("start RR\n");
+    const char* rrtopic = "cbhomeclient_req_rep";
+    thread([&]{ req(rrtopic); }).detach();
+    reply(rrtopic);
+
     return 0;
 }
\ No newline at end of file

--
Gitblit v1.8.0