#pragma once #include #include #include #include #include #include "caffe2/core/blob_stats.h" #include "caffe2/core/logging.h" #include "caffe2/core/stats.h" #include "caffe2/core/tensor.h" #include "caffe2/core/workspace.h" namespace caffe2 { // A thread-safe, bounded, blocking queue. // Modelled as a circular buffer. // Containing blobs are owned by the workspace. // On read, we swap out the underlying data for the blob passed in for blobs class CAFFE2_API BlobsQueue : public std::enable_shared_from_this { public: BlobsQueue( Workspace* ws, const std::string& queueName, size_t capacity, size_t numBlobs, bool enforceUniqueName, const std::vector& fieldNames = {}); ~BlobsQueue() { close(); } bool blockingRead( const std::vector& inputs, float timeout_secs = 0.0f); bool tryWrite(const std::vector& inputs); bool blockingWrite(const std::vector& inputs); void close(); size_t getNumBlobs() const { return numBlobs_; } private: bool canWrite(); void doWrite(const std::vector& inputs); std::atomic closing_{false}; size_t numBlobs_; std::mutex mutex_; // protects all variables in the class. std::condition_variable cv_; int64_t reader_{0}; int64_t writer_{0}; std::vector> queue_; const std::string name_; struct QueueStats { CAFFE_STAT_CTOR(QueueStats); CAFFE_EXPORTED_STAT(queue_balance); CAFFE_EXPORTED_STAT(queue_dequeued_records); CAFFE_DETAILED_EXPORTED_STAT(queue_dequeued_bytes); CAFFE_AVG_EXPORTED_STAT(read_time_ns); CAFFE_AVG_EXPORTED_STAT(write_time_ns); } stats_; }; } // namespace caffe2