#ifndef __ARRAY_LOCK_FREE_SEM_QUEUE_H__ #define __ARRAY_LOCK_FREE_SEM_QUEUE_H__ #include "atomic_ops.h" #include // assert() #include // sched_yield() #include "logger_factory.h" #include "shm_mm.h" #include "shm_allocator.h" #include "futex_sem.h" #include "time_util.h" #include "bus_def.h" /// @brief implementation of an array based lock free queue with support for /// multiple producers /// This class is prevented from being instantiated directly (all members and /// methods are private). To instantiate a multiple producers lock free queue /// you must use the ArrayLockFreeSemQueue fachade: /// ArrayLockFreeSemQueue q; #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE template class ArrayLockFreeSemQueue { public: /// @brief constructor of the class ArrayLockFreeSemQueue(size_t qsize = 16); virtual ~ArrayLockFreeSemQueue(); inline uint32_t size(); inline bool full(); inline bool empty(); int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); /// @brief calculate the index in the circular array that corresponds /// to a particular "count" value inline uint32_t countToIndex(uint32_t a_count); ELEM_T& operator[](unsigned i); public: void *operator new(size_t size); void operator delete(void *p); private: size_t Q_SIZE; /// @brief array to keep the elements ELEM_T *m_theQueue; /// @brief where a new element will be inserted uint32_t m_writeIndex; /// @brief where the next element where be extracted from uint32_t m_readIndex; /// @brief maximum read index for multiple producer queues /// If it's not the same as m_writeIndex it means /// there are writes pending to be "committed" to the queue, that means, /// the place for the data was reserved (the index in the array) but /// data is still not in the queue, so the thread trying to read will have /// to wait for those other threads to save the data into the queue /// /// note this is only used for multiple producers uint32_t m_maximumReadIndex; #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE /// @brief number of elements in the queue uint32_t m_count; #endif private: /// @brief disable copy constructor declaring it private ArrayLockFreeSemQueue(const ArrayLockFreeSemQueue &a_src); }; template ArrayLockFreeSemQueue::ArrayLockFreeSemQueue(size_t qsize): Q_SIZE(qsize), m_writeIndex(0), // initialisation is not atomic m_readIndex(0), // m_maximumReadIndex(0) // #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE ,m_count(0) // #endif { m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T)); } template ArrayLockFreeSemQueue::~ArrayLockFreeSemQueue() { // std::cout << "destroy ArrayLockFreeSemQueue\n"; Allocator::deallocate(m_theQueue); } template inline uint32_t ArrayLockFreeSemQueue::countToIndex(uint32_t a_count) { // if Q_SIZE is a power of 2 this statement could be also written as // return (a_count & (Q_SIZE - 1)); return (a_count % Q_SIZE); } template inline uint32_t ArrayLockFreeSemQueue::size() { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE return m_count; #else uint32_t currentWriteIndex = m_maximumReadIndex; uint32_t currentReadIndex = m_readIndex; // let's think of a scenario where this function returns bogus data // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1 // 2. afterwards this thread is preemted. While this thread is inactive 2 // elements are inserted and removed from the queue, so m_maximumReadIndex // is 5 and m_readIndex 4. Real size is still 1 // 3. Now the current thread comes back from preemption and reads m_readIndex. // currentReadIndex is 4 // 4. currentReadIndex is bigger than currentWriteIndex, so // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is, // it returns that the queue is almost full, when it is almost empty // if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex)) { return (currentWriteIndex - currentReadIndex); } else { return (Q_SIZE + currentWriteIndex - currentReadIndex); } #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE } template inline bool ArrayLockFreeSemQueue::full() { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE return (m_count == (Q_SIZE)); #else uint32_t currentWriteIndex = m_writeIndex; uint32_t currentReadIndex = m_readIndex; if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) { // the queue is full return true; } else { // not full! return false; } #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE } template inline bool ArrayLockFreeSemQueue::empty() { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE return (m_count == 0); #else if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex)) { // the queue is full return true; } else { // not full! return false; } #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE } template int ArrayLockFreeSemQueue::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) { uint32_t currentReadIndex; uint32_t currentWriteIndex; uint32_t tmpIndex; int s; // sigset_t mask_all, pre; // sigfillset(&mask_all); do { currentWriteIndex = m_writeIndex; currentReadIndex = m_readIndex; #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE if (m_count == Q_SIZE) { if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) return errno; else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { const struct timespec ts = TimeUtil::trim_time(timeout); s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // err_exit("ArrayLockFreeSemQueue::push futex-FUTEX_WAIT"); return errno; } } else { s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { return errno; } } } #else tmpIndex = (uint32_t)(currentWriteIndex - Q_SIZE + 1); if (currentReadIndex == tmpIndex ) { // the queue is full if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) return errno; else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, timeout, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // err_exit("ArrayLockFreeSemQueue::push futex-FUTEX_WAIT"); return errno; } } else { s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, NULL, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { return errno; } } } #endif //保留写入位 } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); // We know now that this index is reserved for us. Use it to save the data m_theQueue[countToIndex(currentWriteIndex)] = a_data; // sigprocmask(SIG_BLOCK, &mask_all, &pre); // update the maximum read index after saving the data. It wouldn't fail if there is only one thread // inserting in the queue. It might fail if there are more than 1 producer threads because this // operation has to be done in the same order as the previous CAS while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) { // this is a good place to yield the thread in case there are more // software threads than hardware processors and you have more // than 1 producer thread // have a look at sched_yield (POSIX.1b) sched_yield(); } #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE AtomicAdd(&m_count, 1); if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1) err_exit(errno, "futex-FUTEX_WAKE"); #else if ( (s = futex((int *)&m_maximumReadIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1) err_exit(errno, "futex-FUTEX_WAKE"); #endif // sigprocmask(SIG_SETMASK, &pre, NULL); return 0; } template int ArrayLockFreeSemQueue::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) { uint32_t currentMaximumReadIndex; uint32_t currentReadIndex; int s; // sigset_t mask_all, pre; // sigfillset(&mask_all); // sigprocmask(SIG_BLOCK, &mask_all, &pre); do { // to ensure thread-safety when there is more than 1 producer thread // a second index is defined (m_maximumReadIndex) currentReadIndex = m_readIndex; currentMaximumReadIndex = m_maximumReadIndex; #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE if (m_count == 0) { if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { // sigprocmask(SIG_SETMASK, &pre, NULL); return errno; } else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { s = futex((int *)&m_count, FUTEX_WAIT, 0, timeout, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // err_exit("ArrayLockFreeSemQueue::push futex-FUTEX_WAIT"); // sigprocmask(SIG_SETMASK, &pre, NULL); return errno; } } else { s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // sigprocmask(SIG_SETMASK, &pre, NULL); return errno; } } } #else if (currentReadIndex == currentMaximumReadIndex) { // the queue is empty or // a producer thread has allocate space in the queue but is // waiting to commit the data into it if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { // sigprocmask(SIG_SETMASK, &pre, NULL); return errno; } else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { const struct timespec ts = TimeUtil::trim_time(timeout); s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, &ts, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // err_exit("ArrayLockFreeSemQueue::push futex-FUTEX_WAIT"); // sigprocmask(SIG_SETMASK, &pre, NULL); return errno; } } else { s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // sigprocmask(SIG_SETMASK, &pre, NULL); return errno; } } } #endif // retrieve the data from the queue a_data = m_theQueue[countToIndex(currentReadIndex)]; // try to perfrom now the CAS operation on the read index. If we succeed // a_data already contains what m_readIndex pointed to before we // increased it if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE // m_count.fetch_sub(1); AtomicSub(&m_count, 1); if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1) err_exit(errno, "futex-FUTEX_WAKE"); #else if ( (s = futex((int *)&m_readIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1) err_exit(errno, "futex-FUTEX_WAKE"); #endif // sigprocmask(SIG_SETMASK, &pre, NULL); return 0; } // it failed retrieving the element off the queue. Someone else must // have read the element stored at countToIndex(currentReadIndex) // before we could perform the CAS operation } while(1); // keep looping to try again! // Something went wrong. it shouldn't be possible to reach here assert(0); // Add this return statement to avoid compiler warnings return -1; } template ELEM_T& ArrayLockFreeSemQueue::operator[](unsigned int i) { // int currentCount = m_count; uint32_t currentReadIndex = m_readIndex; // if (i >= currentCount) // { // std::cerr << "ArrayLockFreeSemQueue::operator[] , Error in array limits: " << i << " is out of range\n"; // std::exit(EXIT_FAILURE); // } return m_theQueue[countToIndex(currentReadIndex+i)]; } template void * ArrayLockFreeSemQueue::operator new(size_t size){ return Allocator::allocate(size); } template void ArrayLockFreeSemQueue::operator delete(void *p) { return Allocator::deallocate(p); } #endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__