From 68d23225a38a35f1325eb39fa4ed5a005d5de473 Mon Sep 17 00:00:00 2001 From: fujuntang <fujuntang@aiot.com> Date: 星期三, 11 八月 2021 09:50:20 +0800 Subject: [PATCH] fix from 3.1 first commit --- src/queue/array_lock_free_sem_queue.h | 154 ++++++++++++++++++++++++++++++++++++--------------- 1 files changed, 108 insertions(+), 46 deletions(-) diff --git a/src/queue/array_lock_free_sem_queue.h b/src/queue/array_lock_free_sem_queue.h index bb213e8..cf1266d 100644 --- a/src/queue/array_lock_free_sem_queue.h +++ b/src/queue/array_lock_free_sem_queue.h @@ -4,11 +4,11 @@ #include <assert.h> // assert() #include <sched.h> // sched_yield() #include "logger_factory.h" -#include "mem_pool.h" +#include "shm_mm.h" #include "shm_allocator.h" #include "futex_sem.h" #include "time_util.h" - +#include "bus_def.h" /// @brief implementation of an array based lock free queue with support for /// multiple producers @@ -17,8 +17,7 @@ /// you must use the ArrayLockFreeSemQueue fachade: /// ArrayLockFreeSemQueue<int, 100, ArrayLockFreeSemQueue> q; -#define LOCK_FREE_QUEUE_TIMEOUT 1 -#define LOCK_FREE_QUEUE_NOWAIT 1 << 1 + #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE @@ -75,7 +74,7 @@ #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE /// @brief number of elements in the queue - int m_count; + uint32_t m_count; #endif @@ -200,52 +199,75 @@ } - - - - template <typename ELEM_T, typename Allocator> int ArrayLockFreeSemQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) { uint32_t currentReadIndex; uint32_t currentWriteIndex; + uint32_t tmpIndex; int s; + // sigset_t mask_all, pre; + // sigfillset(&mask_all); do { currentWriteIndex = m_writeIndex; currentReadIndex = m_readIndex; - + #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE if (m_count == Q_SIZE) { - if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) - return -1; - else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { + if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) + return errno; + else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { const struct timespec ts = TimeUtil::trim_time(timeout); - s = futex(&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0); + s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); - return -1; + return errno; } } else { - s = futex(&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0); + s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { - return -1; + return errno; } } } + #else + tmpIndex = (uint32_t)(currentWriteIndex - Q_SIZE + 1); + if (currentReadIndex == tmpIndex ) + { + // the queue is full + if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) + return errno; + else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { + + s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, timeout, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); + return errno; + } + + } else { + s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, NULL, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + return errno; + } + } + } + #endif - + //淇濈暀鍐欏叆浣� } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); // We know now that this index is reserved for us. Use it to save the data m_theQueue[countToIndex(currentWriteIndex)] = a_data; + // sigprocmask(SIG_BLOCK, &mask_all, &pre); + // update the maximum read index after saving the data. It wouldn't fail if there is only one thread // inserting in the queue. It might fail if there are more than 1 producer threads because this // operation has to be done in the same order as the previous CAS - while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) { // this is a good place to yield the thread in case there are more @@ -255,10 +277,17 @@ sched_yield(); } +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE AtomicAdd(&m_count, 1); - s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0); - if (s == -1) + + if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1) err_exit(errno, "futex-FUTEX_WAKE"); +#else + if ( (s = futex((int *)&m_maximumReadIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1) + err_exit(errno, "futex-FUTEX_WAKE"); +#endif + + // sigprocmask(SIG_SETMASK, &pre, NULL); return 0; } @@ -268,44 +297,74 @@ { uint32_t currentMaximumReadIndex; uint32_t currentReadIndex; - int s; + + // sigset_t mask_all, pre; + // sigfillset(&mask_all); + + // sigprocmask(SIG_BLOCK, &mask_all, &pre); + do { // to ensure thread-safety when there is more than 1 producer thread // a second index is defined (m_maximumReadIndex) currentReadIndex = m_readIndex; currentMaximumReadIndex = m_maximumReadIndex; -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE if (m_count == 0) { - if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) - return -1; - else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { - const struct timespec ts = TimeUtil::trim_time(timeout); - s = futex(&m_count, FUTEX_WAIT, 0, &ts, NULL, 0); + if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { + // sigprocmask(SIG_SETMASK, &pre, NULL); + return errno; + } + else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { + s = futex((int *)&m_count, FUTEX_WAIT, 0, timeout, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); - return -1; + // sigprocmask(SIG_SETMASK, &pre, NULL); + return errno; } } else { - s = futex(&m_count, FUTEX_WAIT, 0, NULL, NULL, 0); + s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { - return -1; + // sigprocmask(SIG_SETMASK, &pre, NULL); + return errno; } } } -#else - if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) + + #else + + if (currentReadIndex == currentMaximumReadIndex) { // the queue is empty or // a producer thread has allocate space in the queue but is // waiting to commit the data into it - return -1; + if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { + // sigprocmask(SIG_SETMASK, &pre, NULL); + return errno; + } + else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { + const struct timespec ts = TimeUtil::trim_time(timeout); + s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, &ts, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); + // sigprocmask(SIG_SETMASK, &pre, NULL); + return errno; + } + + } else { + s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + // sigprocmask(SIG_SETMASK, &pre, NULL); + return errno; + } + } } -#endif + #endif // retrieve the data from the queue a_data = m_theQueue[countToIndex(currentReadIndex)]; @@ -315,14 +374,17 @@ // increased it if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) { -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE // m_count.fetch_sub(1); AtomicSub(&m_count, 1); -#endif - - s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0); - if (s == -1) + if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1) err_exit(errno, "futex-FUTEX_WAKE"); + #else + if ( (s = futex((int *)&m_readIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1) + err_exit(errno, "futex-FUTEX_WAKE"); + #endif + + // sigprocmask(SIG_SETMASK, &pre, NULL); return 0; } @@ -342,13 +404,13 @@ template <typename ELEM_T, typename Allocator> ELEM_T& ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[](unsigned int i) { - int currentCount = m_count; + // int currentCount = m_count; uint32_t currentReadIndex = m_readIndex; - if (i >= currentCount) - { - std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; - std::exit(EXIT_FAILURE); - } + // if (i >= currentCount) + // { + // std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; + // std::exit(EXIT_FAILURE); + // } return m_theQueue[countToIndex(currentReadIndex+i)]; } -- Gitblit v1.8.0