liuxiaolong
2021-07-20 58d904a328c0d849769b483e901a0be9426b8209
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
//  (C) Copyright 2008-10 Anthony Williams
//  (C) Copyright 2011-2015 Vicente J. Botet Escriba
//
//  Distributed under the Boost Software License, Version 1.0. (See
//  accompanying file LICENSE_1_0.txt or copy at
//  http://www.boost.org/LICENSE_1_0.txt)
 
#ifndef BOOST_THREAD_FUTURES_WAIT_FOR_ANY_HPP
#define BOOST_THREAD_FUTURES_WAIT_FOR_ANY_HPP
 
#include <boost/thread/detail/config.hpp>
 
#include <boost/thread/detail/move.hpp>
#include <boost/thread/futures/is_future_type.hpp>
#include <boost/thread/lock_algorithms.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
 
#include <boost/core/enable_if.hpp>
#include <boost/next_prior.hpp>
#include <boost/scoped_array.hpp>
 
#include <iterator>
#include <vector>
 
namespace boost
{
  namespace detail
  {
    template <class Future>
    class waiter_for_any_in_seq
    {
      struct registered_waiter;
      typedef std::vector<int>::size_type count_type;
 
      struct registered_waiter
      {
        typedef Future future_type;
        future_type* future_;
        typedef typename Future::notify_when_ready_handle notify_when_ready_handle;
        notify_when_ready_handle handle;
        count_type index;
 
        registered_waiter(future_type & a_future,
            notify_when_ready_handle handle_, count_type index_) :
          future_(&a_future), handle(handle_), index(index_)
        {
        }
      };
 
      struct all_futures_lock
      {
#ifdef _MANAGED
        typedef std::ptrdiff_t count_type_portable;
#else
        typedef count_type count_type_portable;
#endif
        count_type_portable count;
        boost::scoped_array<boost::unique_lock<boost::mutex> > locks;
 
        all_futures_lock(std::vector<registered_waiter>& waiters) :
          count(waiters.size()), locks(new boost::unique_lock<boost::mutex>[count])
        {
          for (count_type_portable i = 0; i < count; ++i)
          {
            locks[i] = BOOST_THREAD_MAKE_RV_REF(boost::unique_lock<boost::mutex>(waiters[i].future_->mutex()));
          }
        }
 
        void lock()
        {
          boost::lock(locks.get(), locks.get() + count);
        }
 
        void unlock()
        {
          for (count_type_portable i = 0; i < count; ++i)
          {
            locks[i].unlock();
          }
        }
      };
 
      boost::condition_variable_any cv;
      std::vector<registered_waiter> waiters_;
      count_type future_count;
 
    public:
      waiter_for_any_in_seq() :
        future_count(0)
      {
      }
 
      template <typename F>
      void add(F& f)
      {
        if (f.valid())
        {
          registered_waiter waiter(f, f.notify_when_ready(cv), future_count);
          try
          {
            waiters_.push_back(waiter);
          }
          catch (...)
          {
            f.future_->unnotify_when_ready(waiter.handle);
            throw;
          }
          ++future_count;
        }
      }
 
#ifndef BOOST_NO_CXX11_VARIADIC_TEMPLATES
      template <typename F1, typename ... Fs>
      void add(F1& f1, Fs&... fs)
      {
        add(f1);
        add(fs...);
      }
#endif
 
      count_type wait()
      {
        all_futures_lock lk(waiters_);
        for (;;)
        {
          for (count_type i = 0; i < waiters_.size(); ++i)
          {
            if (waiters_[i].future_->is_ready(lk.locks[i]))
            {
              return waiters_[i].index;
            }
          }
          cv.wait(lk);
        }
      }
 
      ~waiter_for_any_in_seq()
      {
        for (count_type i = 0; i < waiters_.size(); ++i)
        {
          waiters_[i].future_->unnotify_when_ready(waiters_[i].handle);
        }
      }
    };
  }
 
  template <typename Iterator>
  typename boost::disable_if<is_future_type<Iterator> , Iterator>::type wait_for_any(Iterator begin, Iterator end)
  {
    if (begin == end) return end;
 
    detail::waiter_for_any_in_seq<typename std::iterator_traits<Iterator>::value_type> waiter;
    for (Iterator current = begin; current != end; ++current)
    {
      waiter.add(*current);
    }
    return boost::next(begin, waiter.wait());
  }
}
 
#endif // header