/*
 * =====================================================================================
 *
 *       Filename:  socket.h
 *
 *    Description:  Declares ShmSocket, a non-copyable class that owns a
 *                  bhome_shm::ShmMsgQueue and exposes start/stop, pending-count
 *                  and synchronous send/receive operations on it.
 *
 *        Version:  1.0
 *        Created:  2021-03-30 15:49:19
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef SOCKET_GWTJHBPO
#define SOCKET_GWTJHBPO

#include "shm_queue.h"
// NOTE(review): the header names of the seven includes below are missing in
// this copy of the file (most likely lost together with everything between
// angle brackets). Restore them from the original header; the class uses
// boost::noncopyable, std::function, std::vector, std::mutex, std::atomic and
// std::unique_ptr.
#include
#include
#include
#include
#include
#include
#include

/**
 * Socket-style wrapper around a shared-memory message queue.
 *
 * Holds a reference to the shared memory it was created with and owns its
 * message queue through a std::unique_ptr. Copying is disabled by privately
 * inheriting boost::noncopyable. Only the inline members are defined here;
 * everything else is declared and implemented elsewhere.
 *
 * NOTE(review): the template argument lists of std::function, std::vector,
 * std::atomic and std::unique_ptr are missing in this copy of the file and
 * must be restored before it can compile.
 */
class ShmSocket : private boost::noncopyable
{
protected:
    /// Message queue type used by this socket.
    typedef bhome_shm::ShmMsgQueue Queue;

public:
    /// Shared memory type the socket is bound to.
    typedef bhome_shm::SharedMemory Shm;

    /// Receive callback. The adapter lambda in Start() below, which is passed
    /// as a RecvCB, takes (ShmSocket &, bhome_msg::MsgI &, bhome_msg::BHMsg &).
    typedef std::function RecvCB;
    /// Simplified receive callback; invoked below with a single
    /// bhome_msg::BHMsg & argument.
    typedef std::function RecvBHMsgCB;
    /// Idle callback. Its signature is not visible in this copy.
    typedef std::function IdleCB;

    /// @param shm shared memory the socket refers to (returned by shm()).
    /// @param id  presumably the identifier of the queue to use — TODO confirm.
    /// @param len presumably the queue length — TODO confirm.
    ShmSocket(Shm &shm, const void *id, const int len);
    /// Same as above without an explicit id; len defaults to 12.
    ShmSocket(Shm &shm, const int len = 12);
    ~ShmSocket();

    /// Returns the shared memory reference this socket was constructed with.
    Shm &shm() { return shm_; }

    // start recv.
    /// Primary overload; declared here, defined elsewhere.
    /// nworker is presumably the number of worker threads — TODO confirm.
    bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1);
    /// Forwards to the primary overload with an empty (default-constructed) IdleCB.
    bool Start(const RecvCB &onData, int nworker = 1) { return Start(onData, IdleCB(), nworker); }
    /// Adapts a RecvBHMsgCB to the primary overload: the lambda copies onData
    /// and passes only msg to it, ignoring sock and imsg.
    bool Start(const RecvBHMsgCB &onData, const IdleCB &onIdle, int nworker = 1)
    {
        return Start([onData](ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg) { onData(msg); }, onIdle, nworker);
    }
    /// Forwards to the RecvBHMsgCB overload above with an empty IdleCB.
    bool Start(const RecvBHMsgCB &onData, int nworker = 1) { return Start(onData, IdleCB(), nworker); }
    /// Declared here, defined elsewhere; presumably stops what Start() began.
    bool Stop();

    /// Returns mq_->Pending() when a queue exists, otherwise 0.
    size_t Pending() const { return mq_ ? mq_->Pending() : 0; }

    /// Declared here, defined elsewhere.
    /// timeout_ms is presumably a timeout in milliseconds — TODO confirm.
    bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms);
    /// Declared here, defined elsewhere; msg is a non-const reference, so it is
    /// presumably filled with the received message — TODO confirm.
    bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms);

protected:
    /// Const access to the shared memory reference, for subclasses.
    const Shm &shm() const { return shm_; }
    /// Dereferences mq_ without a null check.
    Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid.
    /// Const variant; likewise performs no null check on mq_.
    const Queue &mq() const { return *mq_; }
    /// Gives subclasses access to the socket's mutex.
    std::mutex &mutex() { return mutex_; }

private:
    /// Declared here, defined elsewhere. Judging by the name, the caller is
    /// presumably expected to already hold mutex_ — TODO confirm.
    bool StopNoLock();
    /// True while workers_ is non-empty. Takes no lock itself.
    bool RunningNoLock() { return !workers_.empty(); }

    Shm &shm_;               // shared memory passed to the constructor
    std::vector workers_;    // non-empty means "running" (see RunningNoLock); element type missing in this copy
    std::mutex mutex_;       // exposed to subclasses via mutex()
    std::atomic run_;        // value type missing in this copy; presumably a run flag — TODO confirm
    std::unique_ptr mq_;     // owned queue; may be null (see Pending()); pointee type missing in this copy
};

#endif // end of include guard: SOCKET_GWTJHBPO