From c28cdf2fbf1565709b359c9cca6c5e29d9592dce Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 02 四月 2021 15:51:20 +0800
Subject: [PATCH] typedef Topic.
---
src/pubsub_center.h | 4 +++-
src/reqrep_center.cpp | 4 ++--
src/reqrep.h | 20 ++++++++++----------
src/pubsub.h | 10 +++++-----
src/defs.h | 2 ++
src/pubsub.cpp | 6 +++---
utest/utest.cpp | 6 +++---
src/reqrep.cpp | 6 +++---
8 files changed, 31 insertions(+), 27 deletions(-)
diff --git a/src/defs.h b/src/defs.h
index db73634..dfb00ee 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -21,6 +21,7 @@
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
+#include <string>
typedef boost::uuids::uuid MQId;
@@ -34,6 +35,7 @@
} // namespace bhome_shm
bhome_shm::SharedMemory &BHomeShm();
+typedef std::string Topic;
//TODO center can check shm for previous crash.
diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index 4449c31..8d26e0b 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -22,7 +22,7 @@
using namespace std::chrono_literals;
using namespace bhome_msg;
-bool SocketPublish::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms)
+bool SocketPublish::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms)
{
try {
MsgI imsg;
@@ -36,7 +36,7 @@
}
}
-bool SocketSubscribe::Subscribe(const std::vector<std::string> &topics, const int timeout_ms)
+bool SocketSubscribe::Subscribe(const std::vector<Topic> &topics, const int timeout_ms)
{
try {
return mq().Send(kBHTopicBus, MakeSub(mq().Id(), topics), timeout_ms);
@@ -61,7 +61,7 @@
return tdcb && Start(AsyncRecvProc, nworker);
}
-bool SocketSubscribe::RecvSub(std::string &topic, std::string &data, const int timeout_ms)
+bool SocketSubscribe::RecvSub(Topic &topic, std::string &data, const int timeout_ms)
{
BHMsg msg;
if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) {
diff --git a/src/pubsub.h b/src/pubsub.h
index ac5a9d2..3c3d4ad 100644
--- a/src/pubsub.h
+++ b/src/pubsub.h
@@ -33,8 +33,8 @@
shm_(shm) {}
SocketPublish() :
SocketPublish(BHomeShm()) {}
- bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms);
- bool Publish(const std::string &topic, const std::string &data, const int timeout_ms)
+ bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms);
+ bool Publish(const Topic &topic, const std::string &data, const int timeout_ms)
{
return Publish(topic, data.data(), data.size(), timeout_ms);
}
@@ -52,11 +52,11 @@
SocketSubscribe(BHomeShm()) {}
~SocketSubscribe() { Stop(); }
- typedef std::function<void(const std::string &topic, const std::string &data)> TopicDataCB;
+ typedef std::function<void(const Topic &topic, const std::string &data)> TopicDataCB;
bool StartRecv(const TopicDataCB &tdcb, int nworker = 2);
bool Stop() { return Socket::Stop(); }
- bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
- bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
+ bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms);
+ bool RecvSub(Topic &topic, std::string &data, const int timeout_ms);
};
#endif // end of include guard: PUBSUB_4KGRA997
diff --git a/src/pubsub_center.h b/src/pubsub_center.h
index b752216..aa9db68 100644
--- a/src/pubsub_center.h
+++ b/src/pubsub_center.h
@@ -36,9 +36,11 @@
};
SocketBus socket_;
ShmSocket::Shm &shm() { return socket_.shm(); }
+
std::mutex mutex_;
typedef std::set<MQId> Clients;
- std::unordered_map<std::string, Clients> records_;
+ std::unordered_map<Topic, Clients> records_;
+ bool Find1(const Topic &topic);
public:
PubSubCenter(ShmSocket::Shm &shm) :
diff --git a/src/reqrep.cpp b/src/reqrep.cpp
index 79ff892..b8e423b 100644
--- a/src/reqrep.cpp
+++ b/src/reqrep.cpp
@@ -55,7 +55,7 @@
return Start(AsyncRecvProc, nworker);
}
-bool SocketRequest::AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
+bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
{
auto Call = [&](const void *remote) {
const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
@@ -82,7 +82,7 @@
}
}
-bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms)
+bool SocketRequest::SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms)
{
try {
BHAddress addr;
@@ -153,7 +153,7 @@
}
}
-bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
+bool SocketRequest::QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
{
if (topic_cache_.Find(topic, addr)) {
return true;
diff --git a/src/reqrep.h b/src/reqrep.h
index e8a38f7..9e43c7b 100644
--- a/src/reqrep.h
+++ b/src/reqrep.h
@@ -42,13 +42,13 @@
bool StartWorker(const RequestResultCB &rrcb, int nworker = 2);
bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); }
bool Stop() { return Socket::Stop(); }
- bool AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
- bool AsyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb)
+ bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
+ bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb)
{
return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
}
- bool SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
- bool SyncRequest(const std::string &topic, const std::string &data, std::string &out, const int timeout_ms)
+ bool SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
+ bool SyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms)
{
return SyncRequest(topic, data.data(), data.size(), out, timeout_ms);
}
@@ -56,7 +56,7 @@
private:
bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb);
bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms);
- bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
+ bool QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
std::unordered_map<std::string, RecvCB> async_cbs_;
typedef bhome_msg::BHAddress Address;
@@ -64,11 +64,11 @@
{
class Impl
{
- typedef std::unordered_map<std::string, Address> Store;
+ typedef std::unordered_map<Topic, Address> Store;
Store store_;
public:
- bool Find(const std::string &topic, Address &addr)
+ bool Find(const Topic &topic, Address &addr)
{
auto pos = store_.find(topic);
if (pos != store_.end()) {
@@ -78,7 +78,7 @@
return false;
}
}
- bool Update(const std::string &topic, const Address &addr)
+ bool Update(const Topic &topic, const Address &addr)
{
store_[topic] = addr;
return true;
@@ -92,8 +92,8 @@
// }
public:
- bool Find(const std::string &topic, Address &addr) { return impl_->Find(topic, addr); }
- bool Update(const std::string &topic, const Address &addr) { return impl_->Update(topic, addr); }
+ bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); }
+ bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); }
};
TopicCache topic_cache_;
};
diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp
index 0b6ddea..2356ebc 100644
--- a/src/reqrep_center.cpp
+++ b/src/reqrep_center.cpp
@@ -66,7 +66,7 @@
}
}
}
- bool QueryTopic(const std::string &topic, ProcAddr &addr)
+ bool QueryTopic(const Topic &topic, ProcAddr &addr)
{
auto pos = topic_map_.find(topic);
if (pos != topic_map_.end()) {
@@ -96,7 +96,7 @@
};
typedef std::shared_ptr<NodeInfo> Node;
typedef std::weak_ptr<NodeInfo> WeakNode;
- std::unordered_map<std::string, WeakNode> topic_map_;
+ std::unordered_map<Topic, WeakNode> topic_map_;
std::unordered_map<ProcId, Node> nodes_;
};
} // namespace
diff --git a/utest/utest.cpp b/utest/utest.cpp
index b4aa760..637ae26 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -25,7 +25,7 @@
BOOST_AUTO_TEST_CASE(Temp)
{
- std::string topics[] = {
+ Topic topics[] = {
"",
".",
"a",
@@ -128,7 +128,7 @@
}
};
ThreadManager threads;
- typedef std::vector<std::string> Topics;
+ typedef std::vector<Topic> Topics;
Topics topics;
for (int i = 0; i < 100; ++i) {
topics.push_back("t" + std::to_string(i));
@@ -208,7 +208,7 @@
}
};
ThreadManager clients, servers;
- std::vector<std::string> topics = {"topic1", "topic2"};
+ std::vector<Topic> topics = {"topic1", "topic2"};
servers.Launch(Server, "server", topics);
std::this_thread::sleep_for(100ms);
for (auto &t : topics) {
--
Gitblit v1.8.0