/* * ===================================================================================== * * Filename: topic_request.h * * Description: topic request socket * * Version: 1.0 * Created: 2021年04月01日 09时36分06秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), * Organization: * * ===================================================================================== */ #ifndef TOPIC_REQUEST_ACEH09NK #define TOPIC_REQUEST_ACEH09NK #include "bh_util.h" #include "defs.h" #include "msg.h" #include "socket.h" #include #include using bhome::msg::ProcInfo; class SocketRequest : private ShmSocket { typedef ShmSocket Socket; public: SocketRequest(Socket::Shm &shm) : Socket(shm, 64) { StartWorker(); } SocketRequest() : SocketRequest(BHomeShm()) {} ~SocketRequest() { Stop(); } typedef std::function RequestResultCB; bool StartWorker(const RequestResultCB &rrcb, int nworker = 2); bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); } bool Stop() { return Socket::Stop(); } bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb); bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms); bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb) { return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb); } bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms) { return AsyncRequest(topic, data.data(), data.size(), timeout_ms); } bool SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms); bool SyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms) { return SyncRequest(topic, data.data(), data.size(), out, timeout_ms); } private: bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvBHMsgCB &cb); bool AsyncSend(const void *remote, const void *msg, const int timeout_ms); bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms); bool QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms); std::unordered_map async_cbs_; typedef bhome_msg::BHAddress Address; class TopicCache { class Impl { typedef std::unordered_map Store; Store store_; public: bool Find(const Topic &topic, Address &addr) { auto pos = store_.find(topic); if (pos != store_.end()) { addr = pos->second; return true; } else { return false; } } bool Update(const Topic &topic, const Address &addr) { store_[topic] = addr; return true; } }; Synced impl_; // Impl &impl() // { // thread_local Impl impl; // return impl; // } public: bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); } bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); } }; TopicCache topic_cache_; }; #endif // end of include guard: TOPIC_REQUEST_ACEH09NK