From c28cdf2fbf1565709b359c9cca6c5e29d9592dce Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 02 四月 2021 15:51:20 +0800
Subject: [PATCH] typedef Topic.

---
 src/pubsub_center.h   |    4 +++-
 src/reqrep_center.cpp |    4 ++--
 src/reqrep.h          |   20 ++++++++++----------
 src/pubsub.h          |   10 +++++-----
 src/defs.h            |    2 ++
 src/pubsub.cpp        |    6 +++---
 utest/utest.cpp       |    6 +++---
 src/reqrep.cpp        |    6 +++---
 8 files changed, 31 insertions(+), 27 deletions(-)

diff --git a/src/defs.h b/src/defs.h
index db73634..dfb00ee 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -21,6 +21,7 @@
 
 #include <boost/uuid/uuid.hpp>
 #include <boost/uuid/uuid_generators.hpp>
+#include <string>
 
 typedef boost::uuids::uuid MQId;
 
@@ -34,6 +35,7 @@
 } // namespace bhome_shm
 
 bhome_shm::SharedMemory &BHomeShm();
+typedef std::string Topic;
 
 //TODO center can check shm for previous crash.
 
diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index 4449c31..8d26e0b 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -22,7 +22,7 @@
 using namespace std::chrono_literals;
 using namespace bhome_msg;
 
-bool SocketPublish::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms)
+bool SocketPublish::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms)
 {
 	try {
 		MsgI imsg;
@@ -36,7 +36,7 @@
 	}
 }
 
-bool SocketSubscribe::Subscribe(const std::vector<std::string> &topics, const int timeout_ms)
+bool SocketSubscribe::Subscribe(const std::vector<Topic> &topics, const int timeout_ms)
 {
 	try {
 		return mq().Send(kBHTopicBus, MakeSub(mq().Id(), topics), timeout_ms);
@@ -61,7 +61,7 @@
 	return tdcb && Start(AsyncRecvProc, nworker);
 }
 
-bool SocketSubscribe::RecvSub(std::string &topic, std::string &data, const int timeout_ms)
+bool SocketSubscribe::RecvSub(Topic &topic, std::string &data, const int timeout_ms)
 {
 	BHMsg msg;
 	if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) {
diff --git a/src/pubsub.h b/src/pubsub.h
index ac5a9d2..3c3d4ad 100644
--- a/src/pubsub.h
+++ b/src/pubsub.h
@@ -33,8 +33,8 @@
 	    shm_(shm) {}
 	SocketPublish() :
 	    SocketPublish(BHomeShm()) {}
-	bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms);
-	bool Publish(const std::string &topic, const std::string &data, const int timeout_ms)
+	bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms);
+	bool Publish(const Topic &topic, const std::string &data, const int timeout_ms)
 	{
 		return Publish(topic, data.data(), data.size(), timeout_ms);
 	}
@@ -52,11 +52,11 @@
 	    SocketSubscribe(BHomeShm()) {}
 	~SocketSubscribe() { Stop(); }
 
-	typedef std::function<void(const std::string &topic, const std::string &data)> TopicDataCB;
+	typedef std::function<void(const Topic &topic, const std::string &data)> TopicDataCB;
 	bool StartRecv(const TopicDataCB &tdcb, int nworker = 2);
 	bool Stop() { return Socket::Stop(); }
-	bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
-	bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
+	bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms);
+	bool RecvSub(Topic &topic, std::string &data, const int timeout_ms);
 };
 
 #endif // end of include guard: PUBSUB_4KGRA997
diff --git a/src/pubsub_center.h b/src/pubsub_center.h
index b752216..aa9db68 100644
--- a/src/pubsub_center.h
+++ b/src/pubsub_center.h
@@ -36,9 +36,11 @@
 	};
 	SocketBus socket_;
 	ShmSocket::Shm &shm() { return socket_.shm(); }
+
 	std::mutex mutex_;
 	typedef std::set<MQId> Clients;
-	std::unordered_map<std::string, Clients> records_;
+	std::unordered_map<Topic, Clients> records_;
+	bool Find1(const Topic &topic);
 
 public:
 	PubSubCenter(ShmSocket::Shm &shm) :
diff --git a/src/reqrep.cpp b/src/reqrep.cpp
index 79ff892..b8e423b 100644
--- a/src/reqrep.cpp
+++ b/src/reqrep.cpp
@@ -55,7 +55,7 @@
 	return Start(AsyncRecvProc, nworker);
 }
 
