From 14be935a4f8231233487d510c8db0b544bcf0f69 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期一, 25 一月 2021 17:40:29 +0800 Subject: [PATCH] fix conflict --- src/queue/array_lock_free_sem_queue.h | 429 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 429 insertions(+), 0 deletions(-) diff --git a/src/queue/array_lock_free_sem_queue.h b/src/queue/array_lock_free_sem_queue.h new file mode 100644 index 0000000..28f4d81 --- /dev/null +++ b/src/queue/array_lock_free_sem_queue.h @@ -0,0 +1,429 @@ +#ifndef __ARRAY_LOCK_FREE_SEM_QUEUE_H__ +#define __ARRAY_LOCK_FREE_SEM_QUEUE_H__ +#include "atomic_ops.h" +#include <assert.h> // assert() +#include <sched.h> // sched_yield() +#include "logger_factory.h" +#include "mem_pool.h" +#include "shm_allocator.h" +#include "futex_sem.h" +#include "time_util.h" +#include "bus_def.h" + +/// @brief implementation of an array based lock free queue with support for +/// multiple producers +/// This class is prevented from being instantiated directly (all members and +/// methods are private). To instantiate a multiple producers lock free queue +/// you must use the ArrayLockFreeSemQueue fachade: +/// ArrayLockFreeSemQueue<int, 100, ArrayLockFreeSemQueue> q; + + + +#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + +template <typename ELEM_T, typename Allocator = SHM_Allocator> +class ArrayLockFreeSemQueue +{ +public: + /// @brief constructor of the class + ArrayLockFreeSemQueue(size_t qsize = 16); + + virtual ~ArrayLockFreeSemQueue(); + + inline uint32_t size(); + + inline bool full(); + + inline bool empty(); + + int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); + + int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); + + /// @brief calculate the index in the circular array that corresponds + /// to a particular "count" value + inline uint32_t countToIndex(uint32_t a_count); + + ELEM_T& operator[](unsigned i); + +public: + void *operator new(size_t size); + void operator delete(void *p); + +private: + size_t Q_SIZE; + /// @brief array to keep the elements + ELEM_T *m_theQueue; + + /// @brief where a new element will be inserted + uint32_t m_writeIndex; + + /// @brief where the next element where be extracted from + uint32_t m_readIndex; + + /// @brief maximum read index for multiple producer queues + /// If it's not the same as m_writeIndex it means + /// there are writes pending to be "committed" to the queue, that means, + /// the place for the data was reserved (the index in the array) but + /// data is still not in the queue, so the thread trying to read will have + /// to wait for those other threads to save the data into the queue + /// + /// note this is only used for multiple producers + uint32_t m_maximumReadIndex; + +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + /// @brief number of elements in the queue + uint32_t m_count; +#endif + + + private: + /// @brief disable copy constructor declaring it private + ArrayLockFreeSemQueue<ELEM_T, Allocator>(const ArrayLockFreeSemQueue<ELEM_T> &a_src); + +}; + + +template <typename ELEM_T, typename Allocator> +ArrayLockFreeSemQueue<ELEM_T, Allocator>::ArrayLockFreeSemQueue(size_t qsize): + Q_SIZE(qsize), + m_writeIndex(0), // initialisation is not atomic + m_readIndex(0), // + m_maximumReadIndex(0) // +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + ,m_count(0) // +#endif +{ + m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T)); + +} + + template <typename ELEM_T, typename Allocator> +ArrayLockFreeSemQueue<ELEM_T, Allocator>::~ArrayLockFreeSemQueue() +{ + // std::cout << "destroy ArrayLockFreeSemQueue\n"; + Allocator::deallocate(m_theQueue); + +} + +template <typename ELEM_T, typename Allocator> + inline +uint32_t ArrayLockFreeSemQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) +{ + // if Q_SIZE is a power of 2 this statement could be also written as + // return (a_count & (Q_SIZE - 1)); + return (a_count % Q_SIZE); +} + +template <typename ELEM_T, typename Allocator> + inline +uint32_t ArrayLockFreeSemQueue<ELEM_T, Allocator>::size() +{ +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + return m_count; +#else + + uint32_t currentWriteIndex = m_maximumReadIndex; + uint32_t currentReadIndex = m_readIndex; + + // let's think of a scenario where this function returns bogus data + // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run + // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1 + // 2. afterwards this thread is preemted. While this thread is inactive 2 + // elements are inserted and removed from the queue, so m_maximumReadIndex + // is 5 and m_readIndex 4. Real size is still 1 + // 3. Now the current thread comes back from preemption and reads m_readIndex. + // currentReadIndex is 4 + // 4. currentReadIndex is bigger than currentWriteIndex, so + // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is, + // it returns that the queue is almost full, when it is almost empty + // + if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex)) + { + return (currentWriteIndex - currentReadIndex); + } + else + { + return (Q_SIZE + currentWriteIndex - currentReadIndex); + } +#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE +} + +template <typename ELEM_T, typename Allocator> + inline +bool ArrayLockFreeSemQueue<ELEM_T, Allocator>::full() +{ +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + return (m_count == (Q_SIZE)); +#else + + uint32_t currentWriteIndex = m_writeIndex; + uint32_t currentReadIndex = m_readIndex; + + if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) + { + // the queue is full + return true; + } + else + { + // not full! + return false; + } +#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE +} + +template <typename ELEM_T, typename Allocator> + inline +bool ArrayLockFreeSemQueue<ELEM_T, Allocator>::empty() +{ +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + return (m_count == 0); +#else + + if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex)) + { + // the queue is full + return true; + } + else + { + // not full! + return false; + } +#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE +} + + + template <typename ELEM_T, typename Allocator> +int ArrayLockFreeSemQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) +{ + uint32_t currentReadIndex; + uint32_t currentWriteIndex; + uint32_t tmpIndex; + int s; + + // sigset_t mask_all, pre; + // sigfillset(&mask_all); + do + { + currentWriteIndex = m_writeIndex; + currentReadIndex = m_readIndex; + #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + if (m_count == Q_SIZE) { + if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) + return errno; + else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { + const struct timespec ts = TimeUtil::trim_time(timeout); + s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); + return errno; + } + + } else { + s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + return errno; + } + } + + } + #else + tmpIndex = (uint32_t)(currentWriteIndex - Q_SIZE + 1); + if (currentReadIndex == tmpIndex ) + { + // the queue is full + if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) + return errno; + else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { + + s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, timeout, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); + return errno; + } + + } else { + s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, NULL, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + return errno; + } + } + } + #endif + + //淇濈暀鍐欏叆浣� + } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); + + // We know now that this index is reserved for us. Use it to save the data + m_theQueue[countToIndex(currentWriteIndex)] = a_data; + + // sigprocmask(SIG_BLOCK, &mask_all, &pre); + + // update the maximum read index after saving the data. It wouldn't fail if there is only one thread + // inserting in the queue. It might fail if there are more than 1 producer threads because this + // operation has to be done in the same order as the previous CAS + while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) + { + // this is a good place to yield the thread in case there are more + // software threads than hardware processors and you have more + // than 1 producer thread + // have a look at sched_yield (POSIX.1b) + sched_yield(); + } + +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + AtomicAdd(&m_count, 1); + + if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1) + err_exit(errno, "futex-FUTEX_WAKE"); +#else + if ( (s = futex((int *)&m_maximumReadIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1) + err_exit(errno, "futex-FUTEX_WAKE"); +#endif + + // sigprocmask(SIG_SETMASK, &pre, NULL); + return 0; +} + + + template <typename ELEM_T, typename Allocator> +int ArrayLockFreeSemQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) +{ + uint32_t currentMaximumReadIndex; + uint32_t currentReadIndex; + int s; + + // sigset_t mask_all, pre; + // sigfillset(&mask_all); + + // sigprocmask(SIG_BLOCK, &mask_all, &pre); + + do + { + // to ensure thread-safety when there is more than 1 producer thread + // a second index is defined (m_maximumReadIndex) + currentReadIndex = m_readIndex; + currentMaximumReadIndex = m_maximumReadIndex; + + #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + if (m_count == 0) { + + if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { + // sigprocmask(SIG_SETMASK, &pre, NULL); + return errno; + } + else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { + s = futex((int *)&m_count, FUTEX_WAIT, 0, timeout, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); + // sigprocmask(SIG_SETMASK, &pre, NULL); + return errno; + } + + } else { + s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + // sigprocmask(SIG_SETMASK, &pre, NULL); + return errno; + } + } + } + + #else + + if (currentReadIndex == currentMaximumReadIndex) + { + // the queue is empty or + // a producer thread has allocate space in the queue but is + // waiting to commit the data into it + if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { + // sigprocmask(SIG_SETMASK, &pre, NULL); + return errno; + } + else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { + const struct timespec ts = TimeUtil::trim_time(timeout); + s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, &ts, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); + // sigprocmask(SIG_SETMASK, &pre, NULL); + return errno; + } + + } else { + s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + // sigprocmask(SIG_SETMASK, &pre, NULL); + return errno; + } + } + } + #endif + + // retrieve the data from the queue + a_data = m_theQueue[countToIndex(currentReadIndex)]; + + // try to perfrom now the CAS operation on the read index. If we succeed + // a_data already contains what m_readIndex pointed to before we + // increased it + if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) + { + #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + // m_count.fetch_sub(1); + AtomicSub(&m_count, 1); + if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1) + err_exit(errno, "futex-FUTEX_WAKE"); + #else + if ( (s = futex((int *)&m_readIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1) + err_exit(errno, "futex-FUTEX_WAKE"); + #endif + + // sigprocmask(SIG_SETMASK, &pre, NULL); + return 0; + } + + // it failed retrieving the element off the queue. Someone else must + // have read the element stored at countToIndex(currentReadIndex) + // before we could perform the CAS operation + + } while(1); // keep looping to try again! + + // Something went wrong. it shouldn't be possible to reach here + assert(0); + + // Add this return statement to avoid compiler warnings + return -1; +} + + template <typename ELEM_T, typename Allocator> +ELEM_T& ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[](unsigned int i) +{ + // int currentCount = m_count; + uint32_t currentReadIndex = m_readIndex; + // if (i >= currentCount) + // { + // std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; + // std::exit(EXIT_FAILURE); + // } + return m_theQueue[countToIndex(currentReadIndex+i)]; +} + + + +template <typename ELEM_T, typename Allocator> +void * ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator new(size_t size){ + return Allocator::allocate(size); +} + +template <typename ELEM_T, typename Allocator> +void ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator delete(void *p) { + return Allocator::deallocate(p); +} + +#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ -- Gitblit v1.8.0