/* * ===================================================================================== * * Filename: pubsub.cpp * * Description: * * Version: 1.0 * Created: 2021年03月24日 18时44分13秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), * Organization: * * ===================================================================================== */ #include "pubsub.h" #include namespace bhome_shm { using namespace std::chrono_literals; const MQId kBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff"); const int kMaxWorker = 16; BusManager::BusManager(SharedMemory &shm): busq_(kBusQueueId, shm, 1000), run_(false) { } BusManager::~BusManager() { Stop(); } bool BusManager::Start(const int nworker) { std::lock_guard guard(mutex_); StopNoLock(); // start auto Worker = [&](){ while (this->run_) { std::this_thread::sleep_for(100ms); BusManager &self = *this; Msg msg; const int timeout_ms = 100; if (!self.busq_.Recv(msg, timeout_ms)) { continue; } // handle msg; // type: subscribe(topic), publish(topic, data) } }; run_.store(true); const int n = std::min(nworker, kMaxWorker); for (int i = 0; i < n; ++i) { workers_.emplace_back(Worker); } } bool BusManager::Stop() { std::lock_guard guard(mutex_); StopNoLock(); } bool BusManager::StopNoLock() { if (run_.exchange(false)) { for (auto &w: workers_) { if (w.joinable()) { w.join(); } } } } } // namespace bhome_shm