wangzhengquan
2020-07-06 302ae4427b04a25e4f1ee8acadbb05bf902f47f7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
#ifndef _LOCK_FREE_QUEUE_H__
#define _LOCK_FREE_QUEUE_H__
 
#include <stdint.h>     // uint32_t
#include <atomic>
#include <usg_common.h>
 
// default Queue size
#define LOCK_FREE_Q_DEFAULT_SIZE 65536 // (2^16)
 
// define this macro if calls to "size" must return the real size of the 
// queue. If it is undefined  that function will try to take a snapshot of 
// the queue, but returned value might be bogus
#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
// forward declarations for default template values
//
 
template <typename ELEM_T>
class ArrayLockFreeQueueMultipleProducers;
 
 
/// @brief Lock-free queue based on a circular array
/// No allocation of extra memory for the nodes handling is needed, but it has 
/// to add extra overhead (extra CAS operation) when inserting to ensure the 
/// thread-safety of the queue when the queue type is not 
/// ArrayLockFreeQueueSingleProducer.
///
/// examples of instantiation:
///   ArrayLockFreeQueue<int> q; // queue of ints of default size (65535 - 1)
///                              // and defaulted to single producer
///   ArrayLockFreeQueue<int, 16> q;
///                              // queue of ints of size (16 - 1) and
///                              // defaulted to single producer
///   ArrayLockFreeQueue<int, 100, ArrayLockFreeQueueMultipleProducers> q;
///                              // queue of ints of size (100 - 1) with support
///                              // for multiple producers
///
/// ELEM_T represents the type of elementes pushed and popped from the queue
/// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1)
///        This number should be a power of 2 to ensure 
///        indexes in the circular queue keep stable when the uint32_t 
///        variable that holds the current position rolls over from FFFFFFFF
///        to 0. For instance
///        2    -> 0x02 
///        4    -> 0x04
///        8    -> 0x08
///        16   -> 0x10
///        (...) 
///        1024 -> 0x400
///        2048 -> 0x800
///
///        if queue size is not defined as requested, let's say, for
///        instance 100, when current position is FFFFFFFF (4,294,967,295)
///        index in the circular array is 4,294,967,295 % 100 = 95. 
///        When that value is incremented it will be set to 0, that is the 
///        last 4 elements of the queue are not used when the counter rolls
///        over to 0
/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and 
///        ArrayLockFreeQueueMultipleProducers are supported (single producer
///        by default)
template <
    typename ELEM_T, 
    template <typename T> class Q_TYPE = ArrayLockFreeQueueMultipleProducers >
class LockFreeQueue
{
public:    
    /// @brief constructor of the class
    LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    
    /// @brief destructor of the class. 
    /// Note it is not virtual since it is not expected to inherit from this
    /// template
    ~LockFreeQueue();
 
    /// @brief returns the current number of items in the queue
    /// It tries to take a snapshot of the size of the queue, but in busy environments
    /// this function might return bogus values. 
    ///
    /// If a reliable queue size must be kept you might want to have a look at 
    /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
    /// it enables a reliable size though it hits overall performance of the queue 
    /// (when the reliable size variable is on it's got an impact of about 20% in time)
    inline uint32_t size();
    
    /// @brief return true if the queue is full. False otherwise
    /// It tries to take a snapshot of the size of the queue, but in busy 
    /// environments this function might return bogus values. See help in method
    /// LockFreeQueue::size
    inline bool full();
 
    /// @brief push an element at the tail of the queue
    /// @param the element to insert in the queue
    /// Note that the element is not a pointer or a reference, so if you are using large data
    /// structures to be inserted in the queue you should think of instantiate the template
    /// of the queue as a pointer to that large structure
    /// @return true if the element was inserted in the queue. False if the queue was full
    inline bool push(const ELEM_T &a_data);
 
    /// @brief pop the element at the head of the queue
    /// @param a reference where the element in the head of the queue will be saved to
    /// Note that the a_data parameter might contain rubbish if the function returns false
    /// @return true if the element was successfully extracted from the queue. False if the queue was empty
    inline bool pop(ELEM_T &a_data);
 
protected:
    /// @brief the actual queue. methods are forwarded into the real 
    ///        implementation
    Q_TYPE<ELEM_T> m_qImpl;
 
private:
    /// @brief disable copy constructor declaring it private
    LockFreeQueue<ELEM_T, Q_TYPE>(const LockFreeQueue<ELEM_T, Q_TYPE> &a_src);
};
 
 
 
/// @brief implementation of an array based lock free queue with support for 
///        multiple producers
/// This class is prevented from being instantiated directly (all members and
/// methods are private). To instantiate a multiple producers lock free queue 
/// you must use the ArrayLockFreeQueue fachade:
///   ArrayLockFreeQueue<int, 100, ArrayLockFreeQueueMultipleProducers> q;
template <typename ELEM_T>
class ArrayLockFreeQueueMultipleProducers
{
    // ArrayLockFreeQueue will be using this' private members
    template <
        typename ELEM_T_, 
        template <typename T> class Q_TYPE >
    friend class LockFreeQueue;
 
private:
    /// @brief constructor of the class
    ArrayLockFreeQueueMultipleProducers(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    
    virtual ~ArrayLockFreeQueueMultipleProducers();
    
    inline uint32_t size();
    
    inline bool full();
    
    inline bool empty();
    
    bool push(const ELEM_T &a_data);   
    
    bool pop(ELEM_T &a_data);
    
    /// @brief calculate the index in the circular array that corresponds
    /// to a particular "count" value
    inline uint32_t countToIndex(uint32_t a_count);
    
private:    
    size_t Q_SIZE;
    /// @brief array to keep the elements
    ELEM_T *m_theQueue;
 
    /// @brief where a new element will be inserted
    std::atomic<uint32_t> m_writeIndex;
 
    /// @brief where the next element where be extracted from
    std::atomic<uint32_t> m_readIndex;
    
    /// @brief maximum read index for multiple producer queues
    /// If it's not the same as m_writeIndex it means
    /// there are writes pending to be "committed" to the queue, that means,
    /// the place for the data was reserved (the index in the array) but  
    /// data is still not in the queue, so the thread trying to read will have 
    /// to wait for those other threads to save the data into the queue
    ///
    /// note this is only used for multiple producers
    std::atomic<uint32_t> m_maximumReadIndex;
 
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    /// @brief number of elements in the queue
    std::atomic<uint32_t> m_count;
#endif
    static int m_reference;
 
private:
    /// @brief disable copy constructor declaring it private
    ArrayLockFreeQueueMultipleProducers<ELEM_T>(const ArrayLockFreeQueueMultipleProducers<ELEM_T> &a_src);
 
 
};
 
// include implementation files
#include "lock_free_queue_impl.h"
#include "lock_free_queue_impl_multiple_producer.h"
 
#endif // _LOCK_FREE_QUEUE_H__