#ifndef CAFFE2_UTILS_SIMPLE_QUEUE_H_
|
#define CAFFE2_UTILS_SIMPLE_QUEUE_H_
|
|
#include <condition_variable> // NOLINT
|
#include <mutex> // NOLINT
|
#include <queue>
|
|
#include "caffe2/core/logging.h"
|
|
namespace caffe2 {
|
|
// This is a very simple queue that Yangqing wrote when bottlefeeding the baby,
|
// so don't take it seriously. What it does is a minimal thread-safe queue that
|
// allows me to run network as a DAG.
|
//
|
// A usual work pattern looks like this: one or multiple producers push jobs
|
// into this queue, and one or multiple workers pops jobs from this queue. If
|
// nothing is in the queue but NoMoreJobs() is not called yet, the pop calls
|
// will wait. If NoMoreJobs() has been called, pop calls will return false,
|
// which serves as a message to the workers that they should exit.
|
template <typename T>
|
class SimpleQueue {
|
public:
|
SimpleQueue() : no_more_jobs_(false) {}
|
|
// Pops a value and writes it to the value pointer. If there is nothing in the
|
// queue, this will wait till a value is inserted to the queue. If there are
|
// no more jobs to pop, the function returns false. Otherwise, it returns
|
// true.
|
bool Pop(T* value) {
|
std::unique_lock<std::mutex> mutex_lock(mutex_);
|
while (queue_.size() == 0 && !no_more_jobs_) cv_.wait(mutex_lock);
|
if (queue_.size() == 0 && no_more_jobs_) return false;
|
*value = queue_.front();
|
queue_.pop();
|
return true;
|
}
|
|
int size() {
|
std::unique_lock<std::mutex> mutex_lock(mutex_);
|
return queue_.size();
|
}
|
|
// Push pushes a value to the queue.
|
void Push(const T& value) {
|
{
|
std::lock_guard<std::mutex> mutex_lock(mutex_);
|
CAFFE_ENFORCE(!no_more_jobs_, "Cannot push to a closed queue.");
|
queue_.push(value);
|
}
|
cv_.notify_one();
|
}
|
|
// NoMoreJobs() marks the close of this queue. It also notifies all waiting
|
// Pop() calls so that they either check out remaining jobs, or return false.
|
// After NoMoreJobs() is called, this queue is considered closed - no more
|
// Push() functions are allowed, and once existing items are all checked out
|
// by the Pop() functions, any more Pop() function will immediately return
|
// false with nothing set to the value.
|
void NoMoreJobs() {
|
{
|
std::lock_guard<std::mutex> mutex_lock(mutex_);
|
no_more_jobs_ = true;
|
}
|
cv_.notify_all();
|
}
|
|
private:
|
std::mutex mutex_;
|
std::condition_variable cv_;
|
std::queue<T> queue_;
|
bool no_more_jobs_{};
|
// We do not allow copy constructors.
|
SimpleQueue(const SimpleQueue& /*src*/) {}
|
};
|
|
} // namespace caffe2
|
|
#endif // CAFFE2_UTILS_SIMPLE_QUEUE_H_
|