/* * ===================================================================================== * * Filename: socket.h * * Description: * * Version: 1.0 * Created: 2021年03月30日 15时49分19秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), * Organization: * * ===================================================================================== */ #ifndef SOCKET_GWTJHBPO #define SOCKET_GWTJHBPO #include "shm_queue.h" #include #include #include #include #include #include #include #include class ShmSocket : private boost::noncopyable { typedef bhome_shm::ShmMsgQueue Queue; public: enum Type { eSockRequest, eSockReply, eSockSubscribe, eSockPublish, eSockBus, }; typedef std::function RecvCB; typedef std::function RecvRawCB; typedef std::function RequestResultCB; ShmSocket(Type type, bhome_shm::SharedMemory &shm); ShmSocket(Type type); ~ShmSocket(); bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb); bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out); // bool HandleRequest(onData); bool ReadRequest(); // exclude with HandleRequest bool SendReply(); // exclude with HandleRequest bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms); bool Subscribe(const std::vector &topics, const int timeout_ms); bool RecvSub(std::string &topic, std::string &data, const int timeout_ms); // start recv. bool Start(const RecvCB &onData, int nworker = 1); bool StartRaw(const RecvRawCB &onData, int nworker = 1); bool StartAsync(int nworker = 2); bool Stop(); size_t Pending() const { return mq_ ? mq_->Pending() : 0; } private: bool AsyncRequest(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb); bool SyncRequest(const void *remote, const void *msg, void *result, const int timeout_ms); bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms); bool StopNoLock(); bhome_shm::SharedMemory &shm_; const Type type_; std::vector workers_; std::mutex mutex_; std::atomic run_; std::unique_ptr mq_; std::unordered_map async_cbs_; }; #endif // end of include guard: SOCKET_GWTJHBPO