From ecf23f882ca1b8aaf0863980fc4781c515da1695 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 12 十二月 2022 16:49:03 +0800
Subject: [PATCH] add req rep

---
 3dparty/bus_nng/x86_64/libbus_nng.so |    0 
 main.cpp                             |  142 +++++++++++++++++------
 message.cpp                          |  106 ++++++++++++-----
 message.h                            |   29 ++--
 cbhomeclient.cpp                     |   64 +++++++++-
 cbhomeclient.h                       |    5 
 6 files changed, 247 insertions(+), 99 deletions(-)

diff --git a/3dparty/bus_nng/x86_64/libbus_nng.so b/3dparty/bus_nng/x86_64/libbus_nng.so
index d52d466..d01dcbd 100755
--- a/3dparty/bus_nng/x86_64/libbus_nng.so
+++ b/3dparty/bus_nng/x86_64/libbus_nng.so
Binary files differ
diff --git a/cbhomeclient.cpp b/cbhomeclient.cpp
index 1d213bb..9c8701e 100644
--- a/cbhomeclient.cpp
+++ b/cbhomeclient.cpp
@@ -121,9 +121,9 @@
 template <class F>
 MsgCR to_topic(client* cli, F&& f, const struct cstrarr topic){
     MsgCR msg(dummy());
-    if (topic.arr && topic.size){
+    if (topic.arr && topic.count){
         MsgTopicList tlist;
-        for(size_t i = 0; i < topic.size; i++)
+        for(size_t i = 0; i < topic.count; i++)
             tlist.add_topic_list(topic.arr[i].str, topic.arr[i].size);
         const auto& tpc = tlist.SerializeAsString();
         void* replymsg = NULL;
@@ -189,8 +189,22 @@
         cli->thrd_readreq.reset(new thread([cli]{ thread_readreq(cli); }));
     }
 
+    // request/reply鍜宲ub topic涓�璧峰鐞�
+    auto tmparr = cstr_arr_new(rinfo->channel.count + rinfo->topic_pub.count);
+    auto addarr = [&tmparr](size_t& start, const struct cstrarr* arr){
+        for(size_t i = 0; i < arr->count; i++){
+            cstr_arr_add(&tmparr, arr->arr[i].str, arr->arr[i].size, start+i);
+        }
+        start += arr->count;
+    };
+    size_t s = 0;
+    addarr(s, &rinfo->channel);
+    addarr(s, &rinfo->topic_pub);
+    auto tpcmsg = to_topic(cli, bus_register_topics, tmparr);
+    cstr_arr_free(tmparr);
+    // auto channelmsg = to_topic(cli, bus_register_topics, rinfo->channel);
+    // auto pubmsg = to_topic(cli, bus_register_topics, rinfo->topic_pub);
     // if topic pub/sub[net] exist, register topics
-    auto pubmsg = to_topic(cli, bus_register_topics, rinfo->topic_pub);
     auto submsg = to_topic(cli, bus_subscribe_topics, rinfo->topic_sub);
     auto subnetmsg = to_topic(cli, bus_subscribe_topics_net, rinfo->topic_sub_net);
 
@@ -251,29 +265,63 @@
     return pmsg;
 }
 
-struct creqmsg* bus_client_get_reqmsg(void* handle){
+struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){
     client* cli = ptr(handle);
     Msg msg = std::move(cli->readreq_q->pop());
     if (msg.empty()) return NULL;
 
-    void* procid = NULL, *data = NULL, *src = NULL;
+    void* procid = NULL, *data = NULL;
     int pids = 0, size = 0;
 
     tie(procid, pids) = msg.at(0);
     tie(data, size) = msg.at(1);
-    tie(src, ignore) = msg.at(2);
+    tie(*src, ignore) = msg.at(2);
 
-    auto pmsg = to_reqmsg((const char*)procid, pids, (const char *)data, size, src);
+    auto pmsg = to_reqmsg((const char*)procid, pids, (const char *)data, size);
     bus_free(procid, pids);
     bus_free(data, size);
 
     return pmsg;
 }
 
-int bus_client_reply_reqmsg(void* handle, struct crepmsg* msg){
+int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg){
 
+    void* procid = NULL, *reply = NULL;
+    int pids = 0, replys = 0;
+    auto vmsg = std::move(to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0,
+                    msg->msg.str, msg->msg.size, &procid, &pids,
+                    &reply, &replys, sndto));
+    if (!vmsg.empty()){
+        void* procid = NULL, *data = NULL;
+        int pids = 0, size = 0;
+        tie(procid, pids) = vmsg.at(0);
+        bus_free(procid, pids);
+        tie(data, size) = vmsg.at(1);
+        MsgRequestTopicReply msgRT;
+        auto pb = msgRT.ParseFromArray(reply, replys);
+        bus_free(reply, replys);
+        if (!pb) return false;
+
+        *repmsg = make_reply_msg(msgRT.errmsg().errcode(),
+            msgRT.errmsg().errstring().data(), msgRT.errmsg().errstring().size(),
+            msgRT.data().data(), msgRT.data().size());
+        return true;
+    }
+    return false;
+}
+
+int bus_client_reply_msg(void* handle, void* src, const struct crepmsg* msg){
 
     MsgRequestTopicReply msgR;
+    auto err = msgR.mutable_errmsg();
+    err->set_errcode((ErrorCode)msg->errcode);
+    err->set_errstring(msg->errmsg.str, msg->errmsg.size);
+
+    msgR.set_data(msg->data.str, msg->data.size);
+    auto pbstr = msgR.SerializeAsString();
+
+    auto cli = ptr(handle);
+    return bus_send_reply(cli->bus, src, pbstr.data(), pbstr.size());
 }
 
 
