| | |
| | | #ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ |
| | | #define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ |
| | | |
| | | #include "atomic_ops.h" |
| | | #include <assert.h> // assert() |
| | | #include <sched.h> // sched_yield() |
| | | #include "logger_factory.h" |
| | | #include "mem_pool.h" |
| | | #include "shm_allocator.h" |
| | | |
| | | /// @brief implementation of an array based lock free queue with support for |
| | | /// multiple producers |
| | |
| | | |
| | | #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | template <typename ELEM_T> |
| | | template <typename ELEM_T, typename Allocator = SHM_Allocator> |
| | | class ArrayLockFreeQueue |
| | | { |
| | | // LockFreeQueue will be using this class' private members |
| | | template < |
| | | typename ELEM_T_, |
| | | template <typename T> class Q_TYPE > |
| | | typename Allocator_, |
| | | template <typename T, typename AT> class Q_TYPE |
| | | > |
| | | friend class LockFreeQueue; |
| | | |
| | | private: |
| | |
| | | ELEM_T *m_theQueue; |
| | | |
| | | /// @brief where a new element will be inserted |
| | | std::atomic<uint32_t> m_writeIndex; |
| | | uint32_t m_writeIndex; |
| | | |
| | | /// @brief where the next element will be extracted from |
| | | std::atomic<uint32_t> m_readIndex; |
| | | uint32_t m_readIndex; |
| | | |
| | | /// @brief maximum read index for multiple producer queues |
| | | /// If it's not the same as m_writeIndex it means |
| | |
| | | /// to wait for those other threads to save the data into the queue |
| | | /// |
| | | /// note this is only used for multiple producers |
| | | std::atomic<uint32_t> m_maximumReadIndex; |
| | | uint32_t m_maximumReadIndex; |
| | | |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | /// @brief number of elements in the queue |
| | | std::atomic<uint32_t> m_count; |
| | | uint32_t m_count; |
| | | #endif |
| | | |
| | | |
| | | private: |
| | | /// @brief disable the copy constructor by declaring it private |
| | | ArrayLockFreeQueue<ELEM_T>(const ArrayLockFreeQueue<ELEM_T> &a_src); |
| | | ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src); |
| | | |
| | | }; |
| | | |
| | | |
| | | template <typename ELEM_T> |
| | | ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize): |
| | | template <typename ELEM_T, typename Allocator> |
| | | ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize): |
| | | Q_SIZE(qsize), |
| | | m_writeIndex(0), // write position starts at 0; initialisation is not atomic |
| | | m_readIndex(0), // read position starts at 0 |
| | |
| | | ,m_count(0) // the queue starts with zero elements |
| | | #endif |
| | | { |
| | | m_theQueue = (ELEM_T*)mm_malloc(Q_SIZE * sizeof(ELEM_T)); |
| | | m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T)); |
| | | |
| | | } |
| | | |
| | | template <typename ELEM_T> |
| | | ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue() |
| | | template <typename ELEM_T, typename Allocator> |
| | | ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() |
| | | { |
| | | // releases the element buffer; disabled debug trace: std::cout << "destroy ArrayLockFreeQueue\n"; |
| | | mm_free(m_theQueue); |
| | | Allocator::deallocate(m_theQueue); |
| | | |
| | | } |
| | | |
| | | template <typename ELEM_T> |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | uint32_t ArrayLockFreeQueue<ELEM_T>::countToIndex(uint32_t a_count) |
| | | uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) |
| | | { |
| | | // wrap a_count into [0, Q_SIZE). If Q_SIZE is a power of 2 this could also be written as |
| | | // return (a_count & (Q_SIZE - 1)); |
| | | return (a_count % Q_SIZE); |
| | | } |
| | | |
| | | template <typename ELEM_T> |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | uint32_t ArrayLockFreeQueue<ELEM_T>::size() |
| | | uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() |
| | | { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | return m_count.load(); |
| | | return m_count; |
| | | #else |
| | | |
| | | uint32_t currentWriteIndex = m_maximumReadIndex.load(); |
| | | uint32_t currentReadIndex = m_readIndex.load(); |
| | | uint32_t currentWriteIndex = m_maximumReadIndex; |
| | | uint32_t currentReadIndex = m_readIndex; |
| | | |
| | | // consider a scenario in which this function returns bogus data: |
| | | // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run |
| | |
| | | #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | } |
| | | |
| | | template <typename ELEM_T> |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | bool ArrayLockFreeQueue<ELEM_T>::full() |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() |
| | | { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | return (m_count.load() == (Q_SIZE)); |
| | | return (m_count == (Q_SIZE)); |
| | | #else |
| | | |
| | | uint32_t currentWriteIndex = m_writeIndex; |
| | |
| | | #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | } |
| | | |
| | | template <typename ELEM_T> |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | bool ArrayLockFreeQueue<ELEM_T>::empty() |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() |
| | | { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | return (m_count.load() == 0); |
| | | return (m_count == 0); |
| | | #else |
| | | |
| | | if (countToIndex( m_readIndex.load()) == countToIndex(m_maximumReadIndex.load())) |
| | | if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex)) |
| | | { |
| | | // the queue is empty: the read index has caught up with the maximum read index |
| | | return true; |
| | |
| | | } |
| | | |
| | | |
| | | template <typename ELEM_T> |
| | | bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data) |
| | | |
| | | |
| | | |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) |
| | | { |
| | | uint32_t currentReadIndex; |
| | | uint32_t currentWriteIndex; |
| | | |
| | | |
| | | do |
| | | { |
| | | currentWriteIndex = m_writeIndex.load(); |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | if (m_count.load() == Q_SIZE) { |
| | | currentWriteIndex = m_writeIndex; |
| | | currentReadIndex = m_readIndex; |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | if (m_count == Q_SIZE) { |
| | | return false; |
| | | } |
| | | #else |
| | | if (countToIndex(currentWriteIndex + 1) == countToIndex(m_readIndex.load())) |
| | | #else |
| | | if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) |
| | | { |
| | | // the queue is full |
| | | return false; |
| | | } |
| | | #endif |
| | | |
| | | // There is more than one producer. Keep looping till this thread is able |
| | | // to allocate space for current piece of data |
| | | // |
| | | // using compare_exchange_strong because it isn't allowed to fail spuriously. |
| | | // When the compare_exchange operation is in a loop the weak version |
| | | // will yield better performance on some platforms, but here we'd have to |
| | | // load m_writeIndex all over again |
| | | } while (!m_writeIndex.compare_exchange_strong( |
| | | currentWriteIndex, (currentWriteIndex + 1))); |
| | | |
| | | // Just made sure this index is reserved for this thread. |
| | | #endif |
| | | |
| | | } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); |
| | | |
| | | // We know now that this index is reserved for us. Use it to save the data |
| | | m_theQueue[countToIndex(currentWriteIndex)] = a_data; |
| | | //memcpy((void *)(&m_theQueue[countToIndex(currentWriteIndex)]), (void *)(&a_data), sizeof(ELEM_T) ); |
| | | |
| | | // update the maximum read index after saving the piece of data. It can't |
| | | // fail if there is only one thread inserting in the queue. It might fail |
| | | // if there is more than 1 producer thread because this operation has to |
| | | // be done in the same order as the previous CAS |
| | | // |
| | | // using compare_exchange_weak because it is allowed to fail spuriously |
| | | // (act as if *this != expected, even if they are equal), but when the |
| | | // compare_exchange operation is in a loop the weak version will yield |
| | | // better performance on some platforms. |
| | | while (!m_maximumReadIndex.compare_exchange_weak( |
| | | currentWriteIndex, (currentWriteIndex + 1))) |
| | | |
| | | // update the maximum read index after saving the data. It wouldn't fail if there is only one thread |
| | | // inserting in the queue. It might fail if there is more than one producer thread because this |
| | | // operation has to be done in the same order as the previous CAS |
| | | |
| | | while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) |
| | | { |
| | | // this is a good place to yield the thread in case there are more |
| | | // software threads than hardware processors and you have more |
| | |
| | | sched_yield(); |
| | | } |
| | | |
| | | // The value was successfully inserted into the queue |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | m_count.fetch_add(1); |
| | | AtomicAdd(&m_count, 1); |
| | | #endif |
| | | |
| | | return true; |
| | | } |
| | | |
| | | template <typename ELEM_T> |
| | | bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data) |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) |
| | | { |
| | | uint32_t currentMaximumReadIndex; |
| | | uint32_t currentReadIndex; |
| | | |
| | | do |
| | | { |
| | | currentReadIndex = m_readIndex.load(); |
| | | // to ensure thread-safety when there is more than 1 producer thread |
| | | // a second index is defined (m_maximumReadIndex) |
| | | currentReadIndex = m_readIndex; |
| | | currentMaximumReadIndex = m_maximumReadIndex; |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | if (m_count.load() == 0) { |
| | | if (m_count == 0) { |
| | | return false; |
| | | } |
| | | #else |
| | | // to ensure thread-safety when there is more than 1 producer |
| | | // thread a second index is defined (m_maximumReadIndex) |
| | | if (countToIndex(currentReadIndex) == countToIndex(m_maximumReadIndex.load())) |
| | | if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) |
| | | { |
| | | // the queue is empty or |
| | | // a producer thread has allocated space in the queue but is |
| | | // a producer thread has allocated space in the queue but is |
| | | // waiting to commit the data into it |
| | | return false; |
| | | } |
| | |
| | | |
| | | // retrieve the data from the queue |
| | | a_data = m_theQueue[countToIndex(currentReadIndex)]; |
| | | //memcpy((void*) (&a_data), (void *)(&m_theQueue[countToIndex(currentReadIndex)]),sizeof(ELEM_T) ); |
| | | |
| | | // try to perform now the CAS operation on the read index. If we succeed |
| | | // a_data already contains what m_readIndex pointed to before we |
| | | // a_data already contains what m_readIndex pointed to before we |
| | | // increased it |
| | | if (m_readIndex.compare_exchange_strong(currentReadIndex, (currentReadIndex + 1))) |
| | | if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) |
| | | { |
| | | // got here. The value was retrieved from the queue. Note that the |
| | | // data inside the m_theQueue array is neither deleted nor reset |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | m_count.fetch_sub(1); |
| | | #endif |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | // m_count.fetch_sub(1); |
| | | AtomicSub(&m_count, 1); |
| | | #endif |
| | | return true; |
| | | } |
| | | |
| | | |
| | | // it failed retrieving the element off the queue. Someone else must |
| | | // have read the element stored at countToIndex(currentReadIndex) |
| | | // before we could perform the CAS operation |
| | | // before we could perform the CAS operation |
| | | |
| | | } while(1); // keep looping to try again! |
| | | |
| | |
| | | assert(0); |
| | | |
| | | // Add this return statement to avoid compiler warnings |
| | | return false; |
| | | return false; |
| | | } |
| | | |
| | | |
| | | template <typename ELEM_T> |
| | | ELEM_T& ArrayLockFreeQueue<ELEM_T>::operator[](unsigned int i) |
| | | template <typename ELEM_T, typename Allocator> |
| | | ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) |
| | | { |
| | | int currentCount = m_count.load(); |
| | | uint32_t currentReadIndex = m_readIndex.load(); |
| | | int currentCount = m_count; |
| | | uint32_t currentReadIndex = m_readIndex; |
| | | if (i < 0 || i >= currentCount) |
| | | { |
| | | std::cerr << "Error in array limits: " << i << " is out of range\n"; |
| | | std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; |
| | | std::exit(EXIT_FAILURE); |
| | | } |
| | | return m_theQueue[countToIndex(currentReadIndex+i)]; |