From 4c73fd7179e92bee9cccb65e46823b00f568acb3 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期五, 22 一月 2021 16:57:34 +0800 Subject: [PATCH] tmp --- src/queue/array_lock_free_sem_queue.h | 41 +++++++++++++++++++++++++++++------------ 1 files changed, 29 insertions(+), 12 deletions(-) diff --git a/src/queue/array_lock_free_sem_queue.h b/src/queue/array_lock_free_sem_queue.h index 69630d9..a3b677c 100644 --- a/src/queue/array_lock_free_sem_queue.h +++ b/src/queue/array_lock_free_sem_queue.h @@ -8,7 +8,7 @@ #include "shm_allocator.h" #include "futex_sem.h" #include "time_util.h" - +#include "bus_def.h" /// @brief implementation of an array based lock free queue with support for /// multiple producers @@ -17,8 +17,7 @@ /// you must use the ArrayLockFreeSemQueue fachade: /// ArrayLockFreeSemQueue<int, 100, ArrayLockFreeSemQueue> q; -#define LOCK_FREE_QUEUE_TIMEOUT 1 -#define LOCK_FREE_QUEUE_NOWAIT 1 << 1 + #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE @@ -207,15 +206,17 @@ uint32_t currentWriteIndex; int s; + // sigset_t mask_all, pre; + // sigfillset(&mask_all); do { currentWriteIndex = m_writeIndex; currentReadIndex = m_readIndex; #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE if (m_count == Q_SIZE) { - if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) + if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) return -1; - else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { + else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { const struct timespec ts = TimeUtil::trim_time(timeout); s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { @@ -235,9 +236,9 @@ if (currentReadIndex == currentWriteIndex - Q_SIZE + 1 ) { // the queue is full - if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) + if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) return -1; - else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { + else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { const struct timespec ts = TimeUtil::trim_time(timeout); s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, &ts, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { @@ -260,10 +261,11 @@ // We know now that this index is reserved for us. Use it to save the data m_theQueue[countToIndex(currentWriteIndex)] = a_data; + // sigprocmask(SIG_BLOCK, &mask_all, &pre); + // update the maximum read index after saving the data. It wouldn't fail if there is only one thread // inserting in the queue. It might fail if there are more than 1 producer threads because this // operation has to be done in the same order as the previous CAS - while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) { // this is a good place to yield the thread in case there are more @@ -283,6 +285,7 @@ err_exit(errno, "futex-FUTEX_WAKE"); #endif + // sigprocmask(SIG_SETMASK, &pre, NULL); return 0; } @@ -293,6 +296,11 @@ uint32_t currentMaximumReadIndex; uint32_t currentReadIndex; int s; + + // sigset_t mask_all, pre; + // sigfillset(&mask_all); + + // sigprocmask(SIG_BLOCK, &mask_all, &pre); do { @@ -305,19 +313,23 @@ if (m_count == 0) { - if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) + if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { + // sigprocmask(SIG_SETMASK, &pre, NULL); return -1; - else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { + } + else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { const struct timespec ts = TimeUtil::trim_time(timeout); s = futex((int *)&m_count, FUTEX_WAIT, 0, &ts, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); + // sigprocmask(SIG_SETMASK, &pre, NULL); return -1; } } else { s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { + // sigprocmask(SIG_SETMASK, &pre, NULL); return -1; } } @@ -330,19 +342,23 @@ // the queue is empty or // a producer thread has allocate space in the queue but is // waiting to commit the data into it - if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) + if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { + // sigprocmask(SIG_SETMASK, &pre, NULL); return -1; - else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { + } + else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { const struct timespec ts = TimeUtil::trim_time(timeout); s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, &ts, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); + // sigprocmask(SIG_SETMASK, &pre, NULL); return -1; } } else { s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { + // sigprocmask(SIG_SETMASK, &pre, NULL); return -1; } } @@ -367,6 +383,7 @@ err_exit(errno, "futex-FUTEX_WAKE"); #endif + // sigprocmask(SIG_SETMASK, &pre, NULL); return 0; } -- Gitblit v1.8.0