#include <stdio.h>
|
|
#include <vector>
|
#include <string>
|
#include <thread>
|
#include <memory>
|
using namespace std;
|
|
#include "cbhomeclient.h"
|
#include "message.h"
|
|
// #include "3dparty/bus_nng/bn_api.h"
|
#include "bhome_msg_api.pb.h"
|
using namespace bhome_msg;
|
|
/**
 * Builds a heap-allocated process descriptor carrying the given name and id.
 *
 * The cproc itself is zero-initialised with calloc(); the name and id bytes
 * are copied into separate malloc() buffers and their byte counts are stored
 * in the matching `size` fields.
 *
 * @param name  NUL-terminated process name; a null pointer is treated as "".
 * @param id    NUL-terminated process id; a null pointer is treated as "".
 * @return      The new descriptor, or NULL when any allocation fails.
 *              NOTE(review): who releases the descriptor is not visible in
 *              this file - confirm against the bus client API.
 */
static cproc* make_proc(const char* name, const char* id){
    cproc* pinfo = (cproc*)calloc(1, sizeof(cproc));
    if (pinfo == NULL) {
        return NULL;
    }

    // Copies `tmp` into a fresh buffer. One extra byte holds a terminating
    // NUL so the buffer can also be printed with "%s" and so an empty string
    // never becomes malloc(0); *l still counts only the payload bytes.
    auto assign = [](char** d, size_t* l, const char* tmp) -> bool {
        if (tmp == NULL) {
            tmp = "";
        }
        const size_t len = strlen(tmp);
        char* buf = (char*)malloc(len + 1);
        if (buf == NULL) {
            return false;
        }
        memcpy(buf, tmp, len);
        buf[len] = '\0';
        *d = buf;
        *l = len;
        return true;
    };

    if (!assign(&pinfo->name.str, &pinfo->name.size, name) ||
        !assign(&pinfo->id.str, &pinfo->id.size, id)) {
        // name.str is still the zero value from calloc if the first copy
        // failed, so free() on it is harmless.
        free(pinfo->name.str);
        free(pinfo);
        return NULL;
    }

    return pinfo;
}
|
|
/// No-op sink: accepts any argument by forwarding reference and discards it.
/// The static functions in this file pass themselves to it, presumably to
/// avoid unused-function warnings.
template <typename Fn>
void ignoref(Fn&&) {
}
|
|
static void pub(const vector<string>& topics){
|
ignoref(pub);
|
|
creg reg;
|
memset(®, 0, sizeof(reg));
|
reg.pinfo = make_proc("pub", "pubid");
|
reg.topic_pub = cstr_arr_new(topics.size());
|
size_t i = 0;
|
for(; i < topics.size(); i++){
|
cstr_arr_add(®.topic_pub, topics.at(0).data(), topics.at(0).size(), i);
|
}
|
void* handle = bus_client_init(NULL, 0, ®);
|
size_t count = 0;
|
string base_msg("test_pub_sub==");
|
this_thread::sleep_for(chrono::seconds(3));
|
while (true) {
|
for(auto && i : topics){
|
auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++);
|
MsgPublish pbmsg;
|
pbmsg.set_topic(i);
|
pbmsg.set_data(msg);
|
auto data = pbmsg.SerializeAsString();
|
// TestPub(i.c_str(), i.length(), data.c_str(), data.length());
|
int pubres = bus_client_pubmsg(handle, (void*)data.data(), data.size());
|
printf("======>> bus_client_pubmsg [%s]\n", msg.c_str());
|
this_thread::sleep_for(chrono::seconds{2});
|
}
|
}
|
}
|
|
static void sub(const vector<string>& topics){
|
ignoref(sub);
|
|
creg reg;
|
memset(®, 0, sizeof(reg));
|
reg.pinfo = make_proc("sub", "subid");
|
|
reg.topic_sub = cstr_arr_new(topics.size());
|
size_t i = 0;
|
for(; i < topics.size(); i++){
|
cstr_arr_add(®.topic_sub, topics.at(0).data(), topics.at(0).size(), i);
|
}
|
|
void* handle = bus_client_init(NULL, 0, ®);
|
|
while (true) {
|
auto msg = bus_client_get_submsg(handle);
|
printf("SUB msg topic [%s] data [%s]\n", msg->topic.str, msg->msg.str);
|
}
|
|
bus_client_free(handle);
|
}
|
|
static void req(const char* topic){
|
ignoref(req);
|
|
string strtpc(topic);
|
creg reg;
|
memset(®, 0, sizeof(reg));
|
reg.pinfo = make_proc("request", "requestid");
|
// reg.channel = cstr_arr_new(1);
|
// size_t i = 0;
|
// for(; i < 1; i++){
|
// cstr_arr_add(®.topic_pub, topic, strlen(topic), i);
|
// }
|
void* handle = bus_client_init(NULL, 0, ®);
|
size_t count = 0;
|
string base_msg("test_request==");
|
this_thread::sleep_for(chrono::seconds(3));
|
while (true) {
|
auto msg = base_msg + "request message -> msg-"+to_string(count++);
|
auto reqmsg = make_req_msg(strtpc.data(), strtpc.size(), msg.data(), msg.size());
|
crepmsg* repmsg = NULL;
|
if (bus_client_request(handle, reqmsg, &repmsg)){
|
printf("======>> bus_client_reqest [%s] get [%s]\n", msg.c_str(), repmsg->data.str);
|
}
|
free_reqmsg(reqmsg);
|
free_reply_msg(repmsg);
|
this_thread::sleep_for(chrono::seconds{2});
|
}
|
}
|
|
static void reply(const char* topic){
|
ignoref(reply);
|
|
creg reg;
|
memset(®, 0, sizeof(reg));
|
reg.pinfo = make_proc("reply", "replyid");
|
reg.channel = cstr_arr_new(1);
|
size_t i = 0;
|
for(; i < 1; i++){
|
cstr_arr_add(®.channel, topic, strlen(topic), i);
|
}
|
void* handle = bus_client_init(NULL, 0, ®);
|
size_t count = 0;
|
this_thread::sleep_for(chrono::seconds(3));
|
while (true) {
|
void* src = NULL;
|
auto msg = bus_client_get_reqmsg(handle, &src);
|
auto repmsg = make_reply_msg(0, NULL, 0, "recv request", 12);
|
bus_client_reply_msg(handle, src, repmsg);
|
free_reply_msg(repmsg);
|
printf("REPREQ msg [%s] \n", msg->msg.str);
|
|
free_reqmsg(msg);
|
this_thread::sleep_for(chrono::seconds{2});
|
}
|
}
|
|
int main(int argc, char const *argv[])
|
{
|
vector<string> topics{
|
"cbhomeclient_test_pubsub"
|
};
|
thread([&]{ pub(topics); }).detach();
|
thread([&]{ sub(topics); }).detach();
|
// sub(topics);
|
|
printf("start RR\n");
|
const char* rrtopic = "cbhomeclient_req_rep";
|
thread([&]{ req(rrtopic); }).detach();
|
reply(rrtopic);
|
|
return 0;
|
}
|