From 2197cf91e7a3bd5941327ba630a42946b88f069e Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 09 四月 2021 14:15:41 +0800
Subject: [PATCH] join pub/sub to node; refactor.

---
 utest/utest.cpp |   45 ++++++++++++++++-----------------------------
 1 files changed, 16 insertions(+), 29 deletions(-)

diff --git a/utest/utest.cpp b/utest/utest.cpp
index c925e22..f88eab9 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -1,8 +1,5 @@
 #include "center.h"
 #include "defs.h"
-#include "pubsub.h"
-#include "socket.h"
-#include "topic_node.h"
 #include "util.h"
 #include <atomic>
 #include <boost/uuid/uuid_generators.hpp>
@@ -92,8 +89,12 @@
 	const uint64_t nmsg = 100 * 2;
 	const int timeout = 1000;
 	auto Sub = [&](int id, const std::vector<std::string> &topics) {
-		SocketSubscribe client(shm);
-		bool r = client.Subscribe(sub_proc_id, topics, timeout);
+		DemoNode client("client_" + std::to_string(id), shm);
+
+		bool r = client.Subscribe(topics, timeout);
+		if (!r) {
+			printf("client subscribe failed.\n");
+		}
 		std::mutex mutex;
 		std::condition_variable cv;
 
@@ -112,18 +113,19 @@
 			}
 			// printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
 		};
-		client.StartRecv(OnTopicData, 1);
+		client.SubscribeStartWorker(OnTopicData, 1);
 
 		std::unique_lock<std::mutex> lk(mutex);
 		cv.wait(lk);
 	};
 
 	auto Pub = [&](const std::string &topic) {
-		SocketPublish provider(shm);
+		DemoNode provider("server_" + topic, shm);
+
 		for (unsigned i = 0; i < nmsg; ++i) {
 			std::string data = topic + std::to_string(i) + std::string(1000, '-');
 
-			bool r = provider.Publish(pub_proc_id, topic, data.data(), data.size(), timeout);
+			bool r = provider.Publish(topic, data.data(), data.size(), timeout);
 			if (!r) {
 				printf("pub ret: %s\n", r ? "ok" : "fail");
 			}
@@ -184,15 +186,7 @@
 	std::atomic<bool> run(true);
 
 	auto Client = [&](const std::string &topic, const int nreq) {
-		TopicNode client(shm);
-		MsgRegister reg;
-		reg.mutable_proc()->set_proc_id(client_proc_id + topic);
-		MsgCommonReply reply_body;
-
-		if (!client.Register(reg, reply_body, 1000)) {
-			printf("client register failed\n");
-			return;
-		}
+		DemoNode client(client_proc_id + topic, shm);
 
 		std::atomic<int> count(0);
 		std::string reply;
@@ -218,21 +212,13 @@
 		do {
 			std::this_thread::yield();
 		} while (count.load() < nreq);
-		client.ClientStopWorker();
+		client.StopAll();
 		printf("request %s %d done ", topic.c_str(), count.load());
 	};
+
 	std::atomic_uint64_t server_msg_count(0);
 	auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
-		TopicNode server(shm);
-		MsgRegister reg;
-		reg.mutable_proc()->set_proc_id(server_proc_id);
-		reg.mutable_proc()->set_name(name);
-		MsgCommonReply reply_body;
-
-		if (!server.Register(reg, reply_body, 100)) {
-			printf("server register failed\n");
-			return;
-		}
+		DemoNode server(name, shm);
 
 		auto onData = [&](const std::string &topic, const std::string &data, std::string &reply) {
 			++server_msg_count;
@@ -245,6 +231,7 @@
 		for (auto &topic : topics) {
 			rpc.add_topics(topic);
 		}
+		MsgCommonReply reply_body;
 		if (!server.RegisterRPC(rpc, reply_body, 100)) {
 			printf("server register topic failed\n");
 			return;
@@ -262,7 +249,7 @@
 		clients.Launch(Client, t, 1000 * 1);
 	}
 	clients.WaitAll();
-	printf("clients done, server replyed: %d\n", server_msg_count.load());
+	printf("clients done, server replyed: %ld\n", server_msg_count.load());
 	run = false;
 	servers.WaitAll();
 }

--
Gitblit v1.8.0