-bool SocketRequest::AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
+bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
 {
 	auto Call = [&](const void *remote) {
 		const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
@@ -82,7 +82,7 @@
 	}
 }
 
-bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms)
+bool SocketRequest::SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms)
 {
 	try {
 		BHAddress addr;
@@ -153,7 +153,7 @@
 	}
 }
 
-bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
+bool SocketRequest::QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
 {
 	if (topic_cache_.Find(topic, addr)) {
 		return true;
diff --git a/src/reqrep.h b/src/reqrep.h
index e8a38f7..9e43c7b 100644
--- a/src/reqrep.h
+++ b/src/reqrep.h
@@ -42,13 +42,13 @@
 	bool StartWorker(const RequestResultCB &rrcb, int nworker = 2);
 	bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); }
 	bool Stop() { return Socket::Stop(); }
-	bool AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
-	bool AsyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb)
+	bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
+	bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb)
 	{
 		return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
 	}
-	bool SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
-	bool SyncRequest(const std::string &topic, const std::string &data, std::string &out, const int timeout_ms)
+	bool SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
+	bool SyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms)
 	{
 		return SyncRequest(topic, data.data(), data.size(), out, timeout_ms);
 	}
@@ -56,7 +56,7 @@
 private:
 	bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb);
 	bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms);
-	bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
+	bool QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
 	std::unordered_map<std::string, RecvCB> async_cbs_;
 
 	typedef bhome_msg::BHAddress Address;
@@ -64,11 +64,11 @@
 	{
 		class Impl
 		{
-			typedef std::unordered_map<std::string, Address> Store;
+			typedef std::unordered_map<Topic, Address> Store;
 			Store store_;
 
 		public:
-			bool Find(const std::string &topic, Address &addr)
+			bool Find(const Topic &topic, Address &addr)
 			{
 				auto pos = store_.find(topic);
 				if (pos != store_.end()) {
@@ -78,7 +78,7 @@
 					return false;
 				}
 			}
-			bool Update(const std::string &topic, const Address &addr)
+			bool Update(const Topic &topic, const Address &addr)
 			{
 				store_[topic] = addr;
 				return true;
@@ -92,8 +92,8 @@
 		// }
 
 	public:
-		bool Find(const std::string &topic, Address &addr) { return impl_->Find(topic, addr); }
-		bool Update(const std::string &topic, const Address &addr) { return impl_->Update(topic, addr); }
+		bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); }
+		bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); }
 	};
 	TopicCache topic_cache_;
 };
diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp
index 0b6ddea..2356ebc 100644
--- a/src/reqrep_center.cpp
+++ b/src/reqrep_center.cpp
@@ -66,7 +66,7 @@
 			}
 		}
 	}
-	bool QueryTopic(const std::string &topic, ProcAddr &addr)
+	bool QueryTopic(const Topic &topic, ProcAddr &addr)
 	{
 		auto pos = topic_map_.find(topic);
 		if (pos != topic_map_.end()) {
@@ -96,7 +96,7 @@
 	};
 	typedef std::shared_ptr<NodeInfo> Node;
 	typedef std::weak_ptr<NodeInfo> WeakNode;
-	std::unordered_map<std::string, WeakNode> topic_map_;
+	std::unordered_map<Topic, WeakNode> topic_map_;
 	std::unordered_map<ProcId, Node> nodes_;
 };
 } // namespace
diff --git a/utest/utest.cpp b/utest/utest.cpp
index b4aa760..637ae26 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -25,7 +25,7 @@
 
 BOOST_AUTO_TEST_CASE(Temp)
 {
-	std::string topics[] = {
+	Topic topics[] = {
 	    "",
 	    ".",
 	    "a",
@@ -128,7 +128,7 @@
 		}
 	};
 	ThreadManager threads;
-	typedef std::vector<std::string> Topics;
+	typedef std::vector<Topic> Topics;
 	Topics topics;
 	for (int i = 0; i < 100; ++i) {
 		topics.push_back("t" + std::to_string(i));
@@ -208,7 +208,7 @@
 		}
 	};
 	ThreadManager clients, servers;
-	std::vector<std::string> topics = {"topic1", "topic2"};
+	std::vector<Topic> topics = {"topic1", "topic2"};
 	servers.Launch(Server, "server", topics);
 	std::this_thread::sleep_for(100ms);
 	for (auto &t : topics) {

--
Gitblit v1.8.0