diff --git a/cbhomeclient.h b/cbhomeclient.h
index a884dea..3056b04 100644
--- a/cbhomeclient.h
+++ b/cbhomeclient.h
@@ -15,8 +15,9 @@
 
 struct csubmsg* bus_client_get_submsg(void* handle);
 
-struct creqmsg* bus_client_get_reqmsg(void* handle);
-int bus_client_reply_reqmsg(void* handle, struct crepmsg* msg);
+struct creqmsg* bus_client_get_reqmsg(void* handle, void** src);
+int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg);
+int bus_client_reply_msg(void* handle, void* src, const struct crepmsg* msg);
 
 // test
 int bus_client_pubmsg(void* handle, void* data, const size_t size);
diff --git a/main.cpp b/main.cpp
index 85d4166..1e57df9 100644
--- a/main.cpp
+++ b/main.cpp
@@ -26,48 +26,40 @@
     return pinfo;
 }
 
-static unique_ptr<thread> pub(const vector<string>& topics){
+template <class F> void ignoref(F&& f){}
 
-    auto ptr = unique_ptr<thread>(new thread([&]{
-        creg reg;
-        memset(&reg, 0, sizeof(reg));
-        reg.pinfo = make_proc("pub", "pubid");
-        reg.topic_pub = cstr_arr_new(topics.size());
-        size_t i = 0;
-        for(; i < topics.size(); i++){
-            cstr_arr_add(&reg.topic_pub, topics.at(0).data(), topics.at(0).size(), i);
+static void pub(const vector<string>& topics){
+    ignoref(pub);
+
+    creg reg;
+    memset(&reg, 0, sizeof(reg));
+    reg.pinfo = make_proc("pub", "pubid");
+    reg.topic_pub = cstr_arr_new(topics.size());
+    size_t i = 0;
+    for(; i < topics.size(); i++){
+        cstr_arr_add(&reg.topic_pub, topics.at(0).data(), topics.at(0).size(), i);
+    }
+    void* handle = bus_client_init(NULL, 0, &reg);
+    size_t count = 0;
+    string base_msg("test_pub_sub==");
+    this_thread::sleep_for(chrono::seconds(3));
+    while (true) {
+        for(auto && i : topics){
+            auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++);
+            MsgPublish pbmsg;
+            pbmsg.set_topic(i);
+            pbmsg.set_data(msg);
+            auto data = pbmsg.SerializeAsString();
+            // TestPub(i.c_str(), i.length(), data.c_str(), data.length());
+            int pubres = bus_client_pubmsg(handle, (void*)data.data(), data.size());
+            printf("======>> bus_client_pubmsg [%s]\n", msg.c_str());
+            this_thread::sleep_for(chrono::seconds{2});
         }
-        void* handle = bus_client_init(NULL, 0, &reg);
-
-        size_t count = 0;
-        string base_msg("test_pub_sub==");
-        this_thread::sleep_for(chrono::seconds(3));
-        while (true) {
-            for(auto && i : topics){
-                auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++);
-                MsgPublish pbmsg;
-                pbmsg.set_topic(i);
-                pbmsg.set_data(msg);
-                auto data = pbmsg.SerializeAsString();
-                // TestPub(i.c_str(), i.length(), data.c_str(), data.length());
-                int pubres = bus_client_pubmsg(handle, (void*)data.data(), data.size());
-                printf("======>> bus_client_pubmsg [%s]\n", msg.c_str());
-
-                this_thread::sleep_for(chrono::seconds{2});
-            }
-        }
-    }));
-    ptr->detach();
-    return ptr;
+    }
 }
 
