#ifndef _SAFEQUEUEIMPL_H_ #define _SAFEQUEUEIMPL_H_ template SafeQueue::SafeQueue(std::size_t a_maxSize): m_theQueue(), m_maximumSize(a_maxSize), m_mutex(), m_cond() { } template SafeQueue::~SafeQueue() { } template SafeQueue::SafeQueue(const SafeQueue& a_src): m_theQueue(), m_maximumSize(0), m_mutex(), m_cond() { // copying a safe queue involves only copying the data (m_theQueue and // m_maximumSize). This object has not been instantiated yet so nobody can // be trying to perform push or pop operations on it, but we need to // acquire a_src.m_mutex before copying its data into m_theQueue and // m_maximumSize std::unique_lock lk(a_src.m_mutex); this->m_maximumSize = a_src.m_maximumSize; this->m_theQueue = a_src.m_theQueue; } template const SafeQueue& SafeQueue::operator=(const SafeQueue &a_src) { if (this != &a_src) { // lock both mutexes at the same time to avoid deadlocks std::unique_lock this_lk(this->m_mutex, std::defer_lock); std::unique_lock src_lk (a_src.m_mutex, std::defer_lock); std::lock(this_lk, src_lk); // will we need to wake up waiting threads after copying the source // queue? bool wakeUpWaitingThreads = WakeUpSignalNeeded(a_src); // copy data from the left side of the operator= into this intance this->m_maximumSize = a_src.m_maximumSize; this->m_theQueue = a_src.m_theQueue; // time now to wake up threads waiting for data to be inserted // or extracted if (wakeUpWaitingThreads) { this->m_cond.notify_all(); } } return *this; } template SafeQueue::SafeQueue(SafeQueue&& a_src): m_theQueue(a_src.m_theQueue), // implicit std::move(a_src.m_theQueue) m_maximumSize(a_src.m_maximumSize), // move constructor called implicitly m_mutex(), // instantiate a new mutex m_cond() // instantiate a new conditional variable { // This object has not been instantiated yet. We can assume no one is using // its mutex. // Also, a_src is a temporary object so there is no need to acquire // its mutex. // Things can therefore be safely moved without the need for any mutex or // conditional variable } template SafeQueue& SafeQueue::operator=(SafeQueue &&a_src) { if (this != &a_src) { // make sure we hold this mutex before moving things around. a_src is // a temporary object so no need to hold its mutex std::unique_lock lk(this->m_mutex); // will we need to wake up waiting threads after copying the source // queue? bool wakeUpWaitingThreads = WakeUpSignalNeeded(a_src); // process data from the temporary copy into this intance this->m_maximumSize = std::move(a_src.m_maximumSize); this->m_theQueue = std::move(a_src.m_theQueue); // time now to wake up threads waiting for data to be inserted // or extracted if (wakeUpWaitingThreads) { this->m_cond.notify_all(); } } return *this; } template bool SafeQueue::wakeUpSignalNeeded(const SafeQueue &a_src) const { if (this->m_theQueue.empty() && (!a_src.m_theQueue.empty())) { // threads waiting for stuff to be popped off the queue return true; } else if ((this->m_theQueue.size() >= this->m_maximumSize) && (a_src.m_theQueue.size() < a_src.m_maximumSize)) { // threads waiting for stuff to be pushed into the queue return true; } return false; } template bool SafeQueue::isEmpty() const { std::lock_guard lk(m_mutex); return m_theQueue.empty(); } template void SafeQueue::push(const T &a_elem) { std::unique_lock lk(m_mutex); while (m_theQueue.size() >= m_maximumSize) { m_cond.wait(lk); } bool queueEmpty = m_theQueue.empty(); m_theQueue.push(a_elem); if (queueEmpty) { // wake up threads waiting for stuff m_cond.notify_all(); } } template bool SafeQueue::tryPush(const T &a_elem) { std::lock_guard lk(m_mutex); bool rv = false; bool queueEmpty = m_theQueue.empty(); if (m_theQueue.size() < m_maximumSize) { m_theQueue.push(a_elem); rv = true; } if (queueEmpty) { // wake up threads waiting for stuff m_cond.notify_all(); } return rv; } template void SafeQueue::pop(T &out_data) { std::unique_lock lk(m_mutex); while (m_theQueue.empty()) { m_cond.wait(lk); } bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false; out_data = m_theQueue.front(); m_theQueue.pop(); if (queueFull) { // wake up threads waiting for stuff m_cond.notify_all(); } } template bool SafeQueue::tryPop(T &out_data) { std::lock_guard lk(m_mutex); bool rv = false; if (!m_theQueue.empty()) { bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false; out_data = m_theQueue.front(); m_theQueue.pop(); if (queueFull) { // wake up threads waiting for stuff m_cond.notify_all(); } rv = true; } return rv; } template bool SafeQueue::timedWaitPop(T &data, std::chrono::microseconds a_microsecs) { std::unique_lock lk(m_mutex); auto wakeUpTime = std::chrono::steady_clock::now() + a_microsecs; if (m_cond.wait_until(lk, wakeUpTime, [this](){return (m_theQueue.size() > 0);})) { // wait_until returns false if the predicate (3rd parameter) still // evaluates to false after the rel_time timeout expired // we are in this side of the if-clause because the queue is not empty // (so the 3rd parameter evaluated to true) bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false; data = m_theQueue.front(); m_theQueue.pop(); if (queueFull) { // wake up threads waiting to insert things into the queue. // The queue used to be full, now it's not. m_cond.notify_all(); } return true; } else { // timed-out and the queue is still empty return false; } } #endif /* _SAFEQUEUEIMPL_H_ */