From 09a82c2ece4caadad0baa0d1f3b84f1506363fdd Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期五, 22 一月 2021 11:58:33 +0800 Subject: [PATCH] update --- src/queue/array_lock_free_sem_queue.h | 104 +++++++++++++++++++++++++++++++++++++--------------- 1 files changed, 74 insertions(+), 30 deletions(-) diff --git a/src/queue/array_lock_free_sem_queue.h b/src/queue/array_lock_free_sem_queue.h index bb213e8..69630d9 100644 --- a/src/queue/array_lock_free_sem_queue.h +++ b/src/queue/array_lock_free_sem_queue.h @@ -75,7 +75,7 @@ #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE /// @brief number of elements in the queue - int m_count; + uint32_t m_count; #endif @@ -200,10 +200,6 @@ } - - - - template <typename ELEM_T, typename Allocator> int ArrayLockFreeSemQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) { @@ -215,28 +211,50 @@ { currentWriteIndex = m_writeIndex; currentReadIndex = m_readIndex; - + #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE if (m_count == Q_SIZE) { if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) return -1; else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { const struct timespec ts = TimeUtil::trim_time(timeout); - s = futex(&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0); + s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); return -1; } } else { - s = futex(&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0); + s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { return -1; } } } + #else + if (currentReadIndex == currentWriteIndex - Q_SIZE + 1 ) + { + // the queue is full + if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) + return -1; + else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { + const struct timespec ts = TimeUtil::trim_time(timeout); + s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, &ts, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); + return -1; + } + + } else { + s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, NULL, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + return -1; + } + } + } + #endif - + //淇濈暀鍐欏叆浣� } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); // We know now that this index is reserved for us. Use it to save the data @@ -255,10 +273,16 @@ sched_yield(); } +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE AtomicAdd(&m_count, 1); - s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0); - if (s == -1) + + if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1) err_exit(errno, "futex-FUTEX_WAKE"); +#else + if ( (s = futex((int *)&m_maximumReadIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1) + err_exit(errno, "futex-FUTEX_WAKE"); +#endif + return 0; } @@ -268,15 +292,16 @@ { uint32_t currentMaximumReadIndex; uint32_t currentReadIndex; - int s; + do { // to ensure thread-safety when there is more than 1 producer thread // a second index is defined (m_maximumReadIndex) currentReadIndex = m_readIndex; currentMaximumReadIndex = m_maximumReadIndex; -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE if (m_count == 0) { @@ -284,28 +309,45 @@ return -1; else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { const struct timespec ts = TimeUtil::trim_time(timeout); - s = futex(&m_count, FUTEX_WAIT, 0, &ts, NULL, 0); + s = futex((int *)&m_count, FUTEX_WAIT, 0, &ts, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); return -1; } } else { - s = futex(&m_count, FUTEX_WAIT, 0, NULL, NULL, 0); + s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { return -1; } } } -#else - if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) + + #else + + if (currentReadIndex == currentMaximumReadIndex) { // the queue is empty or // a producer thread has allocate space in the queue but is // waiting to commit the data into it - return -1; + if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) + return -1; + else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { + const struct timespec ts = TimeUtil::trim_time(timeout); + s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, &ts, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); + return -1; + } + + } else { + s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + return -1; + } + } } -#endif + #endif // retrieve the data from the queue a_data = m_theQueue[countToIndex(currentReadIndex)]; @@ -315,14 +357,16 @@ // increased it if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) { -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE // m_count.fetch_sub(1); AtomicSub(&m_count, 1); -#endif - - s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0); - if (s == -1) + if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1) err_exit(errno, "futex-FUTEX_WAKE"); + #else + if ( (s = futex((int *)&m_readIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1) + err_exit(errno, "futex-FUTEX_WAKE"); + #endif + return 0; } @@ -342,13 +386,13 @@ template <typename ELEM_T, typename Allocator> ELEM_T& ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[](unsigned int i) { - int currentCount = m_count; + // int currentCount = m_count; uint32_t currentReadIndex = m_readIndex; - if (i >= currentCount) - { - std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; - std::exit(EXIT_FAILURE); - } + // if (i >= currentCount) + // { + // std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; + // std::exit(EXIT_FAILURE); + // } return m_theQueue[countToIndex(currentReadIndex+i)]; } -- Gitblit v1.8.0