// queue.h -- interface for a queue
|
#ifndef SQUEUE_H_
|
#define SQUEUE_H_
|
#include "mm.h"
|
#include "pcsem.h"
|
|
|
template <typename T> class Node;
|
|
template <typename T>
|
class Pointer {
|
public:
|
|
Node<T> *ptr;
|
unsigned long count;
|
Pointer( Node<T> *node = NULL, int c=0) noexcept : ptr(node), count(c) {}
|
|
bool operator == (const Pointer<T> o) const {
|
return (o.ptr == ptr) && (o.count == count);
|
}
|
bool operator != (const Pointer<T> o) const {
|
return !((o.ptr == ptr) && (o.count == count));
|
}
|
|
|
|
};
|
|
template <typename T>
|
class Node {
|
public:
|
alignas(16) std::atomic<Pointer<T> > next;
|
T value;
|
|
Node() {
|
}
|
|
void *operator new(size_t size){
|
return mm_malloc(size);
|
}
|
|
void operator delete(void *p) {
|
return mm_free(p);
|
}
|
};
|
|
|
|
|
|
template <typename T>
|
class SQueue
|
{
|
private:
|
// class scope definitions
|
enum {Q_SIZE = 10};
|
|
int slots;
|
int items;
|
// private class members
|
std::atomic<Pointer<T> > Head; // pointer to front of Queue
|
std::atomic<Pointer<T> > Tail; // pointer to rear of Queue
|
//std::atomic_uint count; // current number of size in Queue
|
std::atomic_uint count;
|
const size_t qsize; // maximum number of size in Queue
|
// preemptive definitions to prevent public copying
|
SQueue(const SQueue & q) : qsize(0) { }
|
SQueue & operator=(const SQueue & q) { return *this;}
|
bool _enqueue(const T &item); // add item to end
|
bool _dequeue(T &item); // remove item from front
|
public:
|
SQueue(size_t qs = Q_SIZE); // create queue with a qs limit
|
~SQueue();
|
bool isempty() const;
|
bool isfull() const;
|
unsigned int size() const;
|
bool enqueue(const T &item); // add item to end
|
bool enqueue_nowait(const T &item);
|
bool enqueue_timeout(const T &item, struct timespec *timeout);
|
bool dequeue(T &item);
|
bool dequeue_nowait(T &item);
|
bool dequeue_timeout(T &item, struct timespec * timeout);
|
|
|
|
T& operator[](unsigned i);
|
};
|
|
|
|
|
// Queue methods
|
template <typename T>
|
SQueue<T>::SQueue(size_t qs) : count(0), qsize(qs)
|
{
|
Node<T> *node = new Node<T>;
|
Pointer<T> pointer(node, 0);
|
|
Head.store(pointer, std::memory_order_relaxed);
|
Tail.store(pointer, std::memory_order_relaxed);
|
|
slots = pcsem::init(145, qsize);
|
items = pcsem::init(146, 0);
|
|
}
|
|
template <typename T>
|
SQueue<T>::~SQueue()
|
{
|
std::cerr << "squeue destory" << std::endl;
|
pcsem::remove(slots);
|
pcsem::remove(items);
|
|
|
Node<T> * nodeptr;
|
Pointer<T> tmp = Head.load(std::memory_order_relaxed);
|
while((nodeptr = tmp.ptr) != NULL) {
|
tmp = (tmp.ptr->next).load(std::memory_order_relaxed);
|
//std::cerr << "delete " << nodeptr << std::endl;
|
delete nodeptr;
|
|
}
|
|
}
|
|
template <typename T>
|
bool SQueue<T>::isempty() const
|
{
|
return count == 0;
|
}
|
|
template <typename T>
|
bool SQueue<T>::isfull() const
|
{
|
return count == qsize;
|
}
|
|
template <typename T>
|
unsigned int SQueue<T>::size() const
|
{
|
return count;
|
}
|
|
// Add item to queue
|
template <typename T>
|
bool SQueue<T>::_enqueue(const T & item)
|
{
|
if (isfull())
|
return false;
|
|
Node<T> * node = new Node<T>;
|
node->value = item;
|
|
|
Pointer<T> tail ;
|
Pointer<T> next ;
|
|
|
while(true) {
|
tail = Tail.load(std::memory_order_relaxed);
|
next = (tail.ptr->next).load(std::memory_order_relaxed);
|
if (tail == Tail.load(std::memory_order_relaxed)) {
|
if (next.ptr == NULL) {
|
if ((tail.ptr->next).compare_exchange_weak(next,
|
Pointer<T>(node, next.count+1),
|
std::memory_order_release,
|
std::memory_order_relaxed) )
|
break;
|
else
|
Tail.compare_exchange_weak(tail,
|
Pointer<T>(next.ptr, tail.count+1),
|
std::memory_order_release,
|
std::memory_order_relaxed);
|
}
|
|
}
|
}
|
|
Tail.compare_exchange_weak(tail, Pointer<T>(node, tail.count+1),
|
std::memory_order_release,
|
std::memory_order_relaxed);
|
count++;
|
return true;
|
}
|
|
template <typename T>
|
bool SQueue<T>::enqueue(const T & item)
|
{
|
if (pcsem::dec(slots) == -1) {
|
err_exit(errno, "enqueue");
|
}
|
|
if (SQueue<T>::_enqueue(item)) {
|
pcsem::inc(items);
|
return true;
|
}
|
return false;
|
|
}
|
|
template <typename T>
|
bool SQueue<T>::enqueue_nowait(const T & item)
|
{
|
if (pcsem::dec_nowait(slots) == -1) {
|
if (errno == EAGAIN)
|
return false;
|
else
|
err_exit(errno, "enqueue_nowait");
|
}
|
|
if (SQueue<T>::_enqueue(item)) {
|
pcsem::inc(items);
|
return true;
|
}
|
return false;
|
|
}
|
|
template <typename T>
|
bool SQueue<T>::enqueue_timeout(const T & item, struct timespec * timeout)
|
{
|
if (pcsem::dec_timeout(slots, timeout) == -1) {
|
if (errno == EAGAIN)
|
return false;
|
else
|
err_exit(errno, "enqueue_timeout");
|
}
|
|
if (SQueue<T>::_enqueue(item)){
|
pcsem::inc(items);
|
return true;
|
}
|
return false;
|
|
}
|
|
|
// Place front item into item variable and remove from queue
|
template <typename T>
|
bool SQueue<T>::_dequeue(T & item)
|
{
|
if (isempty())
|
return false;
|
|
Pointer<T> head;
|
Pointer<T> tail;
|
Pointer<T> next;
|
|
while(true) {
|
head = Head.load(std::memory_order_relaxed);
|
tail = Tail.load(std::memory_order_relaxed);
|
next = (head.ptr->next).load();
|
if (head == Head.load(std::memory_order_relaxed)) {
|
if(head.ptr == tail.ptr) {
|
if (next.ptr == NULL)
|
return false;
|
// Tail is falling behind. Try to advance it
|
Tail.compare_exchange_weak(tail,
|
Pointer<T>(next.ptr, tail.count+1),
|
std::memory_order_release,
|
std::memory_order_relaxed);
|
} else {
|
item = next.ptr->value;
|
if (Head.compare_exchange_weak(head,
|
Pointer<T>(next.ptr, head.count+1),
|
std::memory_order_release,
|
std::memory_order_relaxed)) {
|
delete head.ptr;
|
break;
|
}
|
|
}
|
}
|
|
}
|
|
count--;
|
return true;
|
|
}
|
|
template <typename T>
|
bool SQueue<T>::dequeue(T & item)
|
{
|
if (pcsem::dec(items) == -1) {
|
err_exit(errno, "dequeue");
|
}
|
|
if (SQueue<T>::_dequeue(item)) {
|
pcsem::inc(slots);
|
return true;
|
}
|
return false;
|
|
}
|
|
template <typename T>
|
bool SQueue<T>::dequeue_nowait(T & item)
|
{
|
if (pcsem::dec_nowait(items) == -1) {
|
if (errno == EAGAIN)
|
return false;
|
else
|
err_exit(errno, "dequeue_nowait");
|
}
|
|
if (SQueue<T>::_dequeue(item)) {
|
pcsem::inc(slots);
|
return true;
|
}
|
return false;
|
|
}
|
|
template <typename T>
|
bool SQueue<T>::dequeue_timeout(T & item, struct timespec * timeout)
|
{
|
if (pcsem::dec_timeout(items, timeout) == -1) {
|
if (errno == EAGAIN)
|
return false;
|
else
|
err_exit(errno, "dequeue_timeout");
|
}
|
|
if (SQueue<T>::_dequeue(item)) {
|
pcsem::inc(slots);
|
return true;
|
}
|
return false;
|
|
}
|
|
|
template <class T>
|
T& SQueue<T>::operator[](unsigned int i)
|
{
|
if (i < 0 || i >= count)
|
{
|
std::cerr << "Error in array limits: " << i << " is out of range\n";
|
std::exit(EXIT_FAILURE);
|
}
|
|
|
Pointer<T> tmp = Head.load(std::memory_order_relaxed);
|
//Pointer<T> tail = Tail.load(std::memory_order_relaxed);
|
|
while(i > 0) {
|
//std::cout << i << ":" << std::endl;
|
tmp = (tmp.ptr->next).load(std::memory_order_relaxed);
|
i--;
|
}
|
|
tmp = (tmp.ptr->next).load(std::memory_order_relaxed);
|
return tmp.ptr->value;
|
}
|
|
|
#endif
|