-int main(int argc, char const *argv[])
-{
-    vector<string> topics{
-        "cbhomeclient_test_pubsub"
-    };
-    auto p = pub(topics);
-    // while (true) this_thread::sleep_for(chrono::seconds{1});
+static void sub(const vector<string>& topics){
+    ignoref(sub);
 
     creg reg;
     memset(&reg, 0, sizeof(reg));
@@ -83,9 +75,81 @@
 
     while (true) {
         auto msg = bus_client_get_submsg(handle);
-        printf("msg topic [%s] data [%s]\n", msg->topic.str, msg->msg.str);
+        printf("SUB msg topic [%s] data [%s]\n", msg->topic.str, msg->msg.str);
     }
 
     bus_client_free(handle);
+}
+
+static void req(const char* topic){
+    ignoref(req);
+
+    string strtpc(topic);
+    creg reg;
+    memset(&reg, 0, sizeof(reg));
+    reg.pinfo = make_proc("request", "requestid");
+    // reg.channel = cstr_arr_new(1);
+    // size_t i = 0;
+    // for(; i < 1; i++){
+    //     cstr_arr_add(&reg.topic_pub, topic, strlen(topic), i);
+    // }
+    void* handle = bus_client_init(NULL, 0, &reg);
+    size_t count = 0;
+    string base_msg("test_request==");
+    this_thread::sleep_for(chrono::seconds(3));
+    while (true) {
+        auto msg = base_msg + "request message -> msg-"+to_string(count++);
+        auto reqmsg = make_req_msg(strtpc.data(), strtpc.size(), msg.data(), msg.size());
+        crepmsg* repmsg = NULL;
+        if (bus_client_request(handle, reqmsg, &repmsg)){
+            printf("======>> bus_client_reqest [%s] get [%s]\n", msg.c_str(), repmsg->data.str);
+        }
+        free_reqmsg(reqmsg);
+        free_reply_msg(repmsg);
+        this_thread::sleep_for(chrono::seconds{2});
+    }
+}
+
+static void reply(const char* topic){
+    ignoref(reply);
+
+    creg reg;
+    memset(&reg, 0, sizeof(reg));
+    reg.pinfo = make_proc("reply", "replyid");
+    reg.channel = cstr_arr_new(1);
+    size_t i = 0;
+    for(; i < 1; i++){
+        cstr_arr_add(&reg.channel, topic, strlen(topic), i);
+    }
+    void* handle = bus_client_init(NULL, 0, &reg);
+    size_t count = 0;
+    this_thread::sleep_for(chrono::seconds(3));
+    while (true) {
+        void* src = NULL;
+        auto msg = bus_client_get_reqmsg(handle, &src);
+        auto repmsg = make_reply_msg(0, NULL, 0, "recv request", 12);
+        bus_client_reply_msg(handle, src, repmsg);
+        free_reply_msg(repmsg);
+        printf("REPREQ msg [%s] \n", msg->msg.str);
+
+        free_reqmsg(msg);
+        this_thread::sleep_for(chrono::seconds{2});
+    }
+}
+
+int main(int argc, char const *argv[])
+{
+    vector<string> topics{
+        "cbhomeclient_test_pubsub"
+    };
+    thread([&]{ pub(topics); }).detach();
+    thread([&]{ sub(topics); }).detach();
+    // sub(topics);
+
+    printf("start RR\n");
+    const char* rrtopic = "cbhomeclient_req_rep";
+    thread([&]{ req(rrtopic); }).detach();
+    reply(rrtopic);
+
     return 0;
 }
\ No newline at end of file
diff --git a/message.cpp b/message.cpp
index 591b50a..394e1d4 100644
--- a/message.cpp
+++ b/message.cpp
@@ -25,7 +25,13 @@
 static struct cstr cstr_clone(const struct cstr old){
     return cstr_new(old.str, old.size);
 }
-
+static struct cstr cstr_ref(char* str, const size_t len){
+    struct cstr cs;
+    memset(&cs, 0, sizeof(cs));
+    cs.size = len;
+    cs.str = str;
+    return cs;
+}
 struct cstr cstr_new(const char* str, const size_t len){
     struct cstr cs;
     memset(&cs, 0, sizeof(cs));
@@ -37,20 +43,20 @@
 void cstr_free(struct cstr str){
     if (str.str && str.size) free(str.str);
 }
-struct cstrarr cstr_arr_new(const size_t len){
+struct cstrarr cstr_arr_new(const size_t count){
     struct cstrarr arr;
     memset(&arr, 0, sizeof(arr));
-    arr.arr = (struct cstr*)calloc(len, sizeof(struct cstr));
-    arr.size = len;
+    arr.arr = (struct cstr*)calloc(count, sizeof(struct cstr));
+    arr.count = count;
     return arr;
 }
 void cstr_arr_add(struct cstrarr* arr, const char* data, const size_t len, const size_t idx){
-    if (arr->arr && arr->size && idx < arr->size){
+    if (arr->arr && arr->count && idx < arr->count){
         arr->arr[idx] = cstr_new(data, len);
     }
 }
 void cstr_arr_free(struct cstrarr arr){
-    for(size_t i = 0; i < arr.size; i++){
+    for(size_t i = 0; i < arr.count; i++){
         auto & str = arr.arr[i];
         cstr_free(str);
     }
@@ -176,11 +182,11 @@
     rinfo->pinfo = pinfo;
 
     auto assign_arr = [&obj](yyjson_val* v){
-        const size_t size = yyjson_arr_size(v);
+        const size_t count = yyjson_arr_size(v);
         struct cstrarr arr;
         memset(&arr, 0, sizeof(arr));
-        arr.size = size;
-        for(size_t i = 0; i < size; i++){
+        arr.count = count;
+        for(size_t i = 0; i < count; i++){
             auto sv = yyjson_arr_get(v, i);
             char* entry = NULL;
             size_t entry_size = 0;
@@ -357,49 +363,60 @@
 }
 
 //////////////////////////////////////////////////
-struct creq json2creq(const char* data, const size_t size){
-    yyjson_doc* doc = yyjson_read(data, size, 0);
+static tuple<struct cstr, struct cstr> json2reqmsg(const struct cstr msg){
+    yyjson_doc* doc = yyjson_read(msg.str, msg.size, 0);
     yyjson_val* root = yyjson_doc_get_root(doc);
 
     auto obj = [](yyjson_val* v, const char* name){return yyjson_obj_get(v, name);};
 
-    struct creq req;
-    memset(&req, 0, sizeof(req));
-
     auto jp = obj(root, "path");
     auto jb = obj(root, "body");
-    req.path = cstr_new(yyjson_get_str(jp), yyjson_get_len(jp));
-    req.body = cstr_new(yyjson_get_raw(jb), yyjson_get_len(jb));
-
+    auto tp = make_tuple(cstr_new(yyjson_get_str(jp), yyjson_get_len(jp)),
+                            cstr_new(yyjson_get_raw(jb), yyjson_get_len(jb)));
     yyjson_doc_free(doc);
-    return req;
+    return tp;
 }
 
-struct creqmsg* to_reqmsg(const char* pid, const size_t pids,
-                            const char* data, const size_t size, void* src)
+struct creqmsg* to_reqmsg(const char* pid, const size_t pids, const char* data, const size_t size)
 {
     auto msg = ptrT<struct creqmsg>();
     msg->procid = cstr_new(pid, pids);
     MsgRequestTopic msgRT;
     if (!msgRT.ParseFromArray(data, size)) return NULL;
+    msg->msg = cstr_new(msgRT.data().data(), msgRT.data().size());
 
-    // creg = msgRT.data();
-    msg->msg = json2creq(data, size);
-    msg->src = src;
+    return msg;
+}
+
+struct creqmsg* make_req_msg(const char* topic, const size_t topics,
+                                const char* data, const size_t datal)
+{
+    auto msg = ptrT<struct creqmsg>();
+    MsgRequestTopic msgRT;
+    msgRT.set_topic(topic, topics);
+    msgRT.set_data(data, datal);
+
+    auto pbstr = msgRT.SerializeAsString();
+    msg->msg = cstr_new(pbstr.data(), pbstr.size());
+
     return msg;
 }
 
 void free_reqmsg(struct creqmsg* msg){
     if (msg){
         cstr_free(msg->procid);
-        cstr_free(msg->msg.path);
-        cstr_free(msg->msg.body);
+        cstr_free(msg->msg);
         free(msg);
     }
 }
 
 struct cstackmsgerr* get_reqmsg_stackerr(struct creqmsg* msg){
-    auto &body = msg->msg.body;
+    struct cstr path, body;
+    memset(&path, 0, sizeof(path));
+    memset(&body, 0, sizeof(body));
+    tie(path, body) = json2reqmsg(msg->msg);
+    if (body.size == 0) return NULL;
+
     auto doc = yyjson_read(body.str, body.size, 0);
     auto root = yyjson_doc_get_root(doc);
 
@@ -411,6 +428,10 @@
     stkmsg->stackid = cstr_new(yyjson_get_str(jsid), yyjson_get_len(jsid));
     stkmsg->fileid = cstr_new(yyjson_get_str(jfid), yyjson_get_len(jfid));
     yyjson_doc_free(doc);
+
+    cstr_free(path);
+    cstr_free(body);
+
     return stkmsg;
 }
 void free_reqmsg_stackerr(struct cstackmsgerr* msg){
@@ -422,7 +443,12 @@
 }
 // decode success msg
 struct cstackmsg* get_reqmsg_stack(struct creqmsg* msg){
-    auto &body = msg->msg.body;
+    struct cstr path, body;
+    memset(&path, 0, sizeof(path));
+    memset(&body, 0, sizeof(body));
+    tie(path, body) = json2reqmsg(msg->msg);
+    if (body.size == 0) return NULL;
+
     auto doc = yyjson_read(body.str, body.size, 0);
     auto root = yyjson_doc_get_root(doc);
 
@@ -471,22 +497,34 @@
     }
 }
 
-struct crepmsg* make_reply_msg(const int success, const char* msg, const size_t msgl,
+struct crepmsg* make_reply_msg(const int errcode, const char* errmsg, const size_t emsgl,
                                 const char* data, const size_t datal)
 {
-    auto rmsg = ptrT<struct crepmsg>();
-    // rmsg->success = success;
-    // rmsg->msg = cstr_new(msg, msgl);
-    // rmsg->data = cstr_new(msg, msgl);
+    auto msg = ptrT<struct crepmsg>();
+    msg->errcode = errcode;
+    msg->errmsg = cstr_new(errmsg, emsgl);
+    msg->data = cstr_new(data, datal);
+    return msg;
+}
 
+struct cstr make_reply_msg_json(const int success, const char* msg, const size_t msgl,
+                                const char* data, const size_t datal)
+{
     auto doc = yyjson_mut_doc_new(NULL);
+    auto root = yyjson_mut_obj(doc);
+    yyjson_mut_obj_add_bool(doc, root, "success", !!success);
+    yyjson_mut_obj_add_strn(doc, root, "msg", msg, msgl);
+    yyjson_mut_obj_add_strn(doc, root, "data", data, datal);
 
-    return rmsg;
+    size_t jsonl = 0;
+    char* json = yyjson_mut_val_write(root, 0, &jsonl);
+    yyjson_mut_doc_free(doc);
+    return cstr_ref(json, jsonl);
 }
 
 void free_reply_msg(struct crepmsg* msg){
     if (msg){
-        cstr_free(msg->msg);
+        cstr_free(msg->errmsg);
         cstr_free(msg->data);
         free(msg);
     }
diff --git a/message.h b/message.h
index dc289df..6024a7b 100644
--- a/message.h
+++ b/message.h
@@ -16,7 +16,7 @@
 
 struct cstrarr{
     struct cstr* arr;
-    size_t size;
+    size_t count;
 };
 // 杩涚▼娉ㄥ唽淇℃伅
 struct creg{
@@ -88,16 +88,10 @@
 };
 
 //////////////////////////////////////////
-// request body
-struct creq{
-    struct cstr path;
-    struct cstr body;
-};
 // request msg
 struct creqmsg{
     struct cstr procid;
-    struct creq msg;
-    void* src;
+    struct cstr msg;
 };
 // decode stack err msg
 struct cstackmsgerr{
@@ -128,10 +122,8 @@
 
 // reply msg
 struct crepmsg{
-    struct cstr json;
-
-    int success;
-    struct cstr msg;
+    int errcode;
+    struct cstr errmsg;
     struct cstr data;
 };
 
@@ -141,7 +133,7 @@
 
 struct cstr cstr_new(const char* str, const size_t len);
 void cstr_free(struct cstr str);
-struct cstrarr cstr_arr_new(const size_t len);
+struct cstrarr cstr_arr_new(const size_t count);
 void cstr_arr_add(struct cstrarr* arr, const char* data, const size_t len, const size_t idx);
 void cstr_arr_free(struct cstrarr arr);
 
@@ -161,8 +153,10 @@
 void free_submsg_proclist(struct cproclist* ppl);
 
 // request msg
-struct creqmsg* to_reqmsg(const char* pid, const size_t pids,
-                            const char* data, const size_t size, void* src);
+struct creqmsg* to_reqmsg(const char* pid,const size_t pids,const char* data,const size_t size);
+struct creqmsg* make_req_msg(const char* topic, const size_t topics,
+                                const char* data, const size_t datal);
+
 void free_reqmsg(struct creqmsg* msg);
 // decode err msg
 struct cstackmsgerr* get_reqmsg_stackerr(struct creqmsg* msg);
@@ -172,8 +166,11 @@
 void free_reqmsg_stack(struct cstackmsg* msg);
 
 // reply msg
-struct crepmsg* make_reply_msg(const int success, const char* msg, const size_t msgl,
+struct cstr make_reply_msg_json(const int success, const char* msg, const size_t msgl,
                                 const char* data, const size_t datal);
+struct crepmsg* make_reply_msg(const int errcode, const char* errmsg, const size_t emsgl,
+                                const char* data, const size_t datal);
+
 void free_reply_msg(struct crepmsg* msg);
 
 #ifdef __cplusplus

--
Gitblit v1.8.0