| | |
| | | #ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ |
| | | #define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ |
| | | #ifndef __ARRAY_LOCK_FREE_QUEUE_H__ |
| | | #define __ARRAY_LOCK_FREE_QUEUE_H__ |
| | | |
| | | #include "atomic_ops.h" |
| | | #include <assert.h> // assert() |
| | | #include <sched.h> // sched_yield() |
| | | #include "logger_factory.h" |
| | | #include "mem_pool.h" |
| | | #include "shm_mm.h" |
| | | #include "shm_allocator.h" |
| | | |
| | | /// @brief implementation of an array based lock free queue with support for |
| | |
| | | |
| | | #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | template <typename ELEM_T, typename Allocator = SHM_Allocator> |
| | | class ArrayLockFreeQueue |
| | | { |
| | | // ArrayLockFreeQueue will be using this' private members |
| | | template < |
| | | typename ELEM_T_, |
| | | typename Allocator_, |
| | | template <typename T, typename AT> class Q_TYPE |
| | | > |
| | | friend class LockFreeQueue; |
| | | template<typename ELEM_T, typename Allocator = SHM_Allocator> |
| | | class ArrayLockFreeQueue { |
| | | // ArrayLockFreeQueue will be using this' private members |
| | | template< |
| | | typename ELEM_T_, |
| | | typename Allocator_, |
| | | template<typename T, typename AT> class Q_TYPE |
| | | > |
| | | friend |
| | | class LockFreeQueue; |
| | | |
| | | private: |
| | | /// @brief constructor of the class |
| | | ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); |
| | | |
| | | virtual ~ArrayLockFreeQueue(); |
| | | |
| | | inline uint32_t size(); |
| | | |
| | | inline bool full(); |
| | | /// @brief constructor of the class |
| | | ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); |
| | | |
| | | inline bool empty(); |
| | | |
| | | bool push(const ELEM_T &a_data); |
| | | |
| | | bool pop(ELEM_T &a_data); |
| | | |
| | | /// @brief calculate the index in the circular array that corresponds |
| | | /// to a particular "count" value |
| | | inline uint32_t countToIndex(uint32_t a_count); |
| | | virtual ~ArrayLockFreeQueue(); |
| | | |
| | | ELEM_T& operator[](unsigned i); |
| | | |
| | | private: |
| | | size_t Q_SIZE; |
| | | /// @brief array to keep the elements |
| | | ELEM_T *m_theQueue; |
| | | inline uint32_t size(); |
| | | |
| | | /// @brief where a new element will be inserted |
| | | uint32_t m_writeIndex; |
| | | inline bool full(); |
| | | |
| | | /// @brief where the next element where be extracted from |
| | | uint32_t m_readIndex; |
| | | |
| | | /// @brief maximum read index for multiple producer queues |
| | | /// If it's not the same as m_writeIndex it means |
| | | /// there are writes pending to be "committed" to the queue, that means, |
| | | /// the place for the data was reserved (the index in the array) but |
| | | /// data is still not in the queue, so the thread trying to read will have |
| | | /// to wait for those other threads to save the data into the queue |
| | | /// |
| | | /// note this is only used for multiple producers |
| | | uint32_t m_maximumReadIndex; |
| | | inline bool empty(); |
| | | |
| | | bool push(const ELEM_T &a_data); |
| | | |
| | | bool pop(ELEM_T &a_data); |
| | | |
| | | /// @brief calculate the index in the circular array that corresponds |
| | | /// to a particular "count" value |
| | | inline uint32_t countToIndex(uint32_t a_count); |
| | | |
| | | ELEM_T &operator[](unsigned i); |
| | | |
| | | private: |
| | | size_t Q_SIZE; |
| | | /// @brief array to keep the elements |
| | | ELEM_T *m_theQueue; |
| | | |
| | | /// @brief where a new element will be inserted |
| | | uint32_t m_writeIndex; |
| | | |
| | | /// @brief where the next element where be extracted from |
| | | uint32_t m_readIndex; |
| | | |
| | | /// @brief maximum read index for multiple producer queues |
| | | /// If it's not the same as m_writeIndex it means |
| | | /// there are writes pending to be "committed" to the queue, that means, |
| | | /// the place for the data was reserved (the index in the array) but |
| | | /// data is still not in the queue, so the thread trying to read will have |
| | | /// to wait for those other threads to save the data into the queue |
| | | /// |
| | | /// note this is only used for multiple producers |
| | | uint32_t m_maximumReadIndex; |
| | | |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | /// @brief number of elements in the queue |
| | | uint32_t m_count; |
| | | /// @brief number of elements in the queue |
| | | uint32_t m_count; |
| | | #endif |
| | | |
| | | |
| | | |
| | | |
| | | private: |
| | | /// @brief disable copy constructor declaring it private |
| | | ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src); |
| | | /// @brief disable copy constructor declaring it private |
| | | ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src); |
| | | |
| | | }; |
| | | |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | template<typename ELEM_T, typename Allocator> |
| | | ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize): |
| | | Q_SIZE(qsize), |
| | | m_writeIndex(0), // initialisation is not atomic |
| | | m_readIndex(0), // |
| | | m_maximumReadIndex(0) // |
| | | Q_SIZE(qsize), |
| | | m_writeIndex(0), // initialisation is not atomic |
| | | m_readIndex(0), // |
| | | m_maximumReadIndex(0) // |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | ,m_count(0) // |
| | | , m_count(0) // |
| | | #endif |
| | | { |
| | | m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T)); |
| | | m_theQueue = (ELEM_T *) Allocator::allocate(Q_SIZE * sizeof(ELEM_T)); |
| | | |
| | | } |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() |
| | | { |
| | | // std::cout << "destroy ArrayLockFreeQueue\n"; |
| | | Allocator::deallocate(m_theQueue); |
| | | |
| | | template<typename ELEM_T, typename Allocator> |
| | | ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() { |
| | | // std::cout << "destroy ArrayLockFreeQueue\n"; |
| | | Allocator::deallocate(m_theQueue); |
| | | |
| | | } |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) |
| | | { |
| | | // if Q_SIZE is a power of 2 this statement could be also written as |
| | | // return (a_count & (Q_SIZE - 1)); |
| | | return (a_count % Q_SIZE); |
| | | template<typename ELEM_T, typename Allocator> |
| | | inline |
| | | uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) { |
| | | // if Q_SIZE is a power of 2 this statement could be also written as |
| | | // return (a_count & (Q_SIZE - 1)); |
| | | return (a_count % Q_SIZE); |
| | | } |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() |
| | | { |
| | | template<typename ELEM_T, typename Allocator> |
| | | inline |
| | | uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | return m_count; |
| | | return m_count; |
| | | #else |
| | | |
| | | uint32_t currentWriteIndex = m_maximumReadIndex; |
| | | uint32_t currentReadIndex = m_readIndex; |
| | | uint32_t currentWriteIndex = m_maximumReadIndex; |
| | | uint32_t currentReadIndex = m_readIndex; |
| | | |
| | | // let's think of a scenario where this function returns bogus data |
| | | // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run |
| | | // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1 |
| | | // 2. afterwards this thread is preemted. While this thread is inactive 2 |
| | | // elements are inserted and removed from the queue, so m_maximumReadIndex |
| | | // is 5 and m_readIndex 4. Real size is still 1 |
| | | // 3. Now the current thread comes back from preemption and reads m_readIndex. |
| | | // currentReadIndex is 4 |
| | | // 4. currentReadIndex is bigger than currentWriteIndex, so |
| | | // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is, |
| | | // it returns that the queue is almost full, when it is almost empty |
| | | // |
| | | if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex)) |
| | | { |
| | | return (currentWriteIndex - currentReadIndex); |
| | | } |
| | | else |
| | | { |
| | | return (Q_SIZE + currentWriteIndex - currentReadIndex); |
| | | } |
| | | // let's think of a scenario where this function returns bogus data |
| | | // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run |
| | | // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1 |
| | | // 2. afterwards this thread is preemted. While this thread is inactive 2 |
| | | // elements are inserted and removed from the queue, so m_maximumReadIndex |
| | | // is 5 and m_readIndex 4. Real size is still 1 |
| | | // 3. Now the current thread comes back from preemption and reads m_readIndex. |
| | | // currentReadIndex is 4 |
| | | // 4. currentReadIndex is bigger than currentWriteIndex, so |
| | | // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is, |
| | | // it returns that the queue is almost full, when it is almost empty |
| | | // |
| | | if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex)) |
| | | { |
| | | return (currentWriteIndex - currentReadIndex); |
| | | } |
| | | else |
| | | { |
| | | return (Q_SIZE + currentWriteIndex - currentReadIndex); |
| | | } |
| | | #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | } |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() |
| | | { |
| | | template<typename ELEM_T, typename Allocator> |
| | | inline |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | return (m_count == (Q_SIZE)); |
| | | return (m_count == (Q_SIZE)); |
| | | #else |
| | | |
| | | uint32_t currentWriteIndex = m_writeIndex; |
| | | uint32_t currentReadIndex = m_readIndex; |
| | | |
| | | uint32_t currentWriteIndex = m_writeIndex; |
| | | uint32_t currentReadIndex = m_readIndex; |
| | | |
| | | if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) |
| | | { |
| | | // the queue is full |
| | | return true; |
| | | } |
| | | else |
| | | { |
| | | // not full! |
| | | return false; |
| | | } |
| | | #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | } |
| | | |
| | | template<typename ELEM_T, typename Allocator> |
| | | inline |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | return (m_count == 0); |
| | | #else |
| | | |
| | | if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex)) |
| | | { |
| | | // the queue is full |
| | | return true; |
| | | } |
| | | else |
| | | { |
| | | // not full! |
| | | return false; |
| | | } |
| | | #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | } |
| | | |
| | | |
| | | template<typename ELEM_T, typename Allocator> |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) { |
| | | uint32_t currentReadIndex; |
| | | uint32_t currentWriteIndex; |
| | | |
| | | do { |
| | | |
| | | currentWriteIndex = m_writeIndex; |
| | | currentReadIndex = m_readIndex; |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | if (m_count == Q_SIZE) { |
| | | return false; |
| | | } |
| | | #else |
| | | if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) |
| | | { |
| | | // the queue is full |
| | | return true; |
| | | } |
| | | else |
| | | { |
| | | // not full! |
| | | return false; |
| | | } |
| | | #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | } |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() |
| | | { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | return (m_count == 0); |
| | | #else |
| | | |
| | | if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex)) |
| | | { |
| | | // the queue is full |
| | | return true; |
| | | } |
| | | else |
| | | { |
| | | // not full! |
| | | return false; |
| | | } |
| | | #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) |
| | | { |
| | | uint32_t currentReadIndex; |
| | | uint32_t currentWriteIndex; |
| | | |
| | | do |
| | | { |
| | | |
| | | currentWriteIndex = m_writeIndex; |
| | | currentReadIndex = m_readIndex; |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | if (m_count == Q_SIZE) { |
| | | return false; |
| | | } |
| | | #else |
| | | if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) |
| | | { |
| | | // the queue is full |
| | | return false; |
| | | } |
| | | #endif |
| | | |
| | | } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); |
| | | |
| | | // We know now that this index is reserved for us. Use it to save the data |
| | | m_theQueue[countToIndex(currentWriteIndex)] = a_data; |
| | | |
| | | // update the maximum read index after saving the data. It wouldn't fail if there is only one thread |
| | | // inserting in the queue. It might fail if there are more than 1 producer threads because this |
| | | // operation has to be done in the same order as the previous CAS |
| | | |
| | | while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) |
| | | { |
| | | // this is a good place to yield the thread in case there are more |
| | | // software threads than hardware processors and you have more |
| | | // than 1 producer thread |
| | | // have a look at sched_yield (POSIX.1b) |
| | | sched_yield(); |
| | | } |
| | | |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | AtomicAdd(&m_count, 1); |
| | | #endif |
| | | return true; |
| | | |
| | | } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); |
| | | |
| | | // We know now that this index is reserved for us. Use it to save the data |
| | | m_theQueue[countToIndex(currentWriteIndex)] = a_data; |
| | | |
| | | // update the maximum read index after saving the data. It wouldn't fail if there is only one thread |
| | | // inserting in the queue. It might fail if there are more than 1 producer threads because this |
| | | // operation has to be done in the same order as the previous CAS |
| | | |
| | | while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) { |
| | | // this is a good place to yield the thread in case there are more |
| | | // software threads than hardware processors and you have more |
| | | // than 1 producer thread |
| | | // have a look at sched_yield (POSIX.1b) |
| | | sched_yield(); |
| | | } |
| | | |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | AtomicAdd(&m_count, 1); |
| | | #endif |
| | | return true; |
| | | } |
| | | |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) |
| | | { |
| | | uint32_t currentMaximumReadIndex; |
| | | uint32_t currentReadIndex; |
| | | template<typename ELEM_T, typename Allocator> |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) { |
| | | uint32_t currentMaximumReadIndex; |
| | | uint32_t currentReadIndex; |
| | | |
| | | do |
| | | { |
| | | // to ensure thread-safety when there is more than 1 producer thread |
| | | // a second index is defined (m_maximumReadIndex) |
| | | currentReadIndex = m_readIndex; |
| | | currentMaximumReadIndex = m_maximumReadIndex; |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | do { |
| | | // to ensure thread-safety when there is more than 1 producer thread |
| | | // a second index is defined (m_maximumReadIndex) |
| | | currentReadIndex = m_readIndex; |
| | | currentMaximumReadIndex = m_maximumReadIndex; |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | if (m_count == 0) { |
| | | return false; |
| | | } |
| | | #else |
| | | if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) |
| | | { |
| | | // the queue is empty or |
| | | // a producer thread has allocate space in the queue but is |
| | | // waiting to commit the data into it |
| | | return false; |
| | | } |
| | | #endif |
| | | |
| | | // retrieve the data from the queue |
| | | a_data = m_theQueue[countToIndex(currentReadIndex)]; |
| | | |
| | | // try to perfrom now the CAS operation on the read index. If we succeed |
| | | // a_data already contains what m_readIndex pointed to before we |
| | | // increased it |
| | | if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) |
| | | { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | // m_count.fetch_sub(1); |
| | | AtomicSub(&m_count, 1); |
| | | #endif |
| | | return true; |
| | | } |
| | | |
| | | // it failed retrieving the element off the queue. Someone else must |
| | | // have read the element stored at countToIndex(currentReadIndex) |
| | | // before we could perform the CAS operation |
| | | |
| | | } while(1); // keep looping to try again! |
| | | |
| | | // Something went wrong. it shouldn't be possible to reach here |
| | | assert(0); |
| | | |
| | | // Add this return statement to avoid compiler warnings |
| | | return false; |
| | | } |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) |
| | | { |
| | | int currentCount = m_count; |
| | | uint32_t currentReadIndex = m_readIndex; |
| | | if (i >= currentCount) |
| | | { |
| | | std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; |
| | | std::exit(EXIT_FAILURE); |
| | | if (m_count == 0) { |
| | | return false; |
| | | } |
| | | return m_theQueue[countToIndex(currentReadIndex+i)]; |
| | | #else |
| | | if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) |
| | | { |
| | | // the queue is empty or |
| | | // a producer thread has allocate space in the queue but is |
| | | // waiting to commit the data into it |
| | | return false; |
| | | } |
| | | #endif |
| | | |
| | | // retrieve the data from the queue |
| | | a_data = m_theQueue[countToIndex(currentReadIndex)]; |
| | | |
| | | // try to perfrom now the CAS operation on the read index. If we succeed |
| | | // a_data already contains what m_readIndex pointed to before we |
| | | // increased it |
| | | if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | // m_count.fetch_sub(1); |
| | | AtomicSub(&m_count, 1); |
| | | #endif |
| | | return true; |
| | | } |
| | | |
| | | // it failed retrieving the element off the queue. Someone else must |
| | | // have read the element stored at countToIndex(currentReadIndex) |
| | | // before we could perform the CAS operation |
| | | |
| | | } while (1); // keep looping to try again! |
| | | |
| | | // Something went wrong. it shouldn't be possible to reach here |
| | | assert(0); |
| | | |
| | | // Add this return statement to avoid compiler warnings |
| | | return false; |
| | | } |
| | | |
| | | template<typename ELEM_T, typename Allocator> |
| | | ELEM_T &ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) { |
| | | int currentCount = m_count; |
| | | uint32_t currentReadIndex = m_readIndex; |
| | | if (i >= currentCount) { |
| | | std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i |
| | | << " is out of range\n"; |
| | | std::exit(EXIT_FAILURE); |
| | | } |
| | | return m_theQueue[countToIndex(currentReadIndex + i)]; |
| | | } |
| | | |
| | | #endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ |