New file |
| | |
| | | #ifndef __ARRAY_LOCK_FREE_SEM_QUEUE_H__ |
| | | #define __ARRAY_LOCK_FREE_SEM_QUEUE_H__ |
| | | #include "atomic_ops.h" |
| | | #include <assert.h> // assert() |
| | | #include <sched.h> // sched_yield() |
| | | #include "logger_factory.h" |
| | | #include "mem_pool.h" |
| | | #include "shm_allocator.h" |
| | | #include "futex_sem.h" |
| | | #include "time_util.h" |
| | | #include "bus_def.h" |
| | | |
/// @brief Array based lock-free queue with support for multiple producers.
/// push() and pop() can block on a futex while the queue is full or empty,
/// return immediately, or wait with a timeout, depending on their flag argument.
/// The class is instantiated directly with an element type, an optional
/// allocator type and a capacity, e.g.:
///     ArrayLockFreeSemQueue<int> q(100);
| | | |
| | | |
| | | |
| | | #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | |
| | | template <typename ELEM_T, typename Allocator = SHM_Allocator> |
| | | class ArrayLockFreeSemQueue |
| | | { |
| | | public: |
| | | /// @brief constructor of the class |
| | | ArrayLockFreeSemQueue(size_t qsize = 16); |
| | | |
| | | virtual ~ArrayLockFreeSemQueue(); |
| | | |
| | | inline uint32_t size(); |
| | | |
| | | inline bool full(); |
| | | |
| | | inline bool empty(); |
| | | |
| | | int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); |
| | | |
| | | int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); |
| | | |
| | | /// @brief calculate the index in the circular array that corresponds |
| | | /// to a particular "count" value |
| | | inline uint32_t countToIndex(uint32_t a_count); |
| | | |
| | | ELEM_T& operator[](unsigned i); |
| | | |
| | | public: |
| | | void *operator new(size_t size); |
| | | void operator delete(void *p); |
| | | |
| | | private: |
| | | size_t Q_SIZE; |
| | | /// @brief array to keep the elements |
| | | ELEM_T *m_theQueue; |
| | | |
| | | /// @brief where a new element will be inserted |
| | | uint32_t m_writeIndex; |
| | | |
| | | /// @brief where the next element where be extracted from |
| | | uint32_t m_readIndex; |
| | | |
| | | /// @brief maximum read index for multiple producer queues |
| | | /// If it's not the same as m_writeIndex it means |
| | | /// there are writes pending to be "committed" to the queue, that means, |
| | | /// the place for the data was reserved (the index in the array) but |
| | | /// data is still not in the queue, so the thread trying to read will have |
| | | /// to wait for those other threads to save the data into the queue |
| | | /// |
| | | /// note this is only used for multiple producers |
| | | uint32_t m_maximumReadIndex; |
| | | |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | /// @brief number of elements in the queue |
| | | uint32_t m_count; |
| | | #endif |
| | | |
| | | |
| | | private: |
| | | /// @brief disable copy constructor declaring it private |
| | | ArrayLockFreeSemQueue<ELEM_T, Allocator>(const ArrayLockFreeSemQueue<ELEM_T> &a_src); |
| | | |
| | | }; |
| | | |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | ArrayLockFreeSemQueue<ELEM_T, Allocator>::ArrayLockFreeSemQueue(size_t qsize): |
| | | Q_SIZE(qsize), |
| | | m_writeIndex(0), // initialisation is not atomic |
| | | m_readIndex(0), // |
| | | m_maximumReadIndex(0) // |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | ,m_count(0) // |
| | | #endif |
| | | { |
| | | m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T)); |
| | | |
| | | } |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | ArrayLockFreeSemQueue<ELEM_T, Allocator>::~ArrayLockFreeSemQueue() |
| | | { |
| | | // std::cout << "destroy ArrayLockFreeSemQueue\n"; |
| | | Allocator::deallocate(m_theQueue); |
| | | |
| | | } |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | uint32_t ArrayLockFreeSemQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) |
| | | { |
| | | // if Q_SIZE is a power of 2 this statement could be also written as |
| | | // return (a_count & (Q_SIZE - 1)); |
| | | return (a_count % Q_SIZE); |
| | | } |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | uint32_t ArrayLockFreeSemQueue<ELEM_T, Allocator>::size() |
| | | { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | return m_count; |
| | | #else |
| | | |
| | | uint32_t currentWriteIndex = m_maximumReadIndex; |
| | | uint32_t currentReadIndex = m_readIndex; |
| | | |
| | | // let's think of a scenario where this function returns bogus data |
| | | // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run |
| | | // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1 |
| | | // 2. afterwards this thread is preemted. While this thread is inactive 2 |
| | | // elements are inserted and removed from the queue, so m_maximumReadIndex |
| | | // is 5 and m_readIndex 4. Real size is still 1 |
| | | // 3. Now the current thread comes back from preemption and reads m_readIndex. |
| | | // currentReadIndex is 4 |
| | | // 4. currentReadIndex is bigger than currentWriteIndex, so |
| | | // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is, |
| | | // it returns that the queue is almost full, when it is almost empty |
| | | // |
| | | if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex)) |
| | | { |
| | | return (currentWriteIndex - currentReadIndex); |
| | | } |
| | | else |
| | | { |
| | | return (Q_SIZE + currentWriteIndex - currentReadIndex); |
| | | } |
| | | #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | } |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | bool ArrayLockFreeSemQueue<ELEM_T, Allocator>::full() |
| | | { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | return (m_count == (Q_SIZE)); |
| | | #else |
| | | |
| | | uint32_t currentWriteIndex = m_writeIndex; |
| | | uint32_t currentReadIndex = m_readIndex; |
| | | |
| | | if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) |
| | | { |
| | | // the queue is full |
| | | return true; |
| | | } |
| | | else |
| | | { |
| | | // not full! |
| | | return false; |
| | | } |
| | | #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | } |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | bool ArrayLockFreeSemQueue<ELEM_T, Allocator>::empty() |
| | | { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | return (m_count == 0); |
| | | #else |
| | | |
| | | if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex)) |
| | | { |
| | | // the queue is full |
| | | return true; |
| | | } |
| | | else |
| | | { |
| | | // not full! |
| | | return false; |
| | | } |
| | | #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | } |
| | | |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | int ArrayLockFreeSemQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) |
| | | { |
| | | uint32_t currentReadIndex; |
| | | uint32_t currentWriteIndex; |
| | | uint32_t tmpIndex; |
| | | int s; |
| | | |
| | | // sigset_t mask_all, pre; |
| | | // sigfillset(&mask_all); |
| | | do |
| | | { |
| | | currentWriteIndex = m_writeIndex; |
| | | currentReadIndex = m_readIndex; |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | if (m_count == Q_SIZE) { |
| | | if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) |
| | | return errno; |
| | | else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { |
| | | const struct timespec ts = TimeUtil::trim_time(timeout); |
| | | s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | | // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); |
| | | return errno; |
| | | } |
| | | |
| | | } else { |
| | | s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | | return errno; |
| | | } |
| | | } |
| | | |
| | | } |
| | | #else |
| | | tmpIndex = (uint32_t)(currentWriteIndex - Q_SIZE + 1); |
| | | if (currentReadIndex == tmpIndex ) |
| | | { |
| | | // the queue is full |
| | | if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) |
| | | return errno; |
| | | else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { |
| | | |
| | | s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, timeout, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | | // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); |
| | | return errno; |
| | | } |
| | | |
| | | } else { |
| | | s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, NULL, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | | return errno; |
| | | } |
| | | } |
| | | } |
| | | #endif |
| | | |
| | | //保留写入位 |
| | | } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); |
| | | |
| | | // We know now that this index is reserved for us. Use it to save the data |
| | | m_theQueue[countToIndex(currentWriteIndex)] = a_data; |
| | | |
| | | // sigprocmask(SIG_BLOCK, &mask_all, &pre); |
| | | |
| | | // update the maximum read index after saving the data. It wouldn't fail if there is only one thread |
| | | // inserting in the queue. It might fail if there are more than 1 producer threads because this |
| | | // operation has to be done in the same order as the previous CAS |
| | | while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) |
| | | { |
| | | // this is a good place to yield the thread in case there are more |
| | | // software threads than hardware processors and you have more |
| | | // than 1 producer thread |
| | | // have a look at sched_yield (POSIX.1b) |
| | | sched_yield(); |
| | | } |
| | | |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | AtomicAdd(&m_count, 1); |
| | | |
| | | if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1) |
| | | err_exit(errno, "futex-FUTEX_WAKE"); |
| | | #else |
| | | if ( (s = futex((int *)&m_maximumReadIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1) |
| | | err_exit(errno, "futex-FUTEX_WAKE"); |
| | | #endif |
| | | |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return 0; |
| | | } |
| | | |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | int ArrayLockFreeSemQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) |
| | | { |
| | | uint32_t currentMaximumReadIndex; |
| | | uint32_t currentReadIndex; |
| | | int s; |
| | | |
| | | // sigset_t mask_all, pre; |
| | | // sigfillset(&mask_all); |
| | | |
| | | // sigprocmask(SIG_BLOCK, &mask_all, &pre); |
| | | |
| | | do |
| | | { |
| | | // to ensure thread-safety when there is more than 1 producer thread |
| | | // a second index is defined (m_maximumReadIndex) |
| | | currentReadIndex = m_readIndex; |
| | | currentMaximumReadIndex = m_maximumReadIndex; |
| | | |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | if (m_count == 0) { |
| | | |
| | | if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return errno; |
| | | } |
| | | else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { |
| | | s = futex((int *)&m_count, FUTEX_WAIT, 0, timeout, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | | // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return errno; |
| | | } |
| | | |
| | | } else { |
| | | s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return errno; |
| | | } |
| | | } |
| | | } |
| | | |
| | | #else |
| | | |
| | | if (currentReadIndex == currentMaximumReadIndex) |
| | | { |
| | | // the queue is empty or |
| | | // a producer thread has allocate space in the queue but is |
| | | // waiting to commit the data into it |
| | | if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return errno; |
| | | } |
| | | else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { |
| | | const struct timespec ts = TimeUtil::trim_time(timeout); |
| | | s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, &ts, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | | // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return errno; |
| | | } |
| | | |
| | | } else { |
| | | s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return errno; |
| | | } |
| | | } |
| | | } |
| | | #endif |
| | | |
| | | // retrieve the data from the queue |
| | | a_data = m_theQueue[countToIndex(currentReadIndex)]; |
| | | |
| | | // try to perfrom now the CAS operation on the read index. If we succeed |
| | | // a_data already contains what m_readIndex pointed to before we |
| | | // increased it |
| | | if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) |
| | | { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | // m_count.fetch_sub(1); |
| | | AtomicSub(&m_count, 1); |
| | | if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1) |
| | | err_exit(errno, "futex-FUTEX_WAKE"); |
| | | #else |
| | | if ( (s = futex((int *)&m_readIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1) |
| | | err_exit(errno, "futex-FUTEX_WAKE"); |
| | | #endif |
| | | |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return 0; |
| | | } |
| | | |
| | | // it failed retrieving the element off the queue. Someone else must |
| | | // have read the element stored at countToIndex(currentReadIndex) |
| | | // before we could perform the CAS operation |
| | | |
| | | } while(1); // keep looping to try again! |
| | | |
| | | // Something went wrong. it shouldn't be possible to reach here |
| | | assert(0); |
| | | |
| | | // Add this return statement to avoid compiler warnings |
| | | return -1; |
| | | } |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | ELEM_T& ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[](unsigned int i) |
| | | { |
| | | // int currentCount = m_count; |
| | | uint32_t currentReadIndex = m_readIndex; |
| | | // if (i >= currentCount) |
| | | // { |
| | | // std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; |
| | | // std::exit(EXIT_FAILURE); |
| | | // } |
| | | return m_theQueue[countToIndex(currentReadIndex+i)]; |
| | | } |
| | | |
| | | |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | void * ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator new(size_t size){ |
| | | return Allocator::allocate(size); |
| | | } |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | void ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator delete(void *p) { |
| | | return Allocator::deallocate(p); |
| | | } |
| | | |
#endif // __ARRAY_LOCK_FREE_SEM_QUEUE_H__