// queue.h -- interface for a queue
|
#ifndef SArrayLockFreeQueue_H_
|
#define SArrayLockFreeQueue_H_
|
#include "mm.h"
|
#include "pcsem.h"
|
#include "SAbstractQueue.h"
|
|
|
|
template <typename T>
|
class SArrayLockFreeQueue :public SAbstractQueue<T>
|
{
|
private:
|
// class scope definitions
|
|
T * arr;
|
enum {Q_SIZE = 16};
|
|
int slots;
|
int items;
|
// private class members
|
std::atomic_uint m_readIndex;
|
std::atomic_uint m_writeIndex;
|
std::atomic_uint m_maximumReadIndex;
|
|
std::atomic_uint count;
|
const size_t qsize; // maximum number of size in Queue
|
// preemptive definitions to prevent public copying
|
SArrayLockFreeQueue(const SArrayLockFreeQueue & q) : qsize(0) { }
|
SArrayLockFreeQueue & operator=(const SArrayLockFreeQueue & q) { return *this;}
|
bool _add(const T &item); // add item to end
|
bool _remove(T &item); // remove item from front
|
inline unsigned int countToIndex(unsigned int count) const;
|
public:
|
SArrayLockFreeQueue(size_t qs = Q_SIZE); // create queue with a qs limit
|
virtual ~SArrayLockFreeQueue();
|
inline bool isempty() const;
|
inline bool isfull() const;
|
virtual unsigned int size() const;
|
virtual bool add(const T &item); // add item to end
|
virtual bool add_nowait(const T &item);
|
virtual bool add_timeout(const T &item, struct timespec *timeout);
|
virtual bool remove(T &item);
|
virtual bool remove_nowait(T &item);
|
virtual bool remove_timeout(T &item, struct timespec * timeout);
|
|
|
virtual T& operator[](unsigned i);
|
};
|
|
|
|
|
// Queue methods
|
template <typename T>
|
SArrayLockFreeQueue<T>::SArrayLockFreeQueue(size_t qs) : m_readIndex(0), m_writeIndex(0), m_maximumReadIndex(0), count(0), qsize(qs)
|
{
|
|
arr = (T*)mm_malloc(qsize * sizeof(T));
|
slots = pcsem::init(IPC_PRIVATE, qsize);
|
items = pcsem::init(IPC_PRIVATE, 0);
|
|
}
|
|
template <typename T>
|
SArrayLockFreeQueue<T>::~SArrayLockFreeQueue()
|
{
|
std::cerr << "SArrayLockFreeQueue destory" << std::endl;
|
pcsem::remove(slots);
|
pcsem::remove(items);
|
|
mm_free(arr);
|
|
}
|
|
template <typename T>
|
inline bool SArrayLockFreeQueue<T>::isempty() const
|
{
|
return countToIndex(m_readIndex.load(std::memory_order_relaxed)) == countToIndex( m_maximumReadIndex.load(std::memory_order_relaxed));
|
}
|
|
template <typename T>
|
inline bool SArrayLockFreeQueue<T>::isfull() const
|
{
|
return countToIndex(m_maximumReadIndex.load(std::memory_order_relaxed) + 1) == countToIndex(m_readIndex.load(std::memory_order_relaxed));
|
//return count == qsize;
|
}
|
|
template <typename T>
|
unsigned int SArrayLockFreeQueue<T>::size() const
|
{
|
return count;
|
}
|
|
// Add item to queue
|
template <typename T>
|
bool SArrayLockFreeQueue<T>::_add(const T & item)
|
{
|
|
unsigned int currentReadIndex;
|
unsigned int currentWriteIndex;
|
|
do
|
{
|
currentWriteIndex = m_writeIndex.load(std::memory_order_relaxed);
|
currentReadIndex = m_readIndex.load(std::memory_order_relaxed);
|
// if (countToIndex(currentWriteIndex + 1 ) == countToIndex(currentReadIndex))
|
if (isfull())
|
{
|
// the queue is full
|
return false;
|
}
|
|
} while (!m_writeIndex.compare_exchange_weak(currentWriteIndex, (currentWriteIndex + 1), std::memory_order_release, std::memory_order_relaxed));
|
|
// We know now that this index is reserved for us. Use it to save the data
|
std::cerr << "add " << count.load( std::memory_order_relaxed) << std::endl;
|
arr[countToIndex(currentWriteIndex)] = item;
|
count++;
|
|
// update the maximum read index after saving the data. It wouldn't fail if there is only one thread
|
// inserting in the queue. It might fail if there are more than 1 producer threads because this
|
// operation has to be done in the same order as the previous CAS
|
while (!m_maximumReadIndex.compare_exchange_weak(currentWriteIndex, (currentWriteIndex + 1), std::memory_order_release, std::memory_order_relaxed))
|
{
|
// this is a good place to yield the thread in case there are more
|
// software threads than hardware processors and you have more
|
// than 1 producer thread
|
// have a look at sched_yield (POSIX.1b)
|
sched_yield();
|
}
|
|
return true;
|
}
|
|
template <typename T>
|
bool SArrayLockFreeQueue<T>::add(const T & item)
|
{
|
if (pcsem::dec(slots) == -1) {
|
err_exit(errno, "add");
|
}
|
|
if (SArrayLockFreeQueue<T>::_add(item)) {
|
pcsem::inc(items);
|
return true;
|
}
|
return false;
|
|
}
|
|
template <typename T>
|
bool SArrayLockFreeQueue<T>::add_nowait(const T & item)
|
{
|
if (pcsem::dec_nowait(slots) == -1) {
|
if (errno == EAGAIN)
|
return false;
|
else
|
err_exit(errno, "add_nowait");
|
}
|
|
if (SArrayLockFreeQueue<T>::_add(item)) {
|
pcsem::inc(items);
|
return true;
|
}
|
return false;
|
|
}
|
|
template <typename T>
|
bool SArrayLockFreeQueue<T>::add_timeout(const T & item, struct timespec * timeout)
|
{
|
if (pcsem::dec_timeout(slots, timeout) == -1) {
|
if (errno == EAGAIN)
|
return false;
|
else
|
err_exit(errno, "add_timeout");
|
}
|
|
if (SArrayLockFreeQueue<T>::_add(item)){
|
pcsem::inc(items);
|
return true;
|
}
|
return false;
|
|
}
|
|
|
// Place front item into item variable and remove from queue
|
template <typename T>
|
bool SArrayLockFreeQueue<T>::_remove(T & item)
|
{
|
unsigned int currentMaximumReadIndex;
|
unsigned int currentReadIndex;
|
|
do
|
{
|
// to ensure thread-safety when there is more than 1 producer thread
|
// a second index is defined (m_maximumReadIndex)
|
currentReadIndex = m_readIndex.load(std::memory_order_relaxed);
|
currentMaximumReadIndex = m_maximumReadIndex.load(std::memory_order_relaxed);
|
|
if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
|
{
|
// the queue is empty or
|
// a producer thread has allocate space in the queue but is
|
// waiting to commit the data into it
|
return false;
|
}
|
|
// retrieve the data from the queue
|
item = arr[countToIndex(currentReadIndex)];
|
|
// try to perfrom now the CAS operation on the read index. If we succeed
|
// a_data already contains what m_readIndex pointed to before we
|
// increased it
|
if (m_readIndex.compare_exchange_weak(currentReadIndex, (currentReadIndex + 1), std::memory_order_release, std::memory_order_relaxed))
|
{
|
count--;
|
return true;
|
}
|
|
// it failed retrieving the element off the queue. Someone else must
|
// have read the element stored at countToIndex(currentReadIndex)
|
// before we could perform the CAS operation
|
|
} while(1); // keep looping to try again!
|
|
// Something went wrong. it shouldn't be possible to reach here
|
//assert(0);
|
|
// Add this return statement to avoid compiler warnings
|
return false;
|
}
|
|
template <typename T>
|
bool SArrayLockFreeQueue<T>::remove(T & item)
|
{
|
if (pcsem::dec(items) == -1) {
|
err_exit(errno, "remove");
|
}
|
|
if (SArrayLockFreeQueue<T>::_remove(item)) {
|
pcsem::inc(slots);
|
return true;
|
}
|
return false;
|
|
}
|
|
template <typename T>
|
bool SArrayLockFreeQueue<T>::remove_nowait(T & item)
|
{
|
if (pcsem::dec_nowait(items) == -1) {
|
if (errno == EAGAIN)
|
return false;
|
else
|
err_exit(errno, "remove_nowait");
|
}
|
|
if (SArrayLockFreeQueue<T>::_remove(item)) {
|
pcsem::inc(slots);
|
return true;
|
}
|
return false;
|
|
}
|
|
template <typename T>
|
bool SArrayLockFreeQueue<T>::remove_timeout(T & item, struct timespec * timeout)
|
{
|
if (pcsem::dec_timeout(items, timeout) == -1) {
|
if (errno == EAGAIN)
|
return false;
|
else
|
err_exit(errno, "remove_timeout");
|
}
|
|
if (SArrayLockFreeQueue<T>::_remove(item)) {
|
pcsem::inc(slots);
|
return true;
|
}
|
return false;
|
|
}
|
|
template <class T>
|
inline unsigned int SArrayLockFreeQueue<T>::countToIndex(unsigned int _count) const{
|
return _count % qsize;
|
}
|
|
template <class T>
|
T& SArrayLockFreeQueue<T>::operator[](unsigned int i)
|
{
|
if (i < 0 || i >= count)
|
{
|
std::cerr << "SArrayLockFreeQueue operator[] ,Error in array limits: " << i << " is out of range\n";
|
std::exit(EXIT_FAILURE);
|
}
|
|
|
return arr[countToIndex( m_readIndex.load(std::memory_order_relaxed) + i)];
|
}
|
|
|
#endif
|