| | |
| | | #ifndef __ARRAY_LOCK_FREE_QUEUE_H__ |
| | | #define __ARRAY_LOCK_FREE_QUEUE_H__ |
| | | |
| | | #include "atomic_ops.h" |
| | | #include <assert.h> // assert() |
| | | #include <sched.h> // sched_yield() |
| | |
| | | #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | template <typename ELEM_T, typename Allocator = SHM_Allocator> |
| | | class ArrayLockFreeQueue |
| | | { |
| | | class ArrayLockFreeQueue { |
| | | // ArrayLockFreeQueue will be using this' private members |
| | | template < |
| | | typename ELEM_T_, |
| | | typename Allocator_, |
| | | template <typename T, typename AT> class Q_TYPE |
| | | > |
| | | friend class LockFreeQueue; |
| | | friend |
| | | class LockFreeQueue; |
| | | |
| | | private: |
| | | /// @brief constructor of the class |
| | |
| | | } |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() |
| | | { |
| | | ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() { |
| | | // std::cout << "destroy ArrayLockFreeQueue\n"; |
| | | Allocator::deallocate(m_theQueue); |
| | | |
| | |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) |
| | | { |
| | | uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) { |
| | | // if Q_SIZE is a power of 2 this statement could be also written as |
| | | // return (a_count & (Q_SIZE - 1)); |
| | | return (a_count % Q_SIZE); |
| | |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() |
| | | { |
| | | uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | return m_count; |
| | |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() |
| | | { |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | return (m_count == (Q_SIZE)); |
| | |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() |
| | | { |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | return (m_count == 0); |
| | |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) |
| | | { |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) { |
| | | uint32_t currentReadIndex; |
| | | uint32_t currentWriteIndex; |
| | | |
| | | do |
| | | { |
| | | do { |
| | | |
| | | currentWriteIndex = m_writeIndex; |
| | | currentReadIndex = m_readIndex; |
| | |
| | | // inserting in the queue. It might fail if there are more than 1 producer threads because this |
| | | // operation has to be done in the same order as the previous CAS |
| | | |
| | | while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) |
| | | { |
| | | while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) { |
| | | // this is a good place to yield the thread in case there are more |
| | | // software threads than hardware processors and you have more |
| | | // than 1 producer thread |
| | |
| | | |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) |
| | | { |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) { |
| | | uint32_t currentMaximumReadIndex; |
| | | uint32_t currentReadIndex; |
| | | |
| | | do |
| | | { |
| | | do { |
| | | // to ensure thread-safety when there is more than 1 producer thread |
| | | // a second index is defined (m_maximumReadIndex) |
| | | currentReadIndex = m_readIndex; |
| | |
| | | // try to perfrom now the CAS operation on the read index. If we succeed |
| | | // a_data already contains what m_readIndex pointed to before we |
| | | // increased it |
| | | if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) |
| | | { |
| | | if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | // m_count.fetch_sub(1); |
| | | AtomicSub(&m_count, 1); |
| | |
| | | } |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) |
| | | { |
| | | ELEM_T &ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) { |
| | | int currentCount = m_count; |
| | | uint32_t currentReadIndex = m_readIndex; |
| | | if (i >= currentCount) |
| | | { |
| | | std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; |
| | | if (i >= currentCount) { |
| | | std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i |
| | | << " is out of range\n"; |
| | | std::exit(EXIT_FAILURE); |
| | | } |
| | | return m_theQueue[countToIndex(currentReadIndex+i)]; |