From c28cdf2fbf1565709b359c9cca6c5e29d9592dce Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 02 四月 2021 15:51:20 +0800 Subject: [PATCH] typedef Topic. --- src/pubsub_center.h | 4 +++- src/reqrep_center.cpp | 4 ++-- src/reqrep.h | 20 ++++++++++---------- src/pubsub.h | 10 +++++----- src/defs.h | 2 ++ src/pubsub.cpp | 6 +++--- utest/utest.cpp | 6 +++--- src/reqrep.cpp | 6 +++--- 8 files changed, 31 insertions(+), 27 deletions(-) diff --git a/src/defs.h b/src/defs.h index db73634..dfb00ee 100644 --- a/src/defs.h +++ b/src/defs.h @@ -21,6 +21,7 @@ #include <boost/uuid/uuid.hpp> #include <boost/uuid/uuid_generators.hpp> +#include <string> typedef boost::uuids::uuid MQId; @@ -34,6 +35,7 @@ } // namespace bhome_shm bhome_shm::SharedMemory &BHomeShm(); +typedef std::string Topic; //TODO center can check shm for previous crash. diff --git a/src/pubsub.cpp b/src/pubsub.cpp index 4449c31..8d26e0b 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -22,7 +22,7 @@ using namespace std::chrono_literals; using namespace bhome_msg; -bool SocketPublish::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms) +bool SocketPublish::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms) { try { MsgI imsg; @@ -36,7 +36,7 @@ } } -bool SocketSubscribe::Subscribe(const std::vector<std::string> &topics, const int timeout_ms) +bool SocketSubscribe::Subscribe(const std::vector<Topic> &topics, const int timeout_ms) { try { return mq().Send(kBHTopicBus, MakeSub(mq().Id(), topics), timeout_ms); @@ -61,7 +61,7 @@ return tdcb && Start(AsyncRecvProc, nworker); } -bool SocketSubscribe::RecvSub(std::string &topic, std::string &data, const int timeout_ms) +bool SocketSubscribe::RecvSub(Topic &topic, std::string &data, const int timeout_ms) { BHMsg msg; if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) { diff --git a/src/pubsub.h b/src/pubsub.h index ac5a9d2..3c3d4ad 100644 --- a/src/pubsub.h +++ b/src/pubsub.h @@ -33,8 +33,8 @@ shm_(shm) {} SocketPublish() : SocketPublish(BHomeShm()) {} - bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms); - bool Publish(const std::string &topic, const std::string &data, const int timeout_ms) + bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms); + bool Publish(const Topic &topic, const std::string &data, const int timeout_ms) { return Publish(topic, data.data(), data.size(), timeout_ms); } @@ -52,11 +52,11 @@ SocketSubscribe(BHomeShm()) {} ~SocketSubscribe() { Stop(); } - typedef std::function<void(const std::string &topic, const std::string &data)> TopicDataCB; + typedef std::function<void(const Topic &topic, const std::string &data)> TopicDataCB; bool StartRecv(const TopicDataCB &tdcb, int nworker = 2); bool Stop() { return Socket::Stop(); } - bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms); - bool RecvSub(std::string &topic, std::string &data, const int timeout_ms); + bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms); + bool RecvSub(Topic &topic, std::string &data, const int timeout_ms); }; #endif // end of include guard: PUBSUB_4KGRA997 diff --git a/src/pubsub_center.h b/src/pubsub_center.h index b752216..aa9db68 100644 --- a/src/pubsub_center.h +++ b/src/pubsub_center.h @@ -36,9 +36,11 @@ }; SocketBus socket_; ShmSocket::Shm &shm() { return socket_.shm(); } + std::mutex mutex_; typedef std::set<MQId> Clients; - std::unordered_map<std::string, Clients> records_; + std::unordered_map<Topic, Clients> records_; + bool Find1(const Topic &topic); public: PubSubCenter(ShmSocket::Shm &shm) : diff --git a/src/reqrep.cpp b/src/reqrep.cpp index 79ff892..b8e423b 100644 --- a/src/reqrep.cpp +++ b/src/reqrep.cpp @@ -55,7 +55,7 @@ return Start(AsyncRecvProc, nworker); } -bool SocketRequest::AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) +bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) { auto Call = [&](const void *remote) { const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size)); @@ -82,7 +82,7 @@ } } -bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms) +bool SocketRequest::SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms) { try { BHAddress addr; @@ -153,7 +153,7 @@ } } -bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms) +bool SocketRequest::QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms) { if (topic_cache_.Find(topic, addr)) { return true; diff --git a/src/reqrep.h b/src/reqrep.h index e8a38f7..9e43c7b 100644 --- a/src/reqrep.h +++ b/src/reqrep.h @@ -42,13 +42,13 @@ bool StartWorker(const RequestResultCB &rrcb, int nworker = 2); bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); } bool Stop() { return Socket::Stop(); } - bool AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb); - bool AsyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb) + bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb); + bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb) { return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb); } - bool SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms); - bool SyncRequest(const std::string &topic, const std::string &data, std::string &out, const int timeout_ms) + bool SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms); + bool SyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms) { return SyncRequest(topic, data.data(), data.size(), out, timeout_ms); } @@ -56,7 +56,7 @@ private: bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb); bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms); - bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms); + bool QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms); std::unordered_map<std::string, RecvCB> async_cbs_; typedef bhome_msg::BHAddress Address; @@ -64,11 +64,11 @@ { class Impl { - typedef std::unordered_map<std::string, Address> Store; + typedef std::unordered_map<Topic, Address> Store; Store store_; public: - bool Find(const std::string &topic, Address &addr) + bool Find(const Topic &topic, Address &addr) { auto pos = store_.find(topic); if (pos != store_.end()) { @@ -78,7 +78,7 @@ return false; } } - bool Update(const std::string &topic, const Address &addr) + bool Update(const Topic &topic, const Address &addr) { store_[topic] = addr; return true; @@ -92,8 +92,8 @@ // } public: - bool Find(const std::string &topic, Address &addr) { return impl_->Find(topic, addr); } - bool Update(const std::string &topic, const Address &addr) { return impl_->Update(topic, addr); } + bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); } + bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); } }; TopicCache topic_cache_; }; diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp index 0b6ddea..2356ebc 100644 --- a/src/reqrep_center.cpp +++ b/src/reqrep_center.cpp @@ -66,7 +66,7 @@ } } } - bool QueryTopic(const std::string &topic, ProcAddr &addr) + bool QueryTopic(const Topic &topic, ProcAddr &addr) { auto pos = topic_map_.find(topic); if (pos != topic_map_.end()) { @@ -96,7 +96,7 @@ }; typedef std::shared_ptr<NodeInfo> Node; typedef std::weak_ptr<NodeInfo> WeakNode; - std::unordered_map<std::string, WeakNode> topic_map_; + std::unordered_map<Topic, WeakNode> topic_map_; std::unordered_map<ProcId, Node> nodes_; }; } // namespace diff --git a/utest/utest.cpp b/utest/utest.cpp index b4aa760..637ae26 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -25,7 +25,7 @@ BOOST_AUTO_TEST_CASE(Temp) { - std::string topics[] = { + Topic topics[] = { "", ".", "a", @@ -128,7 +128,7 @@ } }; ThreadManager threads; - typedef std::vector<std::string> Topics; + typedef std::vector<Topic> Topics; Topics topics; for (int i = 0; i < 100; ++i) { topics.push_back("t" + std::to_string(i)); @@ -208,7 +208,7 @@ } }; ThreadManager clients, servers; - std::vector<std::string> topics = {"topic1", "topic2"}; + std::vector<Topic> topics = {"topic1", "topic2"}; servers.Launch(Server, "server", topics); std::this_thread::sleep_for(100ms); for (auto &t : topics) { -- Gitblit v1.8.0