wangzhengquan
2021-01-14 a1b6b9ae58c85ec1801935676c9dd76682a313b1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
#ifndef __LOCK_FREE_QUEUE_H__
#define __LOCK_FREE_QUEUE_H__
 
#include <usg_common.h>
#include <assert.h> // assert()
#include "mem_pool.h"
#include "sem_util.h"
#include "logger_factory.h"
#include "shm_allocator.h"
#include "px_sem_util.h"
#include "bus_error.h"
 
// default Queue size
#define LOCK_FREE_Q_DEFAULT_SIZE 16
 
// static Logger *logger = LoggerFactory::getLogger();
// define this macro if calls to "size" must return the real size of the 
// queue. If it is undefined  that function will try to take a snapshot of 
// the queue, but returned value might be bogus
 
 
// forward declarations for default template values
//
 
template <typename ELEM_T, typename Allocator>
class ArrayLockFreeQueue;
 
// template <typename ELEM_T>
// class LinkedLockFreeQueue;
 
 
/// @brief Lock-free queue based on a circular array
/// No allocation of extra memory for the nodes handling is needed, but it has 
/// to add extra overhead (extra CAS operation) when inserting to ensure the 
/// thread-safety of the queue when the queue type is not 
/// ArrayLockFreeQueueSingleProducer.
///
/// examples of instantiation:
///   ArrayLockFreeQueue<int> q; // queue of ints of default size (65535 - 1)
///                              // and defaulted to single producer
///   ArrayLockFreeQueue<int, 16> q;
///                              // queue of ints of size (16 - 1) and
///                              // defaulted to single producer
///   ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q;
///                              // queue of ints of size (100 - 1) with support
///                              // for multiple producers
///
/// ELEM_T represents the type of elementes pushed and popped from the queue
/// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1)
///        This number should be a power of 2 to ensure 
///        indexes in the circular queue keep stable when the uint32_t 
///        variable that holds the current position rolls over from FFFFFFFF
///        to 0. For instance
///        2    -> 0x02 
///        4    -> 0x04
///        8    -> 0x08
///        16   -> 0x10
///        (...) 
///        1024 -> 0x400
///        2048 -> 0x800
///
///        if queue size is not defined as requested, let's say, for
///        instance 100, when current position is FFFFFFFF (4,294,967,295)
///        index in the circular array is 4,294,967,295 % 100 = 95. 
///        When that value is incremented it will be set to 0, that is the 
///        last 4 elements of the queue are not used when the counter rolls
///        over to 0
/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and 
///        ArrayLockFreeQueue are supported (single producer
///        by default)
template <
    typename ELEM_T, 
    typename Allocator = SHM_Allocator,
    template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
     >
class LockFreeQueue
{
 
private:
    sem_t slots;  
    sem_t items;
 
 
   
public:
    sem_t mutex;
    LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    
    /// @brief destructor of the class. 
    /// Note it is not virtual since it is not expected to inherit from this
    /// template
    ~LockFreeQueue();
    std::atomic_uint reference;    
    /// @brief constructor of the class
   
 
    /// @brief returns the current number of items in the queue
    /// It tries to take a snapshot of the size of the queue, but in busy environments
    /// this function might return bogus values. 
    ///
    /// If a reliable queue size must be kept you might want to have a look at 
    /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
    /// it enables a reliable size though it hits overall performance of the queue 
    /// (when the reliable size variable is on it's got an impact of about 20% in time)
    inline uint32_t size();
    
    /// @brief return true if the queue is full. False otherwise
    /// It tries to take a snapshot of the size of the queue, but in busy 
    /// environments this function might return bogus values. See help in method
    /// LockFreeQueue::size
    inline bool full();
 
    inline bool empty();
 
    inline ELEM_T& operator[](unsigned i);
 
    /// @brief push an element at the tail of the queue
    /// @param the element to insert in the queue
    /// Note that the element is not a pointer or a reference, so if you are using large data
    /// structures to be inserted in the queue you should think of instantiate the template
    /// of the queue as a pointer to that large structure
    /// @return true if the element was inserted in the queue. False if the queue was full
    int push(const ELEM_T &a_data);
    int push_nowait(const ELEM_T &a_data);
    int push_timeout(const ELEM_T &a_data, const struct timespec * timeout);
 
