lichao
2021-04-12 1b52f1cb8c47dd2c0195d2fd65d7b6a4c2f10704
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/*
 * =====================================================================================
 *
 *       Filename:  failed_msg.h
 *
 *    Description:  
 *
 *        Version:  1.0
 *        Created:  2021年04月12日 09时36分04秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:  
 *
 * =====================================================================================
 */
#ifndef TIMED_QUEUE_Y2YLRBS3
#define TIMED_QUEUE_Y2YLRBS3
 
#include "bh_util.h"
#include <chrono>
#include <list>
#include <string>
 
template <class Data, class ClockType = std::chrono::steady_clock>
class TimedQueue
{
public:
    typedef ClockType Clock;
    typedef typename Clock::time_point TimePoint;
    typedef typename Clock::duration Duration;
 
private:
    struct Record {
        TimePoint expire_;
        Data data_;
        Record(const TimePoint &expire, const Data &data) :
            expire_(expire), data_(data) {}
        Record(const TimePoint &expire, Data &&data) :
            expire_(expire), data_(std::move(data)) {}
        bool Expired() { return Clock::now() > expire_; }
    };
    typedef std::list<Record> Queue;
    Synced<Queue> queue_;
 
public:
    void Push(Data &&data, const TimePoint &expire) { queue_->emplace_back(expire, std::move(data)); }
    void Push(Data const &data, const TimePoint &expire) { queue_->emplace_back(expire, data); }
 
    void Push(Data &&data, Duration const &timeout) { Push(std::move(data), Clock::now() + timeout); }
    void Push(Data const &data, Duration const &timeout) { Push(data, Clock::now() + timeout); }
 
    template <class Func>
    void CheckAll(Func const &func)
    {
        queue_.Apply([&](Queue &q) {
            if (q.empty()) {
                return;
            }
            auto it = q.begin();
            do {
                if (it->Expired()) {
                    it = q.erase(it);
                } else if (func(it->data_)) {
                    it = q.erase(it);
                } else {
                    ++it;
                }
            } while (it != q.end());
        });
    }
};
 
#endif // end of include guard: TIMED_QUEUE_Y2YLRBS3