#include #include #include #include #include using namespace std; #include "cbhomeclient.h" #include "message.h" // #include "3dparty/bus_nng/bn_api.h" #include "bhome_msg_api.pb.h" using namespace bhome_msg; static cproc* make_proc(const char* name, const char* id){ cproc* pinfo = (cproc*)calloc(1,sizeof(cproc)); auto assign = [](char** d, size_t* l, const char* tmp){ *l = strlen(tmp); *d = (char*)malloc(*l); memcpy(*d, tmp, *l); }; assign(&pinfo->name.str, &pinfo->name.size, name); assign(&pinfo->id.str, &pinfo->id.size, id); return pinfo; } template void ignoref(F&& f){} static void pub(const vector& topics){ ignoref(pub); creg reg; memset(®, 0, sizeof(reg)); reg.pinfo = make_proc("pub", "pubid"); reg.topic_pub = cstr_arr_new(topics.size()); size_t i = 0; for(; i < topics.size(); i++){ cstr_arr_add(®.topic_pub, topics.at(0).data(), topics.at(0).size(), i); } void* handle = bus_client_init(NULL, 0, ®); size_t count = 0; string base_msg("test_pub_sub=="); this_thread::sleep_for(chrono::seconds(3)); while (true) { for(auto && i : topics){ auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++); // MsgPublish pbmsg; // pbmsg.set_topic(i); // pbmsg.set_data(msg); // auto data = pbmsg.SerializeAsString(); // int ret = bus_client_pubmsg(handle, data.data(), data.size()); int ret = bus_client_publish(handle, i.data(), i.size(), msg.data(), msg.size()); printf("======>> bus_client_pubmsg [%s] ret %d\n", msg.c_str(), ret); this_thread::sleep_for(chrono::seconds{2}); } } } static void sub(const vector& topics){ ignoref(sub); creg reg; memset(®, 0, sizeof(reg)); reg.pinfo = make_proc("sub", "subid"); reg.topic_sub = cstr_arr_new(topics.size()); size_t i = 0; for(; i < topics.size(); i++){ cstr_arr_add(®.topic_sub, topics.at(0).data(), topics.at(0).size(), i); } void* handle = bus_client_init(NULL, 0, ®); while (true) { auto msg = bus_client_get_submsg(handle); printf("SUB msg topic [%s] data [%s]\n", msg->topic.str, msg->msg.str); } bus_client_free(handle); } static void req(const char* topic){ ignoref(req); string strtpc(topic); creg reg; memset(®, 0, sizeof(reg)); reg.pinfo = make_proc("request", "requestid"); // reg.channel = cstr_arr_new(1); // size_t i = 0; // for(; i < 1; i++){ // cstr_arr_add(®.topic_pub, topic, strlen(topic), i); // } void* handle = bus_client_init(NULL, 0, ®); size_t count = 0; string base_msg("test_request=="); this_thread::sleep_for(chrono::seconds(3)); while (true) { auto msg = base_msg + "request message -> msg-"+to_string(count++); auto reqmsg = make_req_msg(strtpc.data(), strtpc.size(), msg.data(), msg.size()); crepmsg* repmsg = NULL; if (bus_client_request(handle, reqmsg, &repmsg)){ printf("======>> bus_client_reqest [%s] get [%s]\n", msg.c_str(), repmsg->data.str); } free_reqmsg(reqmsg); free_reply_msg(repmsg); this_thread::sleep_for(chrono::seconds{2}); } } static void reply(const char* topic){ ignoref(reply); creg reg; memset(®, 0, sizeof(reg)); reg.pinfo = make_proc("reply", "replyid"); reg.channel = cstr_arr_new(1); cstr_arr_add(®.channel, topic, strlen(topic), 0); reg.topic_pub = cstr_arr_new(1); cstr_arr_add(®.topic_pub, topic, strlen(topic), 0); void* handle = bus_client_init(NULL, 0, ®); size_t count = 0; this_thread::sleep_for(chrono::seconds(3)); while (true) { void* src = NULL; auto msg = bus_client_get_reqmsg(handle, &src); auto repmsg = make_reply_msg(0, NULL, 0, "recv request", 12); bus_client_reply_msg(handle, src, repmsg); free_reply_msg(repmsg); printf("REPREQ msg [%s] \n", msg->msg.str); free_reqmsg(msg); this_thread::sleep_for(chrono::seconds{2}); } } int main(int argc, char const *argv[]) { vector topics{ "cbhomeclient_test_pubsub" }; thread([&]{ pub(topics); }).detach(); thread([&]{ sub(topics); }).detach(); // sub(topics); printf("start RR\n"); const char* rrtopic = "cbhomeclient_req_rep"; thread([&]{ req(rrtopic); }).detach(); reply(rrtopic); return 0; }