#ifndef __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
#define __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__

#include <assert.h> // assert()
#include <sched.h>  // sched_yield()
#include <atomic>   // std::atomic
#include <cstdint>  // uint32_t
#include <cstddef>  // size_t
#include <iostream> // std::cout (used in the destructor)

/// @brief implementation of an array-based lock-free queue with support for
///        multiple producers
/// This class is prevented from being instantiated directly (all members and
/// methods are private). To instantiate a multiple-producer lock-free queue
/// you must use the LockFreeQueue facade declared as a friend below:
///   LockFreeQueue<ELEM_T, ArrayLockFreeQueue> q;
template <typename ELEM_T>
class ArrayLockFreeQueue
{
    // LockFreeQueue will be using this class's private members
    template <typename ELEM_T_, template <typename T> class Q_TYPE>
    friend class LockFreeQueue;

private:
    /// @brief constructor of the class
    /// LOCK_FREE_Q_DEFAULT_SIZE is expected to be defined elsewhere in the project
    ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);

    virtual ~ArrayLockFreeQueue();

    inline uint32_t size();

    inline bool full();

    inline bool empty();

    bool push(const ELEM_T &a_data);

    bool pop(ELEM_T &a_data);

    /// @brief calculate the index in the circular array that corresponds
    ///        to a particular "count" value
    inline uint32_t countToIndex(uint32_t a_count);

private:
    size_t Q_SIZE;

    /// @brief array to keep the elements
    ELEM_T *m_theQueue;

    /// @brief where a new element will be inserted
    std::atomic<uint32_t> m_writeIndex;

    /// @brief where the next element will be extracted from
    std::atomic<uint32_t> m_readIndex;

    /// @brief maximum read index for multiple producer queues
    /// If it's not the same as m_writeIndex it means there are writes pending
    /// to be "committed" to the queue; that is, the place for the data was
    /// reserved (the index in the array) but the data is still not in the
    /// queue, so a thread trying to read will have to wait for those other
    /// threads to save the data into the queue
    ///
    /// note this is only used for multiple producers
    std::atomic<uint32_t> m_maximumReadIndex;

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    /// @brief number of elements in the queue
    std::atomic<uint32_t> m_count;
#endif

    static int m_reference;

private:
    /// @brief disable copy constructor by declaring it private
    ArrayLockFreeQueue(const ArrayLockFreeQueue<ELEM_T> &a_src);
};

template <typename ELEM_T>
int ArrayLockFreeQueue<ELEM_T>::m_reference = 0;
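// A minimal usage sketch (added for illustration, not part of the original
// interface). It assumes the LockFreeQueue facade declared as a friend above
// forwards push() and pop() to this implementation; the facade's actual
// interface lives in its own header.
//
//   LockFreeQueue<int, ArrayLockFreeQueue> q;
//
//   // any producer thread
//   while (!q.push(42))
//   {
//       sched_yield(); // queue full: let other threads make progress
//   }
//
//   // any consumer thread
//   int value;
//   while (!q.pop(value))
//   {
//       sched_yield(); // queue empty, or a push has not been committed yet
//   }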
template <typename ELEM_T>
ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize):
    Q_SIZE(qsize),
    m_writeIndex(0),      // initialisation is not atomic
    m_readIndex(0),       //
    m_maximumReadIndex(0) //
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    ,m_count(0)           //
#endif
{
    // mm_malloc()/mm_free() are expected to be provided elsewhere in the project
    m_theQueue = (ELEM_T*)mm_malloc(Q_SIZE * sizeof(ELEM_T));

    m_reference++;
}

template <typename ELEM_T>
ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue()
{
    std::cout << "destroy ArrayLockFreeQueue\n";
    m_reference--;
    if (m_reference == 0)
    {
        mm_free(m_theQueue);
    }
}

template <typename ELEM_T>
inline uint32_t ArrayLockFreeQueue<ELEM_T>::countToIndex(uint32_t a_count)
{
    // if Q_SIZE is a power of 2 this statement could also be written as
    // return (a_count & (Q_SIZE - 1));
    return (a_count % Q_SIZE);
}

template <typename ELEM_T>
inline uint32_t ArrayLockFreeQueue<ELEM_T>::size()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return m_count.load();
#else
    uint32_t currentWriteIndex = m_maximumReadIndex.load();
    uint32_t currentReadIndex  = m_readIndex.load();

    // let's think of a scenario where this function returns bogus data
    // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
    //    m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
    // 2. afterwards this thread is preempted. While this thread is inactive 2
    //    elements are inserted and removed from the queue, so m_maximumReadIndex
    //    is 5 and m_readIndex is 4. Real size is still 1
    // 3. now the current thread comes back from preemption and reads m_readIndex.
    //    currentReadIndex is 4
    // 4. currentReadIndex is bigger than currentWriteIndex, so
    //    Q_SIZE + currentWriteIndex - currentReadIndex is returned, that is,
    //    it reports that the queue is almost full when it is almost empty
    //
    if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
    {
        return (currentWriteIndex - currentReadIndex);
    }
    else
    {
        return (Q_SIZE + currentWriteIndex - currentReadIndex);
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template <typename ELEM_T>
inline bool ArrayLockFreeQueue<ELEM_T>::full()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count.load() == Q_SIZE);
#else
    uint32_t currentWriteIndex = m_writeIndex.load();
    uint32_t currentReadIndex  = m_readIndex.load();

    if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
    {
        // the queue is full
        return true;
    }
    else
    {
        // not full!
        return false;
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template <typename ELEM_T>
inline bool ArrayLockFreeQueue<ELEM_T>::empty()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count.load() == 0);
#else
    if (countToIndex(m_readIndex.load()) == countToIndex(m_maximumReadIndex.load()))
    {
        // the queue is empty
        return true;
    }
    else
    {
        // not empty!
        return false;
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template <typename ELEM_T>
bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data)
{
    uint32_t currentWriteIndex;

    do
    {
        currentWriteIndex = m_writeIndex.load();
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        if (m_count.load() == Q_SIZE)
        {
            return false;
        }
#else
        if (countToIndex(currentWriteIndex + 1) == countToIndex(m_readIndex.load()))
        {
            // the queue is full
            return false;
        }
#endif
        // There is more than one producer. Keep looping till this thread is
        // able to allocate space for the current piece of data
        //
        // using compare_exchange_strong because it isn't allowed to fail
        // spuriously. When the compare_exchange operation is in a loop the
        // weak version will yield better performance on some platforms, but
        // here we'd have to load m_writeIndex all over again anyway
    } while (!m_writeIndex.compare_exchange_strong(
                 currentWriteIndex, (currentWriteIndex + 1)));

    // Just made sure this index is reserved for this thread.
    m_theQueue[countToIndex(currentWriteIndex)] = a_data;

    // update the maximum read index after saving the piece of data. It can't
    // fail if there is only one thread inserting into the queue. It might fail
    // if there is more than 1 producer thread because this operation has to
    // be done in the same order as the previous CAS
    //
    // using compare_exchange_weak because it is allowed to fail spuriously
    // (act as if *this != expected, even if they are equal), but when the
    // compare_exchange operation is in a loop the weak version will yield
    // better performance on some platforms.
    //
    // note that compare_exchange overwrites 'expected' with the current value
    // of m_maximumReadIndex when it fails, so the index reserved by this
    // thread has to be restored before every retry
    uint32_t expected = currentWriteIndex;
    while (!m_maximumReadIndex.compare_exchange_weak(
                expected, (currentWriteIndex + 1)))
    {
        expected = currentWriteIndex;

        // this is a good place to yield the thread in case there are more
        // software threads than hardware processors and you have more
        // than 1 producer thread
        // have a look at sched_yield (POSIX.1b)
        //sched_yield();
    }

    // The value was successfully inserted into the queue
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    m_count.fetch_add(1);
#endif

    return true;
}
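// Worked example of the reserve/commit protocol used by push() above (added
// for clarity; the index values are illustrative). Two producers A and B,
// with m_writeIndex == m_maximumReadIndex == 5:
//
//   1. A CASes m_writeIndex 5 -> 6, reserving slot 5
//   2. B CASes m_writeIndex 6 -> 7, reserving slot 6
//   3. B copies its data into slot 6 and tries to CAS m_maximumReadIndex
//      6 -> 7; the CAS fails because m_maximumReadIndex is still 5 (A has
//      not committed yet), so B spins (optionally yielding the CPU)
//   4. A copies its data into slot 5 and CASes m_maximumReadIndex 5 -> 6
//   5. B's CAS 6 -> 7 now succeeds; consumers may safely read slots 5 and 6
//
// This is why the commits have to happen in the same order as the
// reservations made on m_writeIndex.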
template <typename ELEM_T>
bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data)
{
    uint32_t currentReadIndex;

    do
    {
        currentReadIndex = m_readIndex.load();
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        if (m_count.load() == 0)
        {
            return false;
        }
#else
        // to ensure thread-safety when there is more than 1 producer
        // thread a second index is defined (m_maximumReadIndex)
        if (countToIndex(currentReadIndex) == countToIndex(m_maximumReadIndex.load()))
        {
            // the queue is empty, or
            // a producer thread has allocated space in the queue but is
            // still waiting to commit the data into it
            return false;
        }
#endif
        // retrieve the data from the queue
        a_data = m_theQueue[countToIndex(currentReadIndex)];

        // try to perform now the CAS operation on the read index. If we
        // succeed, a_data already contains what m_readIndex pointed to before
        // we increased it
        if (m_readIndex.compare_exchange_strong(currentReadIndex, (currentReadIndex + 1)))
        {
            // got here. The value was retrieved from the queue. Note that the
            // data inside the m_theQueue array is not deleted nor reset
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
            m_count.fetch_sub(1);
#endif
            return true;
        }

        // it failed retrieving the element off the queue. Someone else must
        // have read the element stored at countToIndex(currentReadIndex)
        // before we could perform the CAS operation
    } while (1); // keep looping to try again!

    // Something went wrong. It shouldn't be possible to reach here
    assert(0);

    // Add this return statement to avoid compiler warnings
    return false;
}

#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
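// Build note (added): compiling with _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE defined,
// e.g.
//
//   g++ -std=c++11 -D_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE ...
//
// makes size(), full() and empty() use the atomic m_count member, trading an
// extra atomic increment/decrement on every push()/pop() for an exact element
// count.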