From ae17d1439b35b55212c3a30712e0a60b1d6a99c0 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 30 六月 2021 11:15:53 +0800
Subject: [PATCH] support tcp pub/sub.

---
 utest/api_test.cpp |   91 +++++++++++++++++++++++++++++++--------------
 1 files changed, 62 insertions(+), 29 deletions(-)

diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 239ea8b..13a552d 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -16,6 +16,7 @@
  * =====================================================================================
  */
 #include "bh_api.h"
+#include "json.h"
 #include "robust.h"
 #include "util.h"
 #include <atomic>
@@ -330,17 +331,27 @@
 	}
 
 	{ // Subscribe
-		MsgTopicList topics;
-		topics.add_topic_list("#center.node");
-		for (int i = 0; i < 10; ++i) {
-			topics.add_topic_list(topic_ + std::to_string(i * 2));
-		}
-		std::string s = topics.SerializeAsString();
-		void *reply = 0;
-		int reply_len = 0;
-		bool r = BHSubscribeTopics(s.data(), s.size(), &reply, &reply_len, 1000);
-		BHFree(reply, reply_len);
-		printf("subscribe topic : %s\n", r ? "ok" : "failed");
+		auto Subscribe = [&](std::string topic, bool net) {
+			MsgTopicList topics;
+			topics.add_topic_list(topic);
+			for (int i = 0; i < 10; ++i) {
+				topics.add_topic_list(topic_ + std::to_string(i * 2));
+			}
+			std::string s = topics.SerializeAsString();
+			void *reply = 0;
+			int reply_len = 0;
+			bool r = false;
+			if (net) {
+				r = BHSubscribeNetTopics(s.data(), s.size(), &reply, &reply_len, 1000);
+			} else {
+				r = BHSubscribeTopics(s.data(), s.size(), &reply, &reply_len, 1000);
+			}
+			BHFree(reply, reply_len);
+			printf("subscribe topic %s: %s\n", topic.c_str(), (r ? "ok" : "failed"));
+		};
+		Subscribe("#center.node", false);
+		Subscribe("local0", false);
+		Subscribe("net0", true);
 	}
 
 	auto ServerLoop = [&](std::atomic<bool> *run) {
@@ -368,14 +379,47 @@
 		}
 	};
 
+	std::atomic<bool> run(true);
+	ThreadManager threads;
+#if 1
+	BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
+#else
+	BHStartWorker(FServerCallback(), &SubRecvProc, &ClientProc);
+	threads.Launch(ServerLoop, &run);
+#endif
+
+	auto Publish = [&](const std::string &topic, const std::string &data) {
+		MsgPublish pub;
+		pub.set_topic(topic);
+		pub.set_data(data);
+		std::string s(pub.SerializeAsString());
+		BHPublish(s.data(), s.size(), 0);
+	};
+
 	{
+		// publish
+		Publish(topic_ + std::to_string(0), "pub_data_" + std::string(104 * 1, 'a'));
 		for (int i = 0; i < 1; ++i) {
-			MsgPublish pub;
-			pub.set_topic(topic_ + std::to_string(i));
-			pub.set_data("pub_data_" + std::string(104 * 1, 'a'));
-			std::string s(pub.SerializeAsString());
-			BHPublish(s.data(), s.size(), 0);
-			// Sleep(1s);
+
+			ssjson::Json net = ssjson::Json::Array();
+			ssjson::Json host;
+			host.put("serverId", "test_host");
+			host.put("ip", "127.0.0.1");
+			ssjson::Json topics = ssjson::Json::Array();
+			topics.push_back("aaaaa");
+			topics.push_back("bbbbb");
+			host.put("pubTopics", topics);
+			topics.push_back("net0");
+			topics.push_back("net1");
+			host.put("netSubTopics", topics);
+			net.push_back(host);
+
+			Publish("pub-allRegisterInfo-to-center", net.dump());
+			Sleep(1s);
+			Publish("local0", "local-abcd0");
+			Publish("net0", "net-abcd0");
+			Publish("local0", "local-abcd1");
+			Sleep(1s);
 		}
 	}
 
@@ -428,22 +472,11 @@
 		}
 	};
 
-	std::atomic<bool> run(true);
-
-	ThreadManager threads;
-
-#if 1
-	BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
-#else
-	BHStartWorker(FServerCallback(), &SubRecvProc, &ClientProc);
-	threads.Launch(ServerLoop, &run);
-#endif
-
 	boost::timer::auto_cpu_timer timer;
 	threads.Launch(hb, &run);
 	threads.Launch(showStatus, &run);
 	int ncli = 10;
-	const int64_t nreq = 1000 * 100;
+	const int64_t nreq = 1000; //* 100;
 
 	for (int i = 0; i < 10; ++i) {
 		SyncRequest(topic_ + std::to_string(0), "request_data_" + std::to_string(i));

--
Gitblit v1.8.0