From f52f2c2828047c2f30d30fc1fe2b54d8db146d49 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期四, 25 二月 2021 15:56:35 +0800 Subject: [PATCH] update --- src/queue/array_lock_free_sem_queue.h | 39 ++++++++++++++++++++------------------- 1 files changed, 20 insertions(+), 19 deletions(-) diff --git a/src/queue/array_lock_free_sem_queue.h b/src/queue/array_lock_free_sem_queue.h index a3b677c..cf1266d 100644 --- a/src/queue/array_lock_free_sem_queue.h +++ b/src/queue/array_lock_free_sem_queue.h @@ -4,7 +4,7 @@ #include <assert.h> // assert() #include <sched.h> // sched_yield() #include "logger_factory.h" -#include "mem_pool.h" +#include "shm_mm.h" #include "shm_allocator.h" #include "futex_sem.h" #include "time_util.h" @@ -204,6 +204,7 @@ { uint32_t currentReadIndex; uint32_t currentWriteIndex; + uint32_t tmpIndex; int s; // sigset_t mask_all, pre; @@ -215,41 +216,42 @@ #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE if (m_count == Q_SIZE) { if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) - return -1; + return errno; else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { const struct timespec ts = TimeUtil::trim_time(timeout); s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); - return -1; + return errno; } } else { s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { - return -1; + return errno; } } } #else - if (currentReadIndex == currentWriteIndex - Q_SIZE + 1 ) + tmpIndex = (uint32_t)(currentWriteIndex - Q_SIZE + 1); + if (currentReadIndex == tmpIndex ) { // the queue is full if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) - return -1; + return errno; else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { - const struct timespec ts = TimeUtil::trim_time(timeout); - s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, &ts, NULL, 0); + + s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, timeout, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); - return -1; + return errno; } } else { - s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, NULL, NULL, 0); + s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, NULL, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { - return -1; + return errno; } } } @@ -315,22 +317,21 @@ if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { // sigprocmask(SIG_SETMASK, &pre, NULL); - return -1; + return errno; } else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { - const struct timespec ts = TimeUtil::trim_time(timeout); - s = futex((int *)&m_count, FUTEX_WAIT, 0, &ts, NULL, 0); + s = futex((int *)&m_count, FUTEX_WAIT, 0, timeout, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); // sigprocmask(SIG_SETMASK, &pre, NULL); - return -1; + return errno; } } else { s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // sigprocmask(SIG_SETMASK, &pre, NULL); - return -1; + return errno; } } } @@ -344,7 +345,7 @@ // waiting to commit the data into it if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { // sigprocmask(SIG_SETMASK, &pre, NULL); - return -1; + return errno; } else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { const struct timespec ts = TimeUtil::trim_time(timeout); @@ -352,14 +353,14 @@ if (s == -1 && errno != EAGAIN && errno != EINTR) { // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); // sigprocmask(SIG_SETMASK, &pre, NULL); - return -1; + return errno; } } else { s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // sigprocmask(SIG_SETMASK, &pre, NULL); - return -1; + return errno; } } } -- Gitblit v1.8.0