#ifndef __ARRAY_LOCK_FREE_SEM_QUEUE_H__
|
#define __ARRAY_LOCK_FREE_SEM_QUEUE_H__
|
#include "atomic_ops.h"
|
#include <assert.h> // assert()
|
#include <sched.h> // sched_yield()
|
#include "logger_factory.h"
|
#include "mem_pool.h"
|
#include "shm_allocator.h"
|
#include "futex_sem.h"
|
#include "time_util.h"
|
|
|
/// @brief implementation of an array based lock free queue with support for
/// multiple producers
/// The class is instantiated directly through its public constructor. The
/// element type and (optionally) the allocator are template parameters, the
/// capacity is a constructor argument:
/// ArrayLockFreeSemQueue<int> q(100);
|
|
/// @brief push()/pop() flag: wait at most for the caller supplied timeout
#define LOCK_FREE_QUEUE_TIMEOUT 1
/// @brief push()/pop() flag: return immediately instead of waiting
// The expansion is parenthesised so that the shift cannot be re-associated
// with operators surrounding the macro (e.g. LOCK_FREE_QUEUE_NOWAIT + 1).
#define LOCK_FREE_QUEUE_NOWAIT (1 << 1)

/// @brief when defined, the queue keeps an explicit element counter (m_count)
#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
|
|
|
/// @brief array based circular queue whose storage (and the object itself,
/// via the class specific operator new/delete) comes from Allocator
/// @tparam ELEM_T    type of the stored elements
/// @tparam Allocator type providing static allocate()/deallocate()
template <typename ELEM_T, typename Allocator = SHM_Allocator>
class ArrayLockFreeSemQueue
{
public:
    /// @brief constructor of the class
    /// @param qsize number of ELEM_T slots reserved for the circular array
    ArrayLockFreeSemQueue(size_t qsize = 16);

    /// @brief returns the slot array to the allocator
    virtual ~ArrayLockFreeSemQueue();

    /// @brief number of elements currently accounted for in the queue
    inline uint32_t size();

    /// @brief true when the queue cannot take another element
    inline bool full();

    /// @brief true when there is nothing to pop
    inline bool empty();

    /// @brief append a copy of a_data to the queue
    /// @param a_data  element to store
    /// @param timeout wait limit, only used together with LOCK_FREE_QUEUE_TIMEOUT
    /// @param flag    0 (wait), LOCK_FREE_QUEUE_TIMEOUT or LOCK_FREE_QUEUE_NOWAIT
    /// @return 0 on success, -1 when the queue is full and
    ///         LOCK_FREE_QUEUE_NOWAIT is set or when waiting failed
    int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);

    /// @brief remove the oldest element and copy it into a_data
    /// @param a_data  receives the extracted element
    /// @param timeout wait limit, only used together with LOCK_FREE_QUEUE_TIMEOUT
    /// @param flag    0 (wait), LOCK_FREE_QUEUE_TIMEOUT or LOCK_FREE_QUEUE_NOWAIT
    /// @return 0 on success, -1 when the queue is empty and
    ///         LOCK_FREE_QUEUE_NOWAIT is set or when waiting failed
    int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);

    /// @brief calculate the index in the circular array that corresponds
    /// to a particular "count" value
    inline uint32_t countToIndex(uint32_t a_count);

    /// @brief access the i-th element counted from the current read position
    /// (the process is terminated when i is out of range)
    ELEM_T& operator[](unsigned i);

public:
    /// @brief allocate the queue object itself through Allocator
    void *operator new(size_t size);

    /// @brief release the queue object itself through Allocator
    void operator delete(void *p);

private:
    /// @brief number of slots in m_theQueue
    size_t Q_SIZE;

    /// @brief array to keep the elements
    ELEM_T *m_theQueue;

    /// @brief where a new element will be inserted
    uint32_t m_writeIndex;

    /// @brief where the next element will be extracted from
    uint32_t m_readIndex;

    /// @brief maximum read index for multiple producer queues
    /// If it's not the same as m_writeIndex it means
    /// there are writes pending to be "committed" to the queue, that means,
    /// the place for the data was reserved (the index in the array) but
    /// data is still not in the queue, so the thread trying to read will have
    /// to wait for those other threads to save the data into the queue
    ///
    /// note this is only used for multiple producers
    uint32_t m_maximumReadIndex;

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    /// @brief number of elements in the queue (also used as the futex word)
    int m_count;
