From c28cdf2fbf1565709b359c9cca6c5e29d9592dce Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 02 四月 2021 15:51:20 +0800 Subject: [PATCH] typedef Topic. --- src/reqrep.h | 78 ++++++++++++++++++++++++++++++++++++--- 1 files changed, 72 insertions(+), 6 deletions(-) diff --git a/src/reqrep.h b/src/reqrep.h index 02cc86f..9e43c7b 100644 --- a/src/reqrep.h +++ b/src/reqrep.h @@ -18,10 +18,14 @@ #ifndef REQREP_ACEH09NK #define REQREP_ACEH09NK +#include "bh_util.h" #include "defs.h" +#include "msg.h" #include "socket.h" #include <functional> #include <unordered_map> + +using bhome::msg::ProcInfo; class SocketRequest : private ShmSocket { @@ -32,26 +36,88 @@ Socket(shm, 64) { StartWorker(); } SocketRequest() : SocketRequest(BHomeShm()) {} + ~SocketRequest() { Stop(); } typedef std::function<void(const std::string &data)> RequestResultCB; bool StartWorker(const RequestResultCB &rrcb, int nworker = 2); bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); } - bool AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb); - bool AsyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb) + bool Stop() { return Socket::Stop(); } + bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb); + bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb) { return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb); } - bool SyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out); - bool SyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, std::string &out) + bool SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms); + bool SyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms) { - return SyncRequest(topic, data.data(), data.size(), timeout_ms, out); + return SyncRequest(topic, data.data(), data.size(), out, timeout_ms); } private: bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb); bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms); - bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms); + bool QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms); std::unordered_map<std::string, RecvCB> async_cbs_; + + typedef bhome_msg::BHAddress Address; + class TopicCache + { + class Impl + { + typedef std::unordered_map<Topic, Address> Store; + Store store_; + + public: + bool Find(const Topic &topic, Address &addr) + { + auto pos = store_.find(topic); + if (pos != store_.end()) { + addr = pos->second; + return true; + } else { + return false; + } + } + bool Update(const Topic &topic, const Address &addr) + { + store_[topic] = addr; + return true; + } + }; + Synced<Impl> impl_; + // Impl &impl() + // { + // thread_local Impl impl; + // return impl; + // } + + public: + bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); } + bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); } + }; + TopicCache topic_cache_; +}; + +class SocketReply : private ShmSocket +{ + typedef ShmSocket Socket; + +public: + SocketReply(Socket::Shm &shm) : + Socket(shm, 64) {} + SocketReply() : + SocketReply(BHomeShm()) {} + ~SocketReply() { Stop(); } + + typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest; + bool StartWorker(const OnRequest &rcb, int nworker = 2); + bool Stop() { return Socket::Stop(); } + bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms); + bool SendReply(void *src_info, const std::string &data, const int timeout_ms); + bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms); + bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms); + +private: }; #endif // end of include guard: REQREP_ACEH09NK -- Gitblit v1.8.0