/*
 * =====================================================================================
 *
 *       Filename:  pubsub.cpp
 *
 *    Description:  Implementation of BusManager: construction, destruction and
 *                  start/stop of the worker threads that receive from the bus queue.
 *
 *        Version:  1.0
 *        Created:  2021-03-24 18:44:13
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
|
#include "pubsub.h"
|
#include <chrono>
|
|
namespace bhome_shm {
|
|
using namespace std::chrono_literals;
|
// Hard-coded id of the bus queue; BusManager's constructor passes it to busq_.
// The value is parsed from its textual UUID form when this constant is initialised.
const MQId kBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
|
// Upper bound on the number of worker threads BusManager::Start() will create,
// regardless of how many the caller asks for.
const int kMaxWorker{16};
|
|
/// Builds the manager around the bus queue identified by kBusQueueId.
///
/// No worker thread is created here: the object starts in the stopped state
/// (run_ == false) and stays that way until Start() is called.
///
/// @param shm  shared-memory object forwarded to the bus queue's constructor.
BusManager::BusManager(SharedMemory &shm)
    : busq_(kBusQueueId, shm, 1000) // NOTE(review): 1000 is presumably the queue length -- confirm against the queue's constructor
    , run_(false)
{}
|
|
/// Shuts the manager down on destruction by delegating to Stop(), which takes
/// mutex_ and joins the worker threads if the manager is still running.
BusManager::~BusManager()
{
    this->Stop(); // the boolean result is not used here
}
|
|
bool BusManager::Start(const int nworker)
|
{
|
std::lock_guard<std::mutex> guard(mutex_);
|
StopNoLock();
|
// start
|
auto Worker = [&](){
|
while (this->run_) {
|
std::this_thread::sleep_for(100ms);
|
BusManager &self = *this;
|
BHMsg msg;
|
const int timeout_ms = 100;
|
if (!self.busq_.Recv(msg, timeout_ms)) {
|
continue;
|
}
|
// handle msg;
|
// type: subscribe(topic), publish(topic, data)
|
}
|
};
|
|
run_.store(true);
|
const int n = std::min(nworker, kMaxWorker);
|
for (int i = 0; i < n; ++i) {
|
workers_.emplace_back(Worker);
|
}
|
return true;
|
}
|
|
bool BusManager::Stop()
|
{
|
std::lock_guard<std::mutex> guard(mutex_);
|
return StopNoLock();
|
}
|
|
bool BusManager::StopNoLock()
|
{
|
if (run_.exchange(false)) {
|
for (auto &w: workers_) {
|
if (w.joinable()) {
|
w.join();
|
}
|
}
|
return true;
|
}
|
return false;
|
}
|
|
} // namespace bhome_shm
|