reid from https://github.com/michuanhaohao/reid-strong-baseline
zhangmeng
2020-01-16 a47fccb11fa3470901aebcb27f861d242d0925e1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#pragma once
 
#include <torch/types.h>
 
#include <c10/util/Exception.h>
 
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
 
namespace torch {
namespace data {
namespace detail {
 
/// A basic locked, blocking MPMC queue.
///
/// Every `push` and `pop` is guarded by a mutex. A condition variable is used
/// to communicate insertion of new elements, such that waiting threads will be
/// woken up if they are currently waiting inside a call to `pop()`.
///
/// Note that this data structure is written specifically for use with the
/// `DataLoader`. Its behavior is tailored to this use case and may not be
/// applicable to more general uses.
template <typename T>
class Queue {
 public:
  /// Pushes a new value to the back of the `Queue` and notifies one thread on
  /// the waiting side about this event.
  void push(T value) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(std::move(value));
    }
    cv_.notify_one();
  }
 
  /// Blocks until at least one element is ready to be popped from the front of
  /// the queue. An optional `timeout` in seconds can be used to limit the time
  /// spent waiting for an element. If the wait times out, an exception is
  /// raised.
  T pop(optional<std::chrono::milliseconds> timeout = nullopt) {
    std::unique_lock<std::mutex> lock(mutex_);
    if (timeout) {
      if (!cv_.wait_for(
              lock, *timeout, [this] { return !this->queue_.empty(); })) {
        // clang-format off
        AT_ERROR(
            "Timeout in DataLoader queue while waiting for next batch"
            " (timeout was ", timeout->count(), " ms)");
        // clang-format on
      }
    } else {
      cv_.wait(lock, [this] { return !this->queue_.empty(); });
    }
    AT_ASSERT(!queue_.empty());
    T value = queue_.front();
    queue_.pop();
    lock.unlock();
    return value;
  }
 
  /// Empties the queue and returns the number of elements that were present at
  /// the start of the function. No threads are notified about this event as it
  /// is assumed to be used to drain the queue during shutdown of a
  /// `DataLoader`.
  size_t clear() {
    std::lock_guard<std::mutex> lock(this->mutex_);
    const auto size = queue_.size();
    while (!queue_.empty()) {
      queue_.pop();
    }
    return size;
  }
 
 private:
  std::queue<T> queue_;
  std::mutex mutex_;
  std::condition_variable cv_;
};
} // namespace detail
} // namespace data
} // namespace torch