From c338820e4db43ad32c20ff429a038b06bcb980f8 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 08 四月 2021 18:13:25 +0800
Subject: [PATCH] BIG change, join center,bus; now msg is head+body.

---
 utest/utest.cpp |   75 +++++++++++++++++++++++++------------
 1 files changed, 50 insertions(+), 25 deletions(-)

diff --git a/utest/utest.cpp b/utest/utest.cpp
index 092455f..c925e22 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -1,11 +1,8 @@
 #include "center.h"
 #include "defs.h"
 #include "pubsub.h"
-#include "pubsub_center.h"
-#include "reqrep_center.h"
 #include "socket.h"
-#include "topic_reply.h"
-#include "topic_request.h"
+#include "topic_node.h"
 #include "util.h"
 #include <atomic>
 #include <boost/uuid/uuid_generators.hpp>
@@ -15,6 +12,7 @@
 #include <string>
 #include <thread>
 #include <vector>
+using namespace bhome_msg;
 
 template <class A, class B>
 struct IsSameType {
@@ -79,9 +77,11 @@
 	int *flag = shm.find_or_construct<int>("flag")(123);
 	printf("flag = %d\n", *flag);
 	++*flag;
+	const std::string sub_proc_id = "subscriber";
+	const std::string pub_proc_id = "publisher";
 
-	PubSubCenter bus(shm);
-	bus.Start();
+	BHCenter center(shm);
+	center.Start();
 
 	std::this_thread::sleep_for(100ms);
 
@@ -93,12 +93,12 @@
 	const int timeout = 1000;
 	auto Sub = [&](int id, const std::vector<std::string> &topics) {
 		SocketSubscribe client(shm);
-		bool r = client.Subscribe(topics, timeout);
+		bool r = client.Subscribe(sub_proc_id, topics, timeout);
 		std::mutex mutex;
 		std::condition_variable cv;
 
 		std::atomic<uint64_t> n(0);
-		auto OnTopicData = [&](const std::string &topic, const std::string &data) {
+		auto OnTopicData = [&](const std::string &proc_id, const std::string &topic, const std::string &data) {
 			++total_count;
 
 			auto cur = Now();
@@ -123,7 +123,7 @@
 		for (unsigned i = 0; i < nmsg; ++i) {
 			std::string data = topic + std::to_string(i) + std::string(1000, '-');
 
-			bool r = provider.Publish(topic, data, timeout);
+			bool r = provider.Publish(pub_proc_id, topic, data.data(), data.size(), timeout);
 			if (!r) {
 				printf("pub ret: %s\n", r ? "ok" : "fail");
 			}
@@ -150,9 +150,8 @@
 	std::cout << "end : " << Now();
 	printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
 	       total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail());
-
-	bus.Stop();
 }
+
 namespace
 {
 struct C {
@@ -177,12 +176,24 @@
 	printf("flag = %d\n", *flag);
 	++*flag;
 
+	const std::string client_proc_id = "client_proc_";
+	const std::string server_proc_id = "server_proc_";
+
 	BHCenter center(shm);
 	center.Start();
 	std::atomic<bool> run(true);
 
 	auto Client = [&](const std::string &topic, const int nreq) {
-		SocketRequest client(shm);
+		TopicNode client(shm);
+		MsgRegister reg;
+		reg.mutable_proc()->set_proc_id(client_proc_id + topic);
+		MsgCommonReply reply_body;
+
+		if (!client.Register(reg, reply_body, 1000)) {
+			printf("client register failed\n");
+			return;
+		}
+
 		std::atomic<int> count(0);
 		std::string reply;
 		auto onRecv = [&](const std::string &rep) {
@@ -191,40 +202,54 @@
 				printf("count: %d\n", count.load());
 			}
 		};
-		client.StartWorker(onRecv, 2);
+		client.ClientStartWorker(onRecv, 2);
 		boost::timer::auto_cpu_timer timer;
 		for (int i = 0; i < nreq; ++i) {
-			if (!client.AsyncRequest(topic, "data " + std::to_string(i), 1000)) {
+			if (!client.ClientAsyncRequest(topic, "data " + std::to_string(i), 1000)) {
 				printf("client request failed\n");
+				++count;
 			}
 
 			// if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) {
 			// 	printf("client request failed\n");
-			// } else {
-			// 	++count;
 			// }
+			// 	++count;
 		}
 		do {
 			std::this_thread::yield();
 		} while (count.load() < nreq);
-		client.Stop();
+		client.ClientStopWorker();
 		printf("request %s %d done ", topic.c_str(), count.load());
 	};
 	std::atomic_uint64_t server_msg_count(0);
 	auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
-		SocketReply server(shm);
-		ProcInfo info;
-		info.set_id(name);
-		info.set_name(name);
-		if (!server.Register(info, topics, 100)) {
-			printf("register failed\n");
+		TopicNode server(shm);
+		MsgRegister reg;
+		reg.mutable_proc()->set_proc_id(server_proc_id);
+		reg.mutable_proc()->set_name(name);
+		MsgCommonReply reply_body;
+
+		if (!server.Register(reg, reply_body, 100)) {
+			printf("server register failed\n");
+			return;
 		}
+
 		auto onData = [&](const std::string &topic, const std::string &data, std::string &reply) {
 			++server_msg_count;
 			reply = topic + ':' + data;
 			return true;
 		};
-		server.StartWorker(onData);
+		server.ServerStart(onData);
+
+		MsgRegisterRPC rpc;
+		for (auto &topic : topics) {
+			rpc.add_topics(topic);
+		}
+		if (!server.RegisterRPC(rpc, reply_body, 100)) {
+			printf("server register topic failed\n");
+			return;
+		}
+
 		while (run) {
 			std::this_thread::yield();
 		}
@@ -234,7 +259,7 @@
 	servers.Launch(Server, "server", topics);
 	std::this_thread::sleep_for(100ms);
 	for (auto &t : topics) {
-		clients.Launch(Client, t, 1000 * 100);
+		clients.Launch(Client, t, 1000 * 1);
 	}
 	clients.WaitAll();
 	printf("clients done, server replyed: %d\n", server_msg_count.load());

--
Gitblit v1.8.0