// queue.h -- interface for a queue #ifndef SQUEUE_H_ #define SQUEUE_H_ #include "mm.h" #include "pcsem.h" template class Node; template class Pointer { public: Node *ptr; unsigned long count; Pointer( Node *node = NULL, int c=0) noexcept : ptr(node), count(c) {} bool operator == (const Pointer o) const { return (o.ptr == ptr) && (o.count == count); } bool operator != (const Pointer o) const { return !((o.ptr == ptr) && (o.count == count)); } }; template class Node { public: alignas(16) std::atomic > next; T value; Node() { } void *operator new(size_t size){ return mm_malloc(size); } void operator delete(void *p) { return mm_free(p); } }; template class SQueue { private: // class scope definitions enum {Q_SIZE = 10}; int slots; int items; // private class members std::atomic > Head; // pointer to front of Queue std::atomic > Tail; // pointer to rear of Queue //std::atomic_uint count; // current number of size in Queue std::atomic_uint count; const size_t qsize; // maximum number of size in Queue // preemptive definitions to prevent public copying SQueue(const SQueue & q) : qsize(0) { } SQueue & operator=(const SQueue & q) { return *this;} bool _enqueue(const T &item); // add item to end bool _dequeue(T &item); // remove item from front public: SQueue(size_t qs = Q_SIZE); // create queue with a qs limit ~SQueue(); bool isempty() const; bool isfull() const; unsigned int size() const; bool enqueue(const T &item); // add item to end bool enqueue_nowait(const T &item); bool enqueue_timeout(const T &item, struct timespec *timeout); bool dequeue(T &item); bool dequeue_nowait(T &item); bool dequeue_timeout(T &item, struct timespec * timeout); T& operator[](unsigned i); }; // Queue methods template SQueue::SQueue(size_t qs) : count(0), qsize(qs) { Node *node = new Node; Pointer pointer(node, 0); Head.store(pointer, std::memory_order_relaxed); Tail.store(pointer, std::memory_order_relaxed); slots = pcsem::init(145, qsize); items = pcsem::init(146, 0); } template SQueue::~SQueue() { std::cerr << "squeue destory" << std::endl; pcsem::remove(slots); pcsem::remove(items); Node * nodeptr; Pointer tmp = Head.load(std::memory_order_relaxed); while((nodeptr = tmp.ptr) != NULL) { tmp = (tmp.ptr->next).load(std::memory_order_relaxed); //std::cerr << "delete " << nodeptr << std::endl; delete nodeptr; } } template bool SQueue::isempty() const { return count == 0; } template bool SQueue::isfull() const { return count == qsize; } template unsigned int SQueue::size() const { return count; } // Add item to queue template bool SQueue::_enqueue(const T & item) { if (isfull()) return false; Node * node = new Node; node->value = item; Pointer tail ; Pointer next ; while(true) { tail = Tail.load(std::memory_order_relaxed); next = (tail.ptr->next).load(std::memory_order_relaxed); if (tail == Tail.load(std::memory_order_relaxed)) { if (next.ptr == NULL) { if ((tail.ptr->next).compare_exchange_weak(next, Pointer(node, next.count+1), std::memory_order_release, std::memory_order_relaxed) ) break; else Tail.compare_exchange_weak(tail, Pointer(next.ptr, tail.count+1), std::memory_order_release, std::memory_order_relaxed); } } } Tail.compare_exchange_weak(tail, Pointer(node, tail.count+1), std::memory_order_release, std::memory_order_relaxed); count++; return true; } template bool SQueue::enqueue(const T & item) { if (pcsem::dec(slots) == -1) { err_exit(errno, "enqueue"); } if (SQueue::_enqueue(item)) { pcsem::inc(items); return true; } return false; } template bool SQueue::enqueue_nowait(const T & item) { if (pcsem::dec_nowait(slots) == -1) { if (errno == EAGAIN) return false; else err_exit(errno, "enqueue_nowait"); } if (SQueue::_enqueue(item)) { pcsem::inc(items); return true; } return false; } template bool SQueue::enqueue_timeout(const T & item, struct timespec * timeout) { if (pcsem::dec_timeout(slots, timeout) == -1) { if (errno == EAGAIN) return false; else err_exit(errno, "enqueue_timeout"); } if (SQueue::_enqueue(item)){ pcsem::inc(items); return true; } return false; } // Place front item into item variable and remove from queue template bool SQueue::_dequeue(T & item) { if (isempty()) return false; Pointer head; Pointer tail; Pointer next; while(true) { head = Head.load(std::memory_order_relaxed); tail = Tail.load(std::memory_order_relaxed); next = (head.ptr->next).load(); if (head == Head.load(std::memory_order_relaxed)) { if(head.ptr == tail.ptr) { if (next.ptr == NULL) return false; // Tail is falling behind. Try to advance it Tail.compare_exchange_weak(tail, Pointer(next.ptr, tail.count+1), std::memory_order_release, std::memory_order_relaxed); } else { item = next.ptr->value; if (Head.compare_exchange_weak(head, Pointer(next.ptr, head.count+1), std::memory_order_release, std::memory_order_relaxed)) { delete head.ptr; break; } } } } count--; return true; } template bool SQueue::dequeue(T & item) { if (pcsem::dec(items) == -1) { err_exit(errno, "dequeue"); } if (SQueue::_dequeue(item)) { pcsem::inc(slots); return true; } return false; } template bool SQueue::dequeue_nowait(T & item) { if (pcsem::dec_nowait(items) == -1) { if (errno == EAGAIN) return false; else err_exit(errno, "dequeue_nowait"); } if (SQueue::_dequeue(item)) { pcsem::inc(slots); return true; } return false; } template bool SQueue::dequeue_timeout(T & item, struct timespec * timeout) { if (pcsem::dec_timeout(items, timeout) == -1) { if (errno == EAGAIN) return false; else err_exit(errno, "dequeue_timeout"); } if (SQueue::_dequeue(item)) { pcsem::inc(slots); return true; } return false; } template T& SQueue::operator[](unsigned int i) { if (i < 0 || i >= count) { std::cerr << "Error in array limits: " << i << " is out of range\n"; std::exit(EXIT_FAILURE); } Pointer tmp = Head.load(std::memory_order_relaxed); //Pointer tail = Tail.load(std::memory_order_relaxed); while(i > 0) { //std::cout << i << ":" << std::endl; tmp = (tmp.ptr->next).load(std::memory_order_relaxed); i--; } tmp = (tmp.ptr->next).load(std::memory_order_relaxed); return tmp.ptr->value; } #endif