#ifndef CAFFE2_UTILS_SIMPLE_QUEUE_H_ #define CAFFE2_UTILS_SIMPLE_QUEUE_H_ #include // NOLINT #include // NOLINT #include #include "caffe2/core/logging.h" namespace caffe2 { // This is a very simple queue that Yangqing wrote when bottlefeeding the baby, // so don't take it seriously. What it does is a minimal thread-safe queue that // allows me to run network as a DAG. // // A usual work pattern looks like this: one or multiple producers push jobs // into this queue, and one or multiple workers pops jobs from this queue. If // nothing is in the queue but NoMoreJobs() is not called yet, the pop calls // will wait. If NoMoreJobs() has been called, pop calls will return false, // which serves as a message to the workers that they should exit. template class SimpleQueue { public: SimpleQueue() : no_more_jobs_(false) {} // Pops a value and writes it to the value pointer. If there is nothing in the // queue, this will wait till a value is inserted to the queue. If there are // no more jobs to pop, the function returns false. Otherwise, it returns // true. bool Pop(T* value) { std::unique_lock mutex_lock(mutex_); while (queue_.size() == 0 && !no_more_jobs_) cv_.wait(mutex_lock); if (queue_.size() == 0 && no_more_jobs_) return false; *value = queue_.front(); queue_.pop(); return true; } int size() { std::unique_lock mutex_lock(mutex_); return queue_.size(); } // Push pushes a value to the queue. void Push(const T& value) { { std::lock_guard mutex_lock(mutex_); CAFFE_ENFORCE(!no_more_jobs_, "Cannot push to a closed queue."); queue_.push(value); } cv_.notify_one(); } // NoMoreJobs() marks the close of this queue. It also notifies all waiting // Pop() calls so that they either check out remaining jobs, or return false. // After NoMoreJobs() is called, this queue is considered closed - no more // Push() functions are allowed, and once existing items are all checked out // by the Pop() functions, any more Pop() function will immediately return // false with nothing set to the value. void NoMoreJobs() { { std::lock_guard mutex_lock(mutex_); no_more_jobs_ = true; } cv_.notify_all(); } private: std::mutex mutex_; std::condition_variable cv_; std::queue queue_; bool no_more_jobs_{}; // We do not allow copy constructors. SimpleQueue(const SimpleQueue& /*src*/) {} }; } // namespace caffe2 #endif // CAFFE2_UTILS_SIMPLE_QUEUE_H_