From 2197cf91e7a3bd5941327ba630a42946b88f069e Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 09 四月 2021 14:15:41 +0800 Subject: [PATCH] join pub/sub to node; refactor. --- utest/utest.cpp | 45 ++++++++++++++++----------------------------- 1 files changed, 16 insertions(+), 29 deletions(-) diff --git a/utest/utest.cpp b/utest/utest.cpp index c925e22..f88eab9 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -1,8 +1,5 @@ #include "center.h" #include "defs.h" -#include "pubsub.h" -#include "socket.h" -#include "topic_node.h" #include "util.h" #include <atomic> #include <boost/uuid/uuid_generators.hpp> @@ -92,8 +89,12 @@ const uint64_t nmsg = 100 * 2; const int timeout = 1000; auto Sub = [&](int id, const std::vector<std::string> &topics) { - SocketSubscribe client(shm); - bool r = client.Subscribe(sub_proc_id, topics, timeout); + DemoNode client("client_" + std::to_string(id), shm); + + bool r = client.Subscribe(topics, timeout); + if (!r) { + printf("client subscribe failed.\n"); + } std::mutex mutex; std::condition_variable cv; @@ -112,18 +113,19 @@ } // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str()); }; - client.StartRecv(OnTopicData, 1); + client.SubscribeStartWorker(OnTopicData, 1); std::unique_lock<std::mutex> lk(mutex); cv.wait(lk); }; auto Pub = [&](const std::string &topic) { - SocketPublish provider(shm); + DemoNode provider("server_" + topic, shm); + for (unsigned i = 0; i < nmsg; ++i) { std::string data = topic + std::to_string(i) + std::string(1000, '-'); - bool r = provider.Publish(pub_proc_id, topic, data.data(), data.size(), timeout); + bool r = provider.Publish(topic, data.data(), data.size(), timeout); if (!r) { printf("pub ret: %s\n", r ? "ok" : "fail"); } @@ -184,15 +186,7 @@ std::atomic<bool> run(true); auto Client = [&](const std::string &topic, const int nreq) { - TopicNode client(shm); - MsgRegister reg; - reg.mutable_proc()->set_proc_id(client_proc_id + topic); - MsgCommonReply reply_body; - - if (!client.Register(reg, reply_body, 1000)) { - printf("client register failed\n"); - return; - } + DemoNode client(client_proc_id + topic, shm); std::atomic<int> count(0); std::string reply; @@ -218,21 +212,13 @@ do { std::this_thread::yield(); } while (count.load() < nreq); - client.ClientStopWorker(); + client.StopAll(); printf("request %s %d done ", topic.c_str(), count.load()); }; + std::atomic_uint64_t server_msg_count(0); auto Server = [&](const std::string &name, const std::vector<std::string> &topics) { - TopicNode server(shm); - MsgRegister reg; - reg.mutable_proc()->set_proc_id(server_proc_id); - reg.mutable_proc()->set_name(name); - MsgCommonReply reply_body; - - if (!server.Register(reg, reply_body, 100)) { - printf("server register failed\n"); - return; - } + DemoNode server(name, shm); auto onData = [&](const std::string &topic, const std::string &data, std::string &reply) { ++server_msg_count; @@ -245,6 +231,7 @@ for (auto &topic : topics) { rpc.add_topics(topic); } + MsgCommonReply reply_body; if (!server.RegisterRPC(rpc, reply_body, 100)) { printf("server register topic failed\n"); return; @@ -262,7 +249,7 @@ clients.Launch(Client, t, 1000 * 1); } clients.WaitAll(); - printf("clients done, server replyed: %d\n", server_msg_count.load()); + printf("clients done, server replyed: %ld\n", server_msg_count.load()); run = false; servers.WaitAll(); } -- Gitblit v1.8.0