// queue.h -- interface for a queue #ifndef SArrayLockFreeQueue_H_ #define SArrayLockFreeQueue_H_ #include "mm.h" #include "pcsem.h" #include "SAbstractQueue.h" template class SArrayLockFreeQueue :public SAbstractQueue { private: // class scope definitions T * arr; enum {Q_SIZE = 16}; int slots; int items; // private class members std::atomic_uint m_readIndex; std::atomic_uint m_writeIndex; std::atomic_uint m_maximumReadIndex; std::atomic_uint count; const size_t qsize; // maximum number of size in Queue // preemptive definitions to prevent public copying SArrayLockFreeQueue(const SArrayLockFreeQueue & q) : qsize(0) { } SArrayLockFreeQueue & operator=(const SArrayLockFreeQueue & q) { return *this;} bool _add(const T &item); // add item to end bool _remove(T &item); // remove item from front inline unsigned int countToIndex(unsigned int count) const; public: SArrayLockFreeQueue(size_t qs = Q_SIZE); // create queue with a qs limit virtual ~SArrayLockFreeQueue(); inline bool isempty() const; inline bool isfull() const; virtual unsigned int size() const; virtual bool add(const T &item); // add item to end virtual bool add_nowait(const T &item); virtual bool add_timeout(const T &item, struct timespec *timeout); virtual bool remove(T &item); virtual bool remove_nowait(T &item); virtual bool remove_timeout(T &item, struct timespec * timeout); virtual T& operator[](unsigned i); }; // Queue methods template SArrayLockFreeQueue::SArrayLockFreeQueue(size_t qs) : m_readIndex(0), m_writeIndex(0), m_maximumReadIndex(0), count(0), qsize(qs) { arr = (T*)mm_malloc(qsize * sizeof(T)); slots = pcsem::init(IPC_PRIVATE, qsize); items = pcsem::init(IPC_PRIVATE, 0); } template SArrayLockFreeQueue::~SArrayLockFreeQueue() { std::cerr << "SArrayLockFreeQueue destory" << std::endl; pcsem::remove(slots); pcsem::remove(items); mm_free(arr); } template inline bool SArrayLockFreeQueue::isempty() const { return countToIndex(m_readIndex.load(std::memory_order_relaxed)) == countToIndex( m_maximumReadIndex.load(std::memory_order_relaxed)); } template inline bool SArrayLockFreeQueue::isfull() const { return countToIndex(m_maximumReadIndex.load(std::memory_order_relaxed) + 1) == countToIndex(m_readIndex.load(std::memory_order_relaxed)); //return count == qsize; } template unsigned int SArrayLockFreeQueue::size() const { return count; } // Add item to queue template bool SArrayLockFreeQueue::_add(const T & item) { unsigned int currentReadIndex; unsigned int currentWriteIndex; do { currentWriteIndex = m_writeIndex.load(std::memory_order_relaxed); currentReadIndex = m_readIndex.load(std::memory_order_relaxed); // if (countToIndex(currentWriteIndex + 1 ) == countToIndex(currentReadIndex)) if (isfull()) { // the queue is full return false; } } while (!m_writeIndex.compare_exchange_weak(currentWriteIndex, (currentWriteIndex + 1), std::memory_order_release, std::memory_order_relaxed)); // We know now that this index is reserved for us. Use it to save the data std::cerr << "add " << count.load( std::memory_order_relaxed) << std::endl; arr[countToIndex(currentWriteIndex)] = item; count++; // update the maximum read index after saving the data. It wouldn't fail if there is only one thread // inserting in the queue. It might fail if there are more than 1 producer threads because this // operation has to be done in the same order as the previous CAS while (!m_maximumReadIndex.compare_exchange_weak(currentWriteIndex, (currentWriteIndex + 1), std::memory_order_release, std::memory_order_relaxed)) { // this is a good place to yield the thread in case there are more // software threads than hardware processors and you have more // than 1 producer thread // have a look at sched_yield (POSIX.1b) sched_yield(); } return true; } template bool SArrayLockFreeQueue::add(const T & item) { if (pcsem::dec(slots) == -1) { err_exit(errno, "add"); } if (SArrayLockFreeQueue::_add(item)) { pcsem::inc(items); return true; } return false; } template bool SArrayLockFreeQueue::add_nowait(const T & item) { if (pcsem::dec_nowait(slots) == -1) { if (errno == EAGAIN) return false; else err_exit(errno, "add_nowait"); } if (SArrayLockFreeQueue::_add(item)) { pcsem::inc(items); return true; } return false; } template bool SArrayLockFreeQueue::add_timeout(const T & item, struct timespec * timeout) { if (pcsem::dec_timeout(slots, timeout) == -1) { if (errno == EAGAIN) return false; else err_exit(errno, "add_timeout"); } if (SArrayLockFreeQueue::_add(item)){ pcsem::inc(items); return true; } return false; } // Place front item into item variable and remove from queue template bool SArrayLockFreeQueue::_remove(T & item) { unsigned int currentMaximumReadIndex; unsigned int currentReadIndex; do { // to ensure thread-safety when there is more than 1 producer thread // a second index is defined (m_maximumReadIndex) currentReadIndex = m_readIndex.load(std::memory_order_relaxed); currentMaximumReadIndex = m_maximumReadIndex.load(std::memory_order_relaxed); if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) { // the queue is empty or // a producer thread has allocate space in the queue but is // waiting to commit the data into it return false; } // retrieve the data from the queue item = arr[countToIndex(currentReadIndex)]; // try to perfrom now the CAS operation on the read index. If we succeed // a_data already contains what m_readIndex pointed to before we // increased it if (m_readIndex.compare_exchange_weak(currentReadIndex, (currentReadIndex + 1), std::memory_order_release, std::memory_order_relaxed)) { count--; return true; } // it failed retrieving the element off the queue. Someone else must // have read the element stored at countToIndex(currentReadIndex) // before we could perform the CAS operation } while(1); // keep looping to try again! // Something went wrong. it shouldn't be possible to reach here //assert(0); // Add this return statement to avoid compiler warnings return false; } template bool SArrayLockFreeQueue::remove(T & item) { if (pcsem::dec(items) == -1) { err_exit(errno, "remove"); } if (SArrayLockFreeQueue::_remove(item)) { pcsem::inc(slots); return true; } return false; } template bool SArrayLockFreeQueue::remove_nowait(T & item) { if (pcsem::dec_nowait(items) == -1) { if (errno == EAGAIN) return false; else err_exit(errno, "remove_nowait"); } if (SArrayLockFreeQueue::_remove(item)) { pcsem::inc(slots); return true; } return false; } template bool SArrayLockFreeQueue::remove_timeout(T & item, struct timespec * timeout) { if (pcsem::dec_timeout(items, timeout) == -1) { if (errno == EAGAIN) return false; else err_exit(errno, "remove_timeout"); } if (SArrayLockFreeQueue::_remove(item)) { pcsem::inc(slots); return true; } return false; } template inline unsigned int SArrayLockFreeQueue::countToIndex(unsigned int _count) const{ return _count % qsize; } template T& SArrayLockFreeQueue::operator[](unsigned int i) { if (i < 0 || i >= count) { std::cerr << "SArrayLockFreeQueue operator[] ,Error in array limits: " << i << " is out of range\n"; std::exit(EXIT_FAILURE); } return arr[countToIndex( m_readIndex.load(std::memory_order_relaxed) + i)]; } #endif