From 03816e8f937bfb5172b42e3735b757fc1547fbd0 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期一, 25 一月 2021 18:17:39 +0800
Subject: [PATCH] update
---
src/queue/array_lock_free_sem_queue.h | 152 +++++++++++++++++++++++++++++++++++---------------
1 files changed, 107 insertions(+), 45 deletions(-)
diff --git a/src/queue/array_lock_free_sem_queue.h b/src/queue/array_lock_free_sem_queue.h
index bb213e8..28f4d81 100644
--- a/src/queue/array_lock_free_sem_queue.h
+++ b/src/queue/array_lock_free_sem_queue.h
@@ -8,7 +8,7 @@
#include "shm_allocator.h"
#include "futex_sem.h"
#include "time_util.h"
-
+#include "bus_def.h"
/// @brief implementation of an array based lock free queue with support for
/// multiple producers
@@ -17,8 +17,7 @@
/// you must use the ArrayLockFreeSemQueue fachade:
/// ArrayLockFreeSemQueue<int, 100, ArrayLockFreeSemQueue> q;
-#define LOCK_FREE_QUEUE_TIMEOUT 1
-#define LOCK_FREE_QUEUE_NOWAIT 1 << 1
+
#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
@@ -75,7 +74,7 @@
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
/// @brief number of elements in the queue
- int m_count;
+ uint32_t m_count;
#endif
@@ -200,52 +199,75 @@
}
-
-
-
-
template <typename ELEM_T, typename Allocator>
int ArrayLockFreeSemQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag)
{
uint32_t currentReadIndex;
uint32_t currentWriteIndex;
+ uint32_t tmpIndex;
int s;
+ // sigset_t mask_all, pre;
+ // sigfillset(&mask_all);
do
{
currentWriteIndex = m_writeIndex;
currentReadIndex = m_readIndex;
-
+ #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
if (m_count == Q_SIZE) {
- if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
- return -1;
- else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
+ if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG)
+ return errno;
+ else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
const struct timespec ts = TimeUtil::trim_time(timeout);
- s = futex(&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
+ s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
if (s == -1 && errno != EAGAIN && errno != EINTR) {
// err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
- return -1;
+ return errno;
}
} else {
- s = futex(&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0);
+ s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0);
if (s == -1 && errno != EAGAIN && errno != EINTR) {
- return -1;
+ return errno;
}
}
}
+ #else
+ tmpIndex = (uint32_t)(currentWriteIndex - Q_SIZE + 1);
+ if (currentReadIndex == tmpIndex )
+ {
+ // the queue is full
+ if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG)
+ return errno;
+ else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
+
+ s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, timeout, NULL, 0);
+ if (s == -1 && errno != EAGAIN && errno != EINTR) {
+ // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
+ return errno;
+ }
+
+ } else {
+ s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, NULL, NULL, 0);
+ if (s == -1 && errno != EAGAIN && errno != EINTR) {
+ return errno;
+ }
+ }
+ }
+ #endif
-
+ //淇濈暀鍐欏叆浣�
} while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
// We know now that this index is reserved for us. Use it to save the data
m_theQueue[countToIndex(currentWriteIndex)] = a_data;
+ // sigprocmask(SIG_BLOCK, &mask_all, &pre);
+
// update the maximum read index after saving the data. It wouldn't fail if there is only one thread
// inserting in the queue. It might fail if there are more than 1 producer threads because this
// operation has to be done in the same order as the previous CAS
-
while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
{
// this is a good place to yield the thread in case there are more
@@ -255,10 +277,17 @@
sched_yield();
}
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
AtomicAdd(&m_count, 1);
- s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
- if (s == -1)
+
+ if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1)
err_exit(errno, "futex-FUTEX_WAKE");
+#else
+ if ( (s = futex((int *)&m_maximumReadIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1)
+ err_exit(errno, "futex-FUTEX_WAKE");
+#endif
+
+ // sigprocmask(SIG_SETMASK, &pre, NULL);
return 0;
}
@@ -268,44 +297,74 @@
{
uint32_t currentMaximumReadIndex;
uint32_t currentReadIndex;
-
int s;
+
+ // sigset_t mask_all, pre;
+ // sigfillset(&mask_all);
+
+ // sigprocmask(SIG_BLOCK, &mask_all, &pre);
+
do
{
// to ensure thread-safety when there is more than 1 producer thread
// a second index is defined (m_maximumReadIndex)
currentReadIndex = m_readIndex;
currentMaximumReadIndex = m_maximumReadIndex;
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+ #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
if (m_count == 0) {
- if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
- return -1;
- else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
- const struct timespec ts = TimeUtil::trim_time(timeout);
- s = futex(&m_count, FUTEX_WAIT, 0, &ts, NULL, 0);
+ if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
+ // sigprocmask(SIG_SETMASK, &pre, NULL);
+ return errno;
+ }
+ else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
+ s = futex((int *)&m_count, FUTEX_WAIT, 0, timeout, NULL, 0);
if (s == -1 && errno != EAGAIN && errno != EINTR) {
// err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
- return -1;
+ // sigprocmask(SIG_SETMASK, &pre, NULL);
+ return errno;
}
} else {
- s = futex(&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
+ s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
if (s == -1 && errno != EAGAIN && errno != EINTR) {
- return -1;
+ // sigprocmask(SIG_SETMASK, &pre, NULL);
+ return errno;
}
}
}
-#else
- if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
+
+ #else
+
+ if (currentReadIndex == currentMaximumReadIndex)
{
// the queue is empty or
// a producer thread has allocate space in the queue but is
// waiting to commit the data into it
- return -1;
+ if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
+ // sigprocmask(SIG_SETMASK, &pre, NULL);
+ return errno;
+ }
+ else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
+ const struct timespec ts = TimeUtil::trim_time(timeout);
+ s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, &ts, NULL, 0);
+ if (s == -1 && errno != EAGAIN && errno != EINTR) {
+ // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
+ // sigprocmask(SIG_SETMASK, &pre, NULL);
+ return errno;
+ }
+
+ } else {
+ s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0);
+ if (s == -1 && errno != EAGAIN && errno != EINTR) {
+ // sigprocmask(SIG_SETMASK, &pre, NULL);
+ return errno;
+ }
+ }
}
-#endif
+ #endif
// retrieve the data from the queue
a_data = m_theQueue[countToIndex(currentReadIndex)];
@@ -315,14 +374,17 @@
// increased it
if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
{
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+ #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
// m_count.fetch_sub(1);
AtomicSub(&m_count, 1);
-#endif
-
- s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
- if (s == -1)
+ if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1)
err_exit(errno, "futex-FUTEX_WAKE");
+ #else
+ if ( (s = futex((int *)&m_readIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1)
+ err_exit(errno, "futex-FUTEX_WAKE");
+ #endif
+
+ // sigprocmask(SIG_SETMASK, &pre, NULL);
return 0;
}
@@ -342,13 +404,13 @@
template <typename ELEM_T, typename Allocator>
ELEM_T& ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[](unsigned int i)
{
- int currentCount = m_count;
+ // int currentCount = m_count;
uint32_t currentReadIndex = m_readIndex;
- if (i >= currentCount)
- {
- std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
- std::exit(EXIT_FAILURE);
- }
+ // if (i >= currentCount)
+ // {
+ // std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
+ // std::exit(EXIT_FAILURE);
+ // }
return m_theQueue[countToIndex(currentReadIndex+i)];
}
--
Gitblit v1.8.0