From 2e99e5311d1b9a53cca17008452cbe49e2af7234 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 31 三月 2021 14:05:09 +0800 Subject: [PATCH] add bus socket for manager; refactor. --- src/socket.h | 23 ++++++++++++----------- 1 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/socket.h b/src/socket.h index 92c1b73..b94eca2 100644 --- a/src/socket.h +++ b/src/socket.h @@ -21,14 +21,14 @@ #include "shm_queue.h" #include <atomic> -#include <condition_variable> +#include <boost/noncopyable.hpp> #include <functional> #include <memory> #include <mutex> #include <thread> #include <vector> -class ShmSocket +class ShmSocket : private boost::noncopyable { typedef bhome_shm::ShmMsgQueue Queue; @@ -38,13 +38,14 @@ eSockReply, eSockSubscribe, eSockPublish, + eSockBus, }; typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB; + typedef std::function<void(bhome_msg::MsgI &imsg)> RecvRawCB; - ShmSocket(Type type); ShmSocket(Type type, bhome_shm::SharedMemory &shm); + ShmSocket(Type type); ~ShmSocket(); - // bool Request(const std::string &topic, const void *data, const size_t size, onReply); bool RequestAndWait() { return false; } // call Request, and wait onReply notify cv @@ -55,19 +56,19 @@ bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms); bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms); bool RecvSub(std::string &topic, std::string &data, const int timeout_ms); - bool SetRecvCallback(const RecvCB &onRecv); + + // start recv. + bool Start(const RecvCB &onData, int nworker = 1); + bool StartRaw(const RecvRawCB &onData, int nworker = 1); + bool Stop(); private: - bool HasRecvCB(); - void Stop(); - + bool StopNoLock(); bhome_shm::SharedMemory &shm_; - Type type_; + const Type type_; std::vector<std::thread> workers_; std::mutex mutex_; - std::condition_variable cv_recv_cb_; std::atomic<bool> run_; - RecvCB onRecv_; std::unique_ptr<Queue> mq_; }; -- Gitblit v1.8.0