From ae17d1439b35b55212c3a30712e0a60b1d6a99c0 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 30 六月 2021 11:15:53 +0800 Subject: [PATCH] support tcp pub/sub. --- utest/api_test.cpp | 91 +++++++++++++++++++++++++++++++-------------- 1 files changed, 62 insertions(+), 29 deletions(-) diff --git a/utest/api_test.cpp b/utest/api_test.cpp index 239ea8b..13a552d 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -16,6 +16,7 @@ * ===================================================================================== */ #include "bh_api.h" +#include "json.h" #include "robust.h" #include "util.h" #include <atomic> @@ -330,17 +331,27 @@ } { // Subscribe - MsgTopicList topics; - topics.add_topic_list("#center.node"); - for (int i = 0; i < 10; ++i) { - topics.add_topic_list(topic_ + std::to_string(i * 2)); - } - std::string s = topics.SerializeAsString(); - void *reply = 0; - int reply_len = 0; - bool r = BHSubscribeTopics(s.data(), s.size(), &reply, &reply_len, 1000); - BHFree(reply, reply_len); - printf("subscribe topic : %s\n", r ? "ok" : "failed"); + auto Subscribe = [&](std::string topic, bool net) { + MsgTopicList topics; + topics.add_topic_list(topic); + for (int i = 0; i < 10; ++i) { + topics.add_topic_list(topic_ + std::to_string(i * 2)); + } + std::string s = topics.SerializeAsString(); + void *reply = 0; + int reply_len = 0; + bool r = false; + if (net) { + r = BHSubscribeNetTopics(s.data(), s.size(), &reply, &reply_len, 1000); + } else { + r = BHSubscribeTopics(s.data(), s.size(), &reply, &reply_len, 1000); + } + BHFree(reply, reply_len); + printf("subscribe topic %s: %s\n", topic.c_str(), (r ? "ok" : "failed")); + }; + Subscribe("#center.node", false); + Subscribe("local0", false); + Subscribe("net0", true); } auto ServerLoop = [&](std::atomic<bool> *run) { @@ -368,14 +379,47 @@ } }; + std::atomic<bool> run(true); + ThreadManager threads; +#if 1 + BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc); +#else + BHStartWorker(FServerCallback(), &SubRecvProc, &ClientProc); + threads.Launch(ServerLoop, &run); +#endif + + auto Publish = [&](const std::string &topic, const std::string &data) { + MsgPublish pub; + pub.set_topic(topic); + pub.set_data(data); + std::string s(pub.SerializeAsString()); + BHPublish(s.data(), s.size(), 0); + }; + { + // publish + Publish(topic_ + std::to_string(0), "pub_data_" + std::string(104 * 1, 'a')); for (int i = 0; i < 1; ++i) { - MsgPublish pub; - pub.set_topic(topic_ + std::to_string(i)); - pub.set_data("pub_data_" + std::string(104 * 1, 'a')); - std::string s(pub.SerializeAsString()); - BHPublish(s.data(), s.size(), 0); - // Sleep(1s); + + ssjson::Json net = ssjson::Json::Array(); + ssjson::Json host; + host.put("serverId", "test_host"); + host.put("ip", "127.0.0.1"); + ssjson::Json topics = ssjson::Json::Array(); + topics.push_back("aaaaa"); + topics.push_back("bbbbb"); + host.put("pubTopics", topics); + topics.push_back("net0"); + topics.push_back("net1"); + host.put("netSubTopics", topics); + net.push_back(host); + + Publish("pub-allRegisterInfo-to-center", net.dump()); + Sleep(1s); + Publish("local0", "local-abcd0"); + Publish("net0", "net-abcd0"); + Publish("local0", "local-abcd1"); + Sleep(1s); } } @@ -428,22 +472,11 @@ } }; - std::atomic<bool> run(true); - - ThreadManager threads; - -#if 1 - BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc); -#else - BHStartWorker(FServerCallback(), &SubRecvProc, &ClientProc); - threads.Launch(ServerLoop, &run); -#endif - boost::timer::auto_cpu_timer timer; threads.Launch(hb, &run); threads.Launch(showStatus, &run); int ncli = 10; - const int64_t nreq = 1000 * 100; + const int64_t nreq = 1000; //* 100; for (int i = 0; i < 10; ++i) { SyncRequest(topic_ + std::to_string(0), "request_data_" + std::to_string(i)); -- Gitblit v1.8.0