From d8cb16e3a0d8d1483f89ca106b1e31fc7663b31d Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期日, 29 一月 2023 09:31:15 +0800
Subject: [PATCH] bug fixed query_proc
---
main.cpp | 111 +++++++++++++++++++++++++------------------------------
1 files changed, 51 insertions(+), 60 deletions(-)
diff --git a/main.cpp b/main.cpp
index 84578e5..bcc2f7c 100644
--- a/main.cpp
+++ b/main.cpp
@@ -9,37 +9,24 @@
#include "cbhomeclient.h"
#include "message.h"
-// #include "3dparty/bus_nng/bn_api.h"
+// #include "3rdparty/bus_nng/bn_api.h"
#include "bhome_msg_api.pb.h"
using namespace bhome_msg;
-
-static cproc* make_proc(const char* name, const char* id){
- cproc* pinfo = (cproc*)calloc(1,sizeof(cproc));
- auto assign = [](char** d, size_t* l, const char* tmp){
- *l = strlen(tmp);
- *d = (char*)malloc(*l);
- memcpy(*d, tmp, *l);
- };
- assign(&pinfo->name.str, &pinfo->name.size, name);
- assign(&pinfo->id.str, &pinfo->id.size, id);
-
- return pinfo;
-}
template <class F> void ignoref(F&& f){}
static void pub(const vector<string>& topics){
ignoref(pub);
- creg reg;
- memset(®, 0, sizeof(reg));
- reg.pinfo = make_proc("pub", "pubid");
- reg.topic_pub = cstr_arr_new(topics.size());
- size_t i = 0;
- for(; i < topics.size(); i++){
- cstr_arr_add(®.topic_pub, topics.at(0).data(), topics.at(0).size(), i);
- }
- void* handle = bus_client_init(NULL, 0, ®);
+ vector<const char*> tpc;
+ for(auto& t : topics) tpc.push_back(t.c_str());
+
+ creg* reg = make_creg(make_cproc("pub", "pubid"),
+ NULL, 0, &tpc[0], tpc.size(), NULL, 0, NULL, 0);
+
+ void* handle = bus_client_init(NULL, 0, reg);
+ creg_free(reg);
+
size_t count = 0;
string base_msg("test_pub_sub==");
this_thread::sleep_for(chrono::seconds(3));
@@ -54,7 +41,7 @@
int ret = bus_client_publish(handle, i.data(), i.size(), msg.data(), msg.size());
printf("======>> bus_client_pubmsg [%s] ret %d\n", msg.c_str(), ret);
- this_thread::sleep_for(chrono::seconds{2});
+ this_thread::sleep_for(chrono::seconds{1});
}
}
}
@@ -62,21 +49,24 @@
static void sub(const vector<string>& topics){
ignoref(sub);
- creg reg;
- memset(®, 0, sizeof(reg));
- reg.pinfo = make_proc("sub", "subid");
+ vector<const char*> tpc;
+ for(auto& t : topics) tpc.push_back(t.c_str());
- reg.topic_sub = cstr_arr_new(topics.size());
- size_t i = 0;
- for(; i < topics.size(); i++){
- cstr_arr_add(®.topic_sub, topics.at(0).data(), topics.at(0).size(), i);
- }
+ creg* reg = make_creg(make_cproc("sub", "subid"),
+ NULL, 0, NULL, 0, &tpc[0], tpc.size(), NULL, 0);
- void* handle = bus_client_init(NULL, 0, ®);
+ void* handle = bus_client_init(NULL, 0, reg);
+ creg_free(reg);
while (true) {
auto msg = bus_client_get_submsg(handle);
- printf("SUB msg topic [%s] data [%s]\n", msg->topic.str, msg->msg.str);
+ printf("SUB msg topic [%s] data [%s]\n", msg->topic, msg->msg);
+ free_submsg(msg);
+
+ size_t count = 0;
+ auto p = bus_client_query_procs(handle, &count);
+ printf("bus_client_query_procs size %lu\n", count);
+ free_query_procs(p, count);
}
bus_client_free(handle);
@@ -85,25 +75,21 @@
static void req(const char* topic){
ignoref(req);
- string strtpc(topic);
- creg reg;
- memset(®, 0, sizeof(reg));
- reg.pinfo = make_proc("request", "requestid");
- // reg.channel = cstr_arr_new(1);
- // size_t i = 0;
- // for(; i < 1; i++){
- // cstr_arr_add(®.topic_pub, topic, strlen(topic), i);
- // }
- void* handle = bus_client_init(NULL, 0, ®);
+ const auto topicl = strlen(topic);
+ creg* reg = make_creg_from_cproc(make_cproc("request", "requestid"));
+
+ void* handle = bus_client_init(NULL, 0, reg);
+ creg_free(reg);
+
size_t count = 0;
- string base_msg("test_request==");
+ string base_msg("test_request==request message -> msg-");
this_thread::sleep_for(chrono::seconds(3));
while (true) {
- auto msg = base_msg + "request message -> msg-"+to_string(count++);
- auto reqmsg = make_req_msg(strtpc.data(), strtpc.size(), msg.data(), msg.size());
+ auto msg = base_msg + to_string(count++);
+ auto reqmsg = make_req_msg(topic, topicl, msg.data(), msg.size());
crepmsg* repmsg = NULL;
if (bus_client_request(handle, reqmsg, &repmsg)){
- printf("======>> bus_client_reqest [%s] get [%s]\n", msg.c_str(), repmsg->data.str);
+ printf("======>> bus_client_reqest [%s] get [%s]\n", msg.c_str(), repmsg->data);
}
free_reqmsg(reqmsg);
free_reply_msg(repmsg);
@@ -114,27 +100,31 @@
static void reply(const char* topic){
ignoref(reply);
- creg reg;
- memset(®, 0, sizeof(reg));
- reg.pinfo = make_proc("reply", "replyid");
- reg.channel = cstr_arr_new(1);
- cstr_arr_add(®.channel, topic, strlen(topic), 0);
- reg.topic_pub = cstr_arr_new(1);
- cstr_arr_add(®.topic_pub, topic, strlen(topic), 0);
+ const auto topicl = strlen(topic);
+ vector<const char*> tpc{topic};
- void* handle = bus_client_init(NULL, 0, ®);
+ // creg* reg = make_creg(make_cproc("reply", "replyid"),
+ // &tpc[0], tpc.size(), &tpc[0], tpc.size(), NULL, 0, NULL, 0);
+
+ creg* reg = make_creg_from_cproc(make_cproc("reply", "replyid"));
+ creg_add_topic_reply(reg, tpc.data(), tpc.size());
+ creg_add_topic_pub(reg, tpc.data(), tpc.size());
+
+ void* handle = bus_client_init(NULL, 0, reg);
+ creg_free(reg);
+
size_t count = 0;
this_thread::sleep_for(chrono::seconds(3));
while (true) {
void* src = NULL;
auto msg = bus_client_get_reqmsg(handle, &src);
auto repmsg = make_reply_msg(0, NULL, 0, "recv request", 12);
- bus_client_reply_msg(handle, src, repmsg);
+ bus_client_reply(handle, src, repmsg);
free_reply_msg(repmsg);
- printf("REPREQ msg [%s] \n", msg->msg.str);
+ printf("REPREQ msg [%s] \n", msg->msg);
free_reqmsg(msg);
- this_thread::sleep_for(chrono::seconds{2});
+ // this_thread::sleep_for(chrono::seconds{2});
}
}
@@ -150,7 +140,8 @@
printf("start RR\n");
const char* rrtopic = "cbhomeclient_req_rep";
thread([&]{ req(rrtopic); }).detach();
+
reply(rrtopic);
return 0;
-}
\ No newline at end of file
+}
--
Gitblit v1.8.0