#endif

private:
    /// @brief copying is disabled (declared private, never defined): a copy
    /// would share m_theQueue and release it twice.
    // The parameter uses the injected class name so that the declaration is
    // the real copy constructor for every Allocator, not only the default one.
    ArrayLockFreeSemQueue(const ArrayLockFreeSemQueue &a_src);

    /// @brief copy assignment is disabled for the same reason
    ArrayLockFreeSemQueue &operator=(const ArrayLockFreeSemQueue &a_src);
};
|
|
|
template <typename ELEM_T, typename Allocator>
|
ArrayLockFreeSemQueue<ELEM_T, Allocator>::ArrayLockFreeSemQueue(size_t qsize):
|
Q_SIZE(qsize),
|
m_writeIndex(0), // initialisation is not atomic
|
m_readIndex(0), //
|
m_maximumReadIndex(0) //
|
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
|
,m_count(0) //
|
#endif
|
{
|
m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
|
|
}
|
|
template <typename ELEM_T, typename Allocator>
|
ArrayLockFreeSemQueue<ELEM_T, Allocator>::~ArrayLockFreeSemQueue()
|
{
|
// std::cout << "destroy ArrayLockFreeSemQueue\n";
|
Allocator::deallocate(m_theQueue);
|
|
}
|
|
template <typename ELEM_T, typename Allocator>
|
inline
|
uint32_t ArrayLockFreeSemQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
|
{
|
// if Q_SIZE is a power of 2 this statement could be also written as
|
// return (a_count & (Q_SIZE - 1));
|
return (a_count % Q_SIZE);
|
}
|
|
template <typename ELEM_T, typename Allocator>
|
inline
|
uint32_t ArrayLockFreeSemQueue<ELEM_T, Allocator>::size()
|
{
|
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
|
|
return m_count;
|
#else
|
|
uint32_t currentWriteIndex = m_maximumReadIndex;
|
uint32_t currentReadIndex = m_readIndex;
|
|
// let's think of a scenario where this function returns bogus data
|
// 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
|
// m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
|
// 2. afterwards this thread is preemted. While this thread is inactive 2
|
// elements are inserted and removed from the queue, so m_maximumReadIndex
|
// is 5 and m_readIndex 4. Real size is still 1
|
// 3. Now the current thread comes back from preemption and reads m_readIndex.
|
// currentReadIndex is 4
|
// 4. currentReadIndex is bigger than currentWriteIndex, so
|
// m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
|
// it returns that the queue is almost full, when it is almost empty
|
//
|
if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
|
{
|
return (currentWriteIndex - currentReadIndex);
|
}
|
else
|
{
|
return (Q_SIZE + currentWriteIndex - currentReadIndex);
|
}
|
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
|
}
|
|
template <typename ELEM_T, typename Allocator>
|
inline
|
bool ArrayLockFreeSemQueue<ELEM_T, Allocator>::full()
|
{
|
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
|
|
return (m_count == (Q_SIZE));
|
#else
|
|
uint32_t currentWriteIndex = m_writeIndex;
|
uint32_t currentReadIndex = m_readIndex;
|
|
if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
|
{
|
// the queue is full
|
return true;
|
}
|
else
|
{
|
// not full!
|
return false;
|
}
|
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
|
}
|
|
template <typename ELEM_T, typename Allocator>
|
inline
|
bool ArrayLockFreeSemQueue<ELEM_T, Allocator>::empty()
|
{
|
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
|
|
return (m_count == 0);
|
#else
|
|
if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
|
{
|
// the queue is full
|
return true;
|
}
|
else
|
{
|
// not full!
|
return false;
|
}
|
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
|
}
|
|
|
|
|
|
|
template <typename ELEM_T, typename Allocator>
|
int ArrayLockFreeSemQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag)
|
{
|
uint32_t currentReadIndex;
|
uint32_t currentWriteIndex;
|
int s;
|
|
do
|
{
|
currentWriteIndex = m_writeIndex;
|
currentReadIndex = m_readIndex;
|
|
if (m_count == Q_SIZE) {
|
if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
|
return -1;
|
else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
|
const struct timespec ts = TimeUtil::trim_time(timeout);
|
s = futex(&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
|
if (s == -1 && errno != EAGAIN && errno != EINTR) {
|
// err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
|
return -1;
|
}
|
|
} else {
|
s = futex(&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0);
|
if (s == -1 && errno != EAGAIN && errno != EINTR) {
|
return -1;
|
}
|
}
|
|
}
|
|
|
} while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
|
|
// We know now that this index is reserved for us. Use it to save the data
|
m_theQueue[countToIndex(currentWriteIndex)] = a_data;
|
|
// update the maximum read index after saving the data. It wouldn't fail if there is only one thread
|
// inserting in the queue. It might fail if there are more than 1 producer threads because this
|
// operation has to be done in the same order as the previous CAS
|
|
while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
|
{
|
// this is a good place to yield the thread in case there are more
|
// software threads than hardware processors and you have more
|
// than 1 producer thread
|
// have a look at sched_yield (POSIX.1b)
|
sched_yield();
|
}
|
|
AtomicAdd(&m_count, 1);
|
s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
|
if (s == -1)
|
err_exit(errno, "futex-FUTEX_WAKE");
|
return 0;
|
}
|
|
|
template <typename ELEM_T, typename Allocator>
|
int ArrayLockFreeSemQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag)
|
{
|
uint32_t currentMaximumReadIndex;
|
uint32_t currentReadIndex;
|
|
int s;
|
do
|
{
|
// to ensure thread-safety when there is more than 1 producer thread
|
// a second index is defined (m_maximumReadIndex)
|
currentReadIndex = m_readIndex;
|
currentMaximumReadIndex = m_maximumReadIndex;
|
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
|
|
if (m_count == 0) {
|
|
if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
|
return -1;
|
else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
|
const struct timespec ts = TimeUtil::trim_time(timeout);
|
s = futex(&m_count, FUTEX_WAIT, 0, &ts, NULL, 0);
|
if (s == -1 && errno != EAGAIN && errno != EINTR) {
|
// err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
|
return -1;
|
}
|
|
} else {
|
s = futex(&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
|
if (s == -1 && errno != EAGAIN && errno != EINTR) {
|
return -1;
|
}
|
}
|
}
|
#else
|
if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
|
{
|
// the queue is empty or
|
// a producer thread has allocate space in the queue but is
|
// waiting to commit the data into it
|
return -1;
|
}
|
#endif
|
|
// retrieve the data from the queue
|
a_data = m_theQueue[countToIndex(currentReadIndex)];
|
|
// try to perfrom now the CAS operation on the read index. If we succeed
|
// a_data already contains what m_readIndex pointed to before we
|
// increased it
|
if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
|
{
|
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
|
// m_count.fetch_sub(1);
|
AtomicSub(&m_count, 1);
|
#endif
|
|
s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
|
if (s == -1)
|
err_exit(errno, "futex-FUTEX_WAKE");
|
return 0;
|
}
|
|
// it failed retrieving the element off the queue. Someone else must
|
// have read the element stored at countToIndex(currentReadIndex)
|
// before we could perform the CAS operation
|
|
} while(1); // keep looping to try again!
|
|
// Something went wrong. it shouldn't be possible to reach here
|
assert(0);
|
|
// Add this return statement to avoid compiler warnings
|
return -1;
|
}
|
|
template <typename ELEM_T, typename Allocator>
|
ELEM_T& ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[](unsigned int i)
|
{
|
int currentCount = m_count;
|
uint32_t currentReadIndex = m_readIndex;
|
if (i >= currentCount)
|
{
|
std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
|
std::exit(EXIT_FAILURE);
|
}
|
return m_theQueue[countToIndex(currentReadIndex+i)];
|
}
|
|
|
|
template <typename ELEM_T, typename Allocator>
|
void * ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator new(size_t size){
|
return Allocator::allocate(size);
|
}
|
|
template <typename ELEM_T, typename Allocator>
|
void ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator delete(void *p) {
|
return Allocator::deallocate(p);
|
}
|
|
#endif // __ARRAY_LOCK_FREE_SEM_QUEUE_H__
|