From 0bc72d004b08b6cac005931787f43c68dace7685 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 02 四月 2021 16:25:39 +0800
Subject: [PATCH] refactor pub/sub center.

---
 utest/utest.cpp |   73 +++++++++++++++++++++++++++++++++++-
 1 files changed, 71 insertions(+), 2 deletions(-)

diff --git a/utest/utest.cpp b/utest/utest.cpp
index b95e646..637ae26 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -1,6 +1,8 @@
 #include "defs.h"
 #include "pubsub.h"
 #include "pubsub_center.h"
+#include "reqrep.h"
+#include "reqrep_center.h"
 #include "socket.h"
 #include "util.h"
 #include <atomic>
@@ -23,7 +25,7 @@
 
 BOOST_AUTO_TEST_CASE(Temp)
 {
-	std::string topics[] = {
+	Topic topics[] = {
 	    "",
 	    ".",
 	    "a",
@@ -126,7 +128,7 @@
 		}
 	};
 	ThreadManager threads;
-	typedef std::vector<std::string> Topics;
+	typedef std::vector<Topic> Topics;
 	Topics topics;
 	for (int i = 0; i < 100; ++i) {
 		topics.push_back("t" + std::to_string(i));
@@ -149,6 +151,73 @@
 
 	bus.Stop();
 }
+namespace
+{
+struct C {
+	C() { printf("+C\n"); }
+	C(const C &c) { printf("+C(const C&)\n"); }
+	void F() { printf("C::F()\n"); }
+	~C() { printf("-C\n"); }
+	char arr[100];
+};
+int F(C &c) { return printf(":::::::::::::F()\n"); }
+} // namespace
+
+BOOST_AUTO_TEST_CASE(ReqRepTest)
+{
+	const std::string shm_name("ShmReqRep");
+	ShmRemover auto_remove(shm_name);
+	SharedMemory shm(shm_name, 1024 * 1024 * 50);
+
+	auto Avail = [&]() { return shm.get_free_memory(); };
+	auto init_avail = Avail();
+	int *flag = shm.find_or_construct<int>("flag")(123);
+	printf("flag = %d\n", *flag);
+	++*flag;
+
+	ReqRepCenter center(shm);
+	center.Start(2);
+	std::atomic<bool> run(true);
+
+	auto Client = [&](const std::string &topic, const int nreq) {
+		SocketRequest client(shm);
+		std::string reply;
+		boost::timer::auto_cpu_timer timer;
+		for (int i = 0; i < nreq; ++i) {
+			if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) {
+				printf("client request failed\n");
+			}
+		}
+		printf("request %s %d done ", topic.c_str(), nreq);
+	};
+	auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
+		SocketReply server(shm);
+		ProcInfo info;
+		info.set_id(name);
+		info.set_name(name);
+		if (!server.Register(info, topics, 100)) {
+			printf("register failed\n");
+		}
+		auto onData = [](const std::string &topic, const std::string &data, std::string &reply) {
+			reply = topic + ':' + data;
+			return true;
+		};
+		server.StartWorker(onData);
+		while (run) {
+			std::this_thread::yield();
+		}
+	};
+	ThreadManager clients, servers;
+	std::vector<Topic> topics = {"topic1", "topic2"};
+	servers.Launch(Server, "server", topics);
+	std::this_thread::sleep_for(100ms);
+	for (auto &t : topics) {
+		clients.Launch(Client, t, 1000 * 100);
+	}
+	clients.WaitAll();
+	run = false;
+	servers.WaitAll();
+}
 
 inline int MyMin(int a, int b)
 {

--
Gitblit v1.8.0