From 3c2b6739208d961cf8b86460d7f05516d044960c Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 31 三月 2021 19:13:42 +0800 Subject: [PATCH] add async recv suport; sync by waiting for async. --- src/socket.h | 11 +++++++++-- 1 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/socket.h b/src/socket.h index c468dd3..eee5b5b 100644 --- a/src/socket.h +++ b/src/socket.h @@ -26,6 +26,7 @@ #include <memory> #include <mutex> #include <thread> +#include <unordered_map> #include <vector> class ShmSocket : private boost::noncopyable @@ -42,12 +43,13 @@ }; typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB; typedef std::function<void(bhome_msg::MsgI &imsg)> RecvRawCB; + typedef std::function<void(const void *data, const size_t size)> RequestResultCB; ShmSocket(Type type, bhome_shm::SharedMemory &shm); ShmSocket(Type type); ~ShmSocket(); - // bool Request(const std::string &topic, const void *data, const size_t size, onReply); - bool RequestAndWait() { return false; } // call Request, and wait onReply notify cv + bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb); + bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out); // bool HandleRequest(onData); bool ReadRequest(); // exclude with HandleRequest @@ -60,10 +62,14 @@ // start recv. bool Start(const RecvCB &onData, int nworker = 1); bool StartRaw(const RecvRawCB &onData, int nworker = 1); + bool StartAsync(int nworker = 2); bool Stop(); size_t Pending() const { return mq_ ? mq_->Pending() : 0; } private: + bool AsyncRequest(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb); + bool SyncRequest(const void *remote, const void *msg, void *result, const int timeout_ms); + bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms); bool StopNoLock(); bhome_shm::SharedMemory &shm_; const Type type_; @@ -72,6 +78,7 @@ std::atomic<bool> run_; std::unique_ptr<Queue> mq_; + std::unordered_map<std::string, RecvCB> async_cbs_; }; #endif // end of include guard: SOCKET_GWTJHBPO -- Gitblit v1.8.0