#include <stdio.h>
|
|
#include <vector>
|
#include <string>
|
#include <thread>
|
#include <memory>
|
using namespace std;
|
|
#include "cbhomeclient.h"
|
#include "message.h"
|
|
// #include "3rdparty/bus_nng/bn_api.h"
|
#include "bhome_msg_api.pb.h"
|
using namespace bhome_msg;
|
|
template <class F> void ignoref(F&& f){}
|
|
static void pub(const vector<string>& topics){
|
ignoref(pub);
|
|
vector<const char*> tpc;
|
for(auto& t : topics) tpc.push_back(t.c_str());
|
|
creg* reg = make_creg(make_cproc("pub", "pubid"),
|
NULL, 0, &tpc[0], tpc.size(), NULL, 0, NULL, 0);
|
|
void* handle = bus_client_init(NULL, 0, reg);
|
creg_free(reg);
|
|
size_t count = 0;
|
string base_msg("test_pub_sub==");
|
this_thread::sleep_for(chrono::seconds(3));
|
while (true) {
|
for(auto && i : topics){
|
auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++);
|
// MsgPublish pbmsg;
|
// pbmsg.set_topic(i);
|
// pbmsg.set_data(msg);
|
// auto data = pbmsg.SerializeAsString();
|
// int ret = bus_client_pubmsg(handle, data.data(), data.size());
|
|
int ret = bus_client_publish(handle, i.data(), i.size(), msg.data(), msg.size());
|
printf("======>> bus_client_pubmsg [%s] ret %d\n", msg.c_str(), ret);
|
this_thread::sleep_for(chrono::seconds{1});
|
}
|
}
|
}
|
|
static void sub(const vector<string>& topics){
|
ignoref(sub);
|
|
vector<const char*> tpc;
|
for(auto& t : topics) tpc.push_back(t.c_str());
|
|
creg* reg = make_creg(make_cproc("sub", "subid"),
|
NULL, 0, NULL, 0, &tpc[0], tpc.size(), NULL, 0);
|
|
void* handle = bus_client_init(NULL, 0, reg);
|
creg_free(reg);
|
|
while (true) {
|
auto msg = bus_client_get_submsg(handle);
|
printf("SUB msg topic [%s] data [%s]\n", msg->topic, msg->msg);
|
free_submsg(msg);
|
}
|
|
bus_client_free(handle);
|
}
|
|
static void req(const char* topic){
|
ignoref(req);
|
|
const auto topicl = strlen(topic);
|
creg* reg = make_creg_from_cproc(make_cproc("request", "requestid"));
|
|
void* handle = bus_client_init(NULL, 0, reg);
|
creg_free(reg);
|
|
size_t count = 0;
|
string base_msg("test_request==request message -> msg-");
|
this_thread::sleep_for(chrono::seconds(3));
|
while (true) {
|
auto msg = base_msg + to_string(count++);
|
auto reqmsg = make_req_msg(topic, topicl, msg.data(), msg.size());
|
crepmsg* repmsg = NULL;
|
if (bus_client_request(handle, reqmsg, &repmsg)){
|
printf("======>> bus_client_reqest [%s] get [%s]\n", msg.c_str(), repmsg->data);
|
}
|
free_reqmsg(reqmsg);
|
free_reply_msg(repmsg);
|
this_thread::sleep_for(chrono::seconds{2});
|
}
|
}
|
|
static void reply(const char* topic){
|
ignoref(reply);
|
|
const auto topicl = strlen(topic);
|
vector<const char*> tpc{topic};
|
|
// creg* reg = make_creg(make_cproc("reply", "replyid"),
|
// &tpc[0], tpc.size(), &tpc[0], tpc.size(), NULL, 0, NULL, 0);
|
|
creg* reg = make_creg_from_cproc(make_cproc("reply", "replyid"));
|
creg_add_topic_reply(reg, tpc.data(), tpc.size());
|
creg_add_topic_pub(reg, tpc.data(), tpc.size());
|
|
void* handle = bus_client_init(NULL, 0, reg);
|
creg_free(reg);
|
|
size_t count = 0;
|
this_thread::sleep_for(chrono::seconds(3));
|
while (true) {
|
void* src = NULL;
|
auto msg = bus_client_get_reqmsg(handle, &src);
|
auto repmsg = make_reply_msg(0, NULL, 0, "recv request", 12);
|
bus_client_reply(handle, src, repmsg);
|
free_reply_msg(repmsg);
|
printf("REPREQ msg [%s] \n", msg->msg);
|
|
free_reqmsg(msg);
|
// this_thread::sleep_for(chrono::seconds{2});
|
}
|
}
|
|
int main(int argc, char const *argv[])
|
{
|
vector<string> topics{
|
"cbhomeclient_test_pubsub"
|
};
|
thread([&]{ pub(topics); }).detach();
|
thread([&]{ sub(topics); }).detach();
|
// sub(topics);
|
|
printf("start RR\n");
|
const char* rrtopic = "cbhomeclient_req_rep";
|
thread([&]{ req(rrtopic); }).detach();
|
reply(rrtopic);
|
|
return 0;
|
}
|