| | |
| | | #include <assert.h> // assert() |
| | | #include <sched.h> // sched_yield() |
| | | #include "logger_factory.h" |
| | | #include "mem_pool.h" |
| | | #include "shm_mm.h" |
| | | #include "shm_allocator.h" |
| | | #include "futex_sem.h" |
| | | #include "time_util.h" |
| | | |
| | | #include "bus_def.h" |
| | | |
| | | /// @brief implementation of an array based lock free queue with support for |
| | | /// multiple producers |
| | |
| | | /// you must use the ArrayLockFreeSemQueue fachade: |
| | | /// ArrayLockFreeSemQueue<int, 100, ArrayLockFreeSemQueue> q; |
| | | |
| | | #define LOCK_FREE_QUEUE_TIMEOUT 1 |
| | | #define LOCK_FREE_QUEUE_NOWAIT 1 << 1 |
| | | |
| | | |
| | | #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | |
| | | |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | /// @brief number of elements in the queue |
| | | int m_count; |
| | | uint32_t m_count; |
| | | #endif |
| | | |
| | | |
| | |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | int ArrayLockFreeSemQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) |
| | | { |
| | | uint32_t currentReadIndex; |
| | | uint32_t currentWriteIndex; |
| | | uint32_t tmpIndex; |
| | | int s; |
| | | |
| | | // sigset_t mask_all, pre; |
| | | // sigfillset(&mask_all); |
| | | do |
| | | { |
| | | currentWriteIndex = m_writeIndex; |
| | | currentReadIndex = m_readIndex; |
| | | |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | if (m_count == Q_SIZE) { |
| | | if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) |
| | | return -1; |
| | | else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { |
| | | if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) |
| | | return errno; |
| | | else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { |
| | | const struct timespec ts = TimeUtil::trim_time(timeout); |
| | | s = futex(&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0); |
| | | s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | | // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); |
| | | return -1; |
| | | return errno; |
| | | } |
| | | |
| | | } else { |
| | | s = futex(&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0); |
| | | s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | | return -1; |
| | | return errno; |
| | | } |
| | | } |
| | | |
| | | } |
| | | #else |
| | | tmpIndex = (uint32_t)(currentWriteIndex - Q_SIZE + 1); |
| | | if (currentReadIndex == tmpIndex ) |
| | | { |
| | | // the queue is full |
| | | if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) |
| | | return errno; |
| | | else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { |
| | | |
| | | s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, timeout, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | | // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); |
| | | return errno; |
| | | } |
| | | |
| | | } else { |
| | | s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, NULL, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | | return errno; |
| | | } |
| | | } |
| | | } |
| | | #endif |
| | | |
| | | |
| | | //保留写入位 |
| | | } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); |
| | | |
| | | // We know now that this index is reserved for us. Use it to save the data |
| | | m_theQueue[countToIndex(currentWriteIndex)] = a_data; |
| | | |
| | | // sigprocmask(SIG_BLOCK, &mask_all, &pre); |
| | | |
| | | // update the maximum read index after saving the data. It wouldn't fail if there is only one thread |
| | | // inserting in the queue. It might fail if there are more than 1 producer threads because this |
| | | // operation has to be done in the same order as the previous CAS |
| | | |
| | | while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) |
| | | { |
| | | // this is a good place to yield the thread in case there are more |
| | |
| | | sched_yield(); |
| | | } |
| | | |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | AtomicAdd(&m_count, 1); |
| | | s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0); |
| | | if (s == -1) |
| | | |
| | | if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1) |
| | | err_exit(errno, "futex-FUTEX_WAKE"); |
| | | #else |
| | | if ( (s = futex((int *)&m_maximumReadIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1) |
| | | err_exit(errno, "futex-FUTEX_WAKE"); |
| | | #endif |
| | | |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return 0; |
| | | } |
| | | |
| | |
| | | { |
| | | uint32_t currentMaximumReadIndex; |
| | | uint32_t currentReadIndex; |
| | | |
| | | int s; |
| | | |
| | | // sigset_t mask_all, pre; |
| | | // sigfillset(&mask_all); |
| | | |
| | | // sigprocmask(SIG_BLOCK, &mask_all, &pre); |
| | | |
| | | do |
| | | { |
| | | // to ensure thread-safety when there is more than 1 producer thread |
| | | // a second index is defined (m_maximumReadIndex) |
| | | currentReadIndex = m_readIndex; |
| | | currentMaximumReadIndex = m_maximumReadIndex; |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | if (m_count == 0) { |
| | | |
| | | if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) |
| | | return -1; |
| | | else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { |
| | | const struct timespec ts = TimeUtil::trim_time(timeout); |
| | | s = futex(&m_count, FUTEX_WAIT, 0, &ts, NULL, 0); |
| | | if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return errno; |
| | | } |
| | | else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { |
| | | s = futex((int *)&m_count, FUTEX_WAIT, 0, timeout, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | | // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); |
| | | return -1; |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return errno; |
| | | } |
| | | |
| | | } else { |
| | | s = futex(&m_count, FUTEX_WAIT, 0, NULL, NULL, 0); |
| | | s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | | return -1; |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return errno; |
| | | } |
| | | } |
| | | } |
| | | #else |
| | | if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) |
| | | |
| | | #else |
| | | |
| | | if (currentReadIndex == currentMaximumReadIndex) |
| | | { |
| | | // the queue is empty or |
| | | // a producer thread has allocate space in the queue but is |
| | | // waiting to commit the data into it |
| | | return -1; |
| | | if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return errno; |
| | | } |
| | | else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { |
| | | const struct timespec ts = TimeUtil::trim_time(timeout); |
| | | s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, &ts, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | | // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return errno; |
| | | } |
| | | |
| | | } else { |
| | | s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return errno; |
| | | } |
| | | } |
| | | } |
| | | #endif |
| | | #endif |
| | | |
| | | // retrieve the data from the queue |
| | | a_data = m_theQueue[countToIndex(currentReadIndex)]; |
| | |
| | | // increased it |
| | | if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) |
| | | { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | // m_count.fetch_sub(1); |
| | | AtomicSub(&m_count, 1); |
| | | #endif |
| | | |
| | | s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0); |
| | | if (s == -1) |
| | | if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1) |
| | | err_exit(errno, "futex-FUTEX_WAKE"); |
| | | #else |
| | | if ( (s = futex((int *)&m_readIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1) |
| | | err_exit(errno, "futex-FUTEX_WAKE"); |
| | | #endif |
| | | |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return 0; |
| | | } |
| | | |
| | |
| | | template <typename ELEM_T, typename Allocator> |
| | | ELEM_T& ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[](unsigned int i) |
| | | { |
| | | int currentCount = m_count; |
| | | // int currentCount = m_count; |
| | | uint32_t currentReadIndex = m_readIndex; |
| | | if (i >= currentCount) |
| | | { |
| | | std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; |
| | | std::exit(EXIT_FAILURE); |
| | | } |
| | | // if (i >= currentCount) |
| | | // { |
| | | // std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; |
| | | // std::exit(EXIT_FAILURE); |
| | | // } |
| | | return m_theQueue[countToIndex(currentReadIndex+i)]; |
| | | } |
| | | |