    /// @brief pop the element at the head of the queue
    /// @param a reference where the element in the head of the queue will be saved to
    /// Note that the a_data parameter might contain rubbish if the function returns false
    /// @return true if the element was successfully extracted from the queue. False if the queue was empty
    int pop(ELEM_T &a_data);
    int pop_nowait(ELEM_T &a_data);
    int pop_timeout(ELEM_T &a_data, struct timespec * timeout);
 
 
    void *operator new(size_t size);
    void operator delete(void *p);
 
protected:
    /// @brief the actual queue. methods are forwarded into the real 
    ///        implementation
    Q_TYPE<ELEM_T, Allocator> m_qImpl;
 
private:
    /// @brief disable copy constructor declaring it private
    LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src);
};
 
 
template <
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize)
{
// std::cout << "LockFreeQueue init reference=" << reference << std::endl;
    if (sem_init(&slots, 1, qsize) == -1)
        err_exit(errno, "LockFreeQueue sem_init");
    if (sem_init(&items, 1, 0) == -1)
        err_exit(errno, "LockFreeQueue sem_init");
    if (sem_init(&mutex, 1, 1) == -1)
        err_exit(errno, "LockFreeQueue sem_init");
 
   
}
 
template <
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue()
{
    // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy");
    if(sem_destroy(&slots) == -1) {
        err_exit(errno, "LockFreeQueue sem_destroy");
    }
    if(sem_destroy(&items) == -1) {
        err_exit(errno, "LockFreeQueue sem_destroy");
    }
    if(sem_destroy(&mutex) == -1) {
        err_exit(errno, "LockFreeQueue sem_destroy");
    }
}
 
template <
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size()
{
    return m_qImpl.size();
}  
 
template <
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full()
{
    return m_qImpl.full();
}
 
template <
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty()
{
    return m_qImpl.empty();
}  
 
 
template <
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
{
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");   
    if (sem_wait(&slots) == -1) {
        err_msg(errno, "LockFreeQueue push");
        return errno;
    }
    
    if ( m_qImpl.push(a_data) ) {
        sem_post(&items);   
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");   
        return 0;
    }
    return -1;
    
}
 
template <
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
{
    if (sem_trywait(&slots) == -1) {
        if (errno == EAGAIN)
            return EAGAIN;
        else {
            err_msg(errno, "LockFreeQueue push_nowait");
            return errno;
        }
 
    }
 
    if ( m_qImpl.push(a_data)) {
        sem_post(&items);     
        return 0;
    }
    return -1;
    
}
 
template <
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts)
{
     
    int rv;
    struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
  // LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld", 
  //   timeout.tv_sec, timeout.tv_nsec);
 
    while ( sem_timedwait(&slots, &timeout) == -1) {
    //     LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld, ETIMEDOUT=%d, errno=%d\n", 
    // timeout.tv_sec, timeout.tv_nsec, ETIMEDOUT, errno);
 
        if(errno == ETIMEDOUT)
            return EBUS_TIMEOUT;
        else if(errno == EINTR)
            continue;
        else {
           LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
           return errno;
        }
    }
 
    if (m_qImpl.push(a_data)){
        sem_post(&items);   
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n");    
        return 0;
    }
    return -1;
    
}
 
 
 
 
template <
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
{
 
 // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
    if (sem_wait(&items) == -1) {
        LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop");
        return errno;
    }
 
    if (m_qImpl.pop(a_data)) {
        sem_post(&slots);
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");      
        return 0;
    }
    return -1;
}
 
template <
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
{
    if (sem_trywait(&items) == -1) {
        if (errno == EAGAIN)
            return errno;
        else {
            LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_nowait");
            return errno;
        }
    }
 
    if (m_qImpl.pop(a_data)) {
        sem_post(&slots);     
        return 0;
    }
    return -1;
}
 
 
template <
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
{
// LoggerFactory::getLogger()->debug("=================ts sec = %d, nsec = %ld \n", ts->tv_sec,  ts->tv_nsec );
 
    LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n");   
    struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
// LoggerFactory::getLogger()->debug("================== timeout before sec = %d, nsec = %ld \n", timeout.tv_sec,  timeout.tv_nsec );
 
    while (sem_timedwait(&items, &timeout) == -1) {
        if (errno == ETIMEDOUT)
            return EBUS_TIMEOUT;
        else if(errno == EINTR)
            continue;
        else {
          // LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_timeout %d", errno);
          return errno;
        }
    }
 
    if (m_qImpl.pop(a_data)) {
        sem_post(&slots);  
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n");     
        return 0;
    }
    return -1;
    
}
 
template <
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
    return m_qImpl.operator[](i);
}
 
 
template <
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){
        return Allocator::allocate(size);
}
 
template <
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) {
    return Allocator::deallocate(p);
}
 
// include implementation files
//#include "linked_lock_free_queue.h"
#include "array_lock_free_queue.h"
 
#endif // _LOCK_FREE_QUEUE_H__