From 082633f08aae8eea19bd7050cbe4a75e5ed1ac6f Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期二, 07 七月 2020 12:07:29 +0800 Subject: [PATCH] update --- squeue/include/queue_factory.h | 34 -- /dev/null | 63 ----- test/test_queue | 0 squeue/include/array_lock_free_queue.h | 91 ++++++ squeue/include/lock_free_queue.h | 265 ++++++++++++++++----- test/test.h | 3 squeue/mm.c | 2 squeue/include/linked_lock_free_queue.h | 247 ++++++++++++++++++++ 8 files changed, 534 insertions(+), 171 deletions(-) diff --git a/squeue/include/SAbstractQueue.h b/squeue/include/SAbstractQueue.h deleted file mode 100644 index a72439f..0000000 --- a/squeue/include/SAbstractQueue.h +++ /dev/null @@ -1,22 +0,0 @@ -// queue.h -- interface for a queue -#ifndef SAbstractQueue_H_ -#define SAbstractQueue_H_ - -template <typename T> -class SAbstractQueue -{ - public: - SAbstractQueue() {} - virtual unsigned int size() const = 0; - virtual bool add(const T &item) = 0;// add item to end - virtual bool add_nowait(const T &item) = 0; - virtual bool add_timeout(const T &item, struct timespec *timeout) = 0; - virtual bool remove(T &item) = 0; - virtual bool remove_nowait(T &item) = 0; - virtual bool remove_timeout(T &item, struct timespec * timeout) = 0; - - - virtual T& operator[](unsigned i) = 0; - virtual ~SAbstractQueue() {} -}; -#endif diff --git a/squeue/include/SLinkedLockFreeQueue.h b/squeue/include/SLinkedLockFreeQueue.h deleted file mode 100644 index 8225920..0000000 --- a/squeue/include/SLinkedLockFreeQueue.h +++ /dev/null @@ -1,370 +0,0 @@ -// queue.h -- interface for a queue -#ifndef SLinkedLockFreeQueue_H_ -#define SLinkedLockFreeQueue_H_ -#include "mm.h" -#include "sem_util.h" -#include "SAbstractQueue.h" - - -template <typename T> class Node; - -template <typename T> -class Pointer { -public: - - Node<T> *ptr; - unsigned long count; - Pointer( Node<T> *node = NULL, int c=0) noexcept : ptr(node), count(c) {} - - bool operator == (const Pointer<T> o) const { - return (o.ptr == ptr) && (o.count == count); - } - bool operator != (const Pointer<T> o) const { - return !((o.ptr == ptr) && (o.count == count)); - } - - - -}; - -template <typename T> -class Node { -public: - alignas(16) std::atomic<Pointer<T> > next; - T value; - - Node() { - } - - void *operator new(size_t size){ - return mm_malloc(size); - } - - void operator delete(void *p) { - return mm_free(p); - } -}; - - - - - -template <typename T> -class SLinkedLockFreeQueue -{ -private: -// class scope definitions - enum {Q_SIZE = 10}; - - int slots; - int items; -// private class members - std::atomic<Pointer<T> > Head; // pointer to front of Queue - std::atomic<Pointer<T> > Tail; // pointer to rear of Queue - //std::atomic_uint count; // current number of size in Queue - std::atomic_uint count; - const size_t qsize; // maximum number of size in Queue - // preemptive definitions to prevent public copying - SLinkedLockFreeQueue(const SLinkedLockFreeQueue & q) : qsize(0) { } - SLinkedLockFreeQueue & operator=(const SLinkedLockFreeQueue & q) { return *this;} - bool _add(const T &item); // add item to end - bool _remove(T &item); // remove item from front -public: - SLinkedLockFreeQueue(size_t qs = Q_SIZE); // create queue with a qs limit - ~SLinkedLockFreeQueue(); - bool isempty() const; - bool isfull() const; - unsigned int size() const; - bool add(const T &item); // add item to end - bool add_nowait(const T &item); - bool add_timeout(const T &item, struct timespec *timeout); - bool remove(T &item); - bool remove_nowait(T &item); - bool remove_timeout(T &item, struct timespec * timeout); - - - T& operator[](unsigned i); - - void *operator new(size_t size); - void operator delete(void *p); -}; - - - - -// Queue methods -template <typename T> -SLinkedLockFreeQueue<T>::SLinkedLockFreeQueue(size_t qs) : count(0), qsize(qs) -{ - Node<T> *node = new Node<T>; - Pointer<T> pointer(node, 0); - - Head.store(pointer, std::memory_order_relaxed); - Tail.store(pointer, std::memory_order_relaxed); - - slots = SemUtil::get(IPC_PRIVATE, qsize); - items = SemUtil::get(IPC_PRIVATE, 0); - -} - -template <typename T> -SLinkedLockFreeQueue<T>::~SLinkedLockFreeQueue() -{ - std::cerr << "SLinkedLockFreeQueue destory" << std::endl; - SemUtil::remove(slots); - SemUtil::remove(items); - - - Node<T> * nodeptr; - Pointer<T> tmp = Head.load(std::memory_order_relaxed); - while((nodeptr = tmp.ptr) != NULL) { - tmp = (tmp.ptr->next).load(std::memory_order_relaxed); - //std::cerr << "delete " << nodeptr << std::endl; - delete nodeptr; - - } - -} - -template <typename T> -bool SLinkedLockFreeQueue<T>::isempty() const -{ - return count == 0; -} - -template <typename T> -bool SLinkedLockFreeQueue<T>::isfull() const -{ - return count == qsize; -} - -template <typename T> -unsigned int SLinkedLockFreeQueue<T>::size() const -{ - return count; -} - -// Add item to queue -template <typename T> -bool SLinkedLockFreeQueue<T>::_add(const T & item) -{ - if (isfull()) - return false; - - Node<T> * node = new Node<T>; - node->value = item; - - - Pointer<T> tail ; - Pointer<T> next ; - - - while(true) { - tail = Tail.load(std::memory_order_relaxed); - next = (tail.ptr->next).load(std::memory_order_relaxed); - if (tail == Tail.load(std::memory_order_relaxed)) { - if (next.ptr == NULL) { - if ((tail.ptr->next).compare_exchange_weak(next, - Pointer<T>(node, next.count+1), - std::memory_order_release, - std::memory_order_relaxed) ) - break; - else - Tail.compare_exchange_weak(tail, - Pointer<T>(next.ptr, tail.count+1), - std::memory_order_release, - std::memory_order_relaxed); - } - - } - } - - Tail.compare_exchange_weak(tail, Pointer<T>(node, tail.count+1), - std::memory_order_release, - std::memory_order_relaxed); - count++; - return true; -} - -template <typename T> -bool SLinkedLockFreeQueue<T>::add(const T & item) -{ - if (SemUtil::dec(slots) == -1) { - err_exit(errno, "add"); - } - - if (SLinkedLockFreeQueue<T>::_add(item)) { - SemUtil::inc(items); - return true; - } - return false; - -} - -template <typename T> -bool SLinkedLockFreeQueue<T>::add_nowait(const T & item) -{ - if (SemUtil::dec_nowait(slots) == -1) { - if (errno == EAGAIN) - return false; - else - err_exit(errno, "add_nowait"); - } - - if (SLinkedLockFreeQueue<T>::_add(item)) { - SemUtil::inc(items); - return true; - } - return false; - -} - -template <typename T> -bool SLinkedLockFreeQueue<T>::add_timeout(const T & item, struct timespec * timeout) -{ - if (SemUtil::dec_timeout(slots, timeout) == -1) { - if (errno == EAGAIN) - return false; - else - err_exit(errno, "add_timeout"); - } - - if (SLinkedLockFreeQueue<T>::_add(item)){ - SemUtil::inc(items); - return true; - } - return false; - -} - - -// Place front item into item variable and remove from queue -template <typename T> -bool SLinkedLockFreeQueue<T>::_remove(T & item) -{ - if (isempty()) - return false; - - Pointer<T> head; - Pointer<T> tail; - Pointer<T> next; - - while(true) { - head = Head.load(std::memory_order_relaxed); - tail = Tail.load(std::memory_order_relaxed); - next = (head.ptr->next).load(); - if (head == Head.load(std::memory_order_relaxed)) { - if(head.ptr == tail.ptr) { - if (next.ptr == NULL) - return false; - // Tail is falling behind. Try to advance it - Tail.compare_exchange_weak(tail, - Pointer<T>(next.ptr, tail.count+1), - std::memory_order_release, - std::memory_order_relaxed); - } else { - item = next.ptr->value; - if (Head.compare_exchange_weak(head, - Pointer<T>(next.ptr, head.count+1), - std::memory_order_release, - std::memory_order_relaxed)) { - delete head.ptr; - break; - } - - } - } - - } - - count--; - return true; - -} - -template <typename T> -bool SLinkedLockFreeQueue<T>::remove(T & item) -{ - if (SemUtil::dec(items) == -1) { - err_exit(errno, "remove"); - } - - if (SLinkedLockFreeQueue<T>::_remove(item)) { - SemUtil::inc(slots); - return true; - } - return false; - -} - -template <typename T> -bool SLinkedLockFreeQueue<T>::remove_nowait(T & item) -{ - if (SemUtil::dec_nowait(items) == -1) { - if (errno == EAGAIN) - return false; - else - err_exit(errno, "remove_nowait"); - } - - if (SLinkedLockFreeQueue<T>::_remove(item)) { - SemUtil::inc(slots); - return true; - } - return false; - -} - -template <typename T> -bool SLinkedLockFreeQueue<T>::remove_timeout(T & item, struct timespec * timeout) -{ - if (SemUtil::dec_timeout(items, timeout) == -1) { - if (errno == EAGAIN) - return false; - else - err_exit(errno, "remove_timeout"); - } - - if (SLinkedLockFreeQueue<T>::_remove(item)) { - SemUtil::inc(slots); - return true; - } - return false; - -} - - -template <class T> -T& SLinkedLockFreeQueue<T>::operator[](unsigned int i) -{ - if (i < 0 || i >= count) - { - std::cerr << "Error in array limits: " << i << " is out of range\n"; - std::exit(EXIT_FAILURE); - } - - - Pointer<T> tmp = Head.load(std::memory_order_relaxed); - //Pointer<T> tail = Tail.load(std::memory_order_relaxed); - - while(i > 0) { - //std::cout << i << ":" << std::endl; - tmp = (tmp.ptr->next).load(std::memory_order_relaxed); - i--; - } - - tmp = (tmp.ptr->next).load(std::memory_order_relaxed); - return tmp.ptr->value; -} - -template <class T> -void * SLinkedLockFreeQueue<T>::operator new(size_t size){ - return mm_malloc(size); -} - -template <class T> -void SLinkedLockFreeQueue<T>::operator delete(void *p) { - return mm_free(p); -} - -#endif diff --git a/squeue/include/lock_free_queue_impl_multiple_producer.h b/squeue/include/array_lock_free_queue.h similarity index 71% rename from squeue/include/lock_free_queue_impl_multiple_producer.h rename to squeue/include/array_lock_free_queue.h index 959cca8..8bcf579 100644 --- a/squeue/include/lock_free_queue_impl_multiple_producer.h +++ b/squeue/include/array_lock_free_queue.h @@ -4,11 +4,82 @@ #include <assert.h> // assert() #include <sched.h> // sched_yield() + +/// @brief implementation of an array based lock free queue with support for +/// multiple producers +/// This class is prevented from being instantiated directly (all members and +/// methods are private). To instantiate a multiple producers lock free queue +/// you must use the ArrayLockFreeQueue fachade: +/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q; template <typename ELEM_T> -int ArrayLockFreeQueueMultipleProducers<ELEM_T>::m_reference = 0; +class ArrayLockFreeQueue +{ + // ArrayLockFreeQueue will be using this' private members + template < + typename ELEM_T_, + template <typename T> class Q_TYPE > + friend class LockFreeQueue; + +private: + /// @brief constructor of the class + ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); + + virtual ~ArrayLockFreeQueue(); + + inline uint32_t size(); + + inline bool full(); + + inline bool empty(); + + bool push(const ELEM_T &a_data); + + bool pop(ELEM_T &a_data); + + /// @brief calculate the index in the circular array that corresponds + /// to a particular "count" value + inline uint32_t countToIndex(uint32_t a_count); + +private: + size_t Q_SIZE; + /// @brief array to keep the elements + ELEM_T *m_theQueue; + + /// @brief where a new element will be inserted + std::atomic<uint32_t> m_writeIndex; + + /// @brief where the next element where be extracted from + std::atomic<uint32_t> m_readIndex; + + /// @brief maximum read index for multiple producer queues + /// If it's not the same as m_writeIndex it means + /// there are writes pending to be "committed" to the queue, that means, + /// the place for the data was reserved (the index in the array) but + /// data is still not in the queue, so the thread trying to read will have + /// to wait for those other threads to save the data into the queue + /// + /// note this is only used for multiple producers + std::atomic<uint32_t> m_maximumReadIndex; + +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + /// @brief number of elements in the queue + std::atomic<uint32_t> m_count; +#endif + static int m_reference; + +private: + /// @brief disable copy constructor declaring it private + ArrayLockFreeQueue<ELEM_T>(const ArrayLockFreeQueue<ELEM_T> &a_src); + + +}; + template <typename ELEM_T> -ArrayLockFreeQueueMultipleProducers<ELEM_T>::ArrayLockFreeQueueMultipleProducers(size_t qsize): +int ArrayLockFreeQueue<ELEM_T>::m_reference = 0; + +template <typename ELEM_T> +ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize): Q_SIZE(qsize), m_writeIndex(0), // initialisation is not atomic m_readIndex(0), // @@ -23,9 +94,9 @@ } template <typename ELEM_T> -ArrayLockFreeQueueMultipleProducers<ELEM_T>::~ArrayLockFreeQueueMultipleProducers() +ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue() { - std::cout << "destroy ArrayLockFreeQueueMultipleProducers\n"; + std::cout << "destroy ArrayLockFreeQueue\n"; m_reference--; if(m_reference == 0) { mm_free(m_theQueue); @@ -35,7 +106,7 @@ template <typename ELEM_T> inline -uint32_t ArrayLockFreeQueueMultipleProducers<ELEM_T>::countToIndex(uint32_t a_count) +uint32_t ArrayLockFreeQueue<ELEM_T>::countToIndex(uint32_t a_count) { // if Q_SIZE is a power of 2 this statement could be also written as // return (a_count & (Q_SIZE - 1)); @@ -44,7 +115,7 @@ template <typename ELEM_T> inline -uint32_t ArrayLockFreeQueueMultipleProducers<ELEM_T>::size() +uint32_t ArrayLockFreeQueue<ELEM_T>::size() { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE @@ -79,7 +150,7 @@ template <typename ELEM_T> inline -bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::full() +bool ArrayLockFreeQueue<ELEM_T>::full() { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE @@ -104,7 +175,7 @@ template <typename ELEM_T> inline -bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::empty() +bool ArrayLockFreeQueue<ELEM_T>::empty() { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE @@ -126,7 +197,7 @@ template <typename ELEM_T> -bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::push(const ELEM_T &a_data) +bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data) { uint32_t currentWriteIndex; @@ -187,7 +258,7 @@ } template <typename ELEM_T> -bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::pop(ELEM_T &a_data) +bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data) { uint32_t currentReadIndex; diff --git a/squeue/include/linked_lock_free_queue.h b/squeue/include/linked_lock_free_queue.h new file mode 100644 index 0000000..58cdc00 --- /dev/null +++ b/squeue/include/linked_lock_free_queue.h @@ -0,0 +1,247 @@ +// queue.h -- interface for a queue +#ifndef __LINKED_LOCK_FREE_QUEUE_H_ +#define __LINKED_LOCK_FREE_QUEUE_H_ +#include "mm.h" +#include "sem_util.h" + +template <typename T> class Node; + +template <typename T> +class Pointer { +public: + + Node<T> *ptr; + unsigned long count; + Pointer( Node<T> *node = NULL, int c=0) noexcept : ptr(node), count(c) {} + + bool operator == (const Pointer<T> o) const { + return (o.ptr == ptr) && (o.count == count); + } + bool operator != (const Pointer<T> o) const { + return !((o.ptr == ptr) && (o.count == count)); + } + + + +}; + +template <typename T> +class Node { +public: + alignas(16) std::atomic<Pointer<T> > next; + T value; + + Node() { + } + + void *operator new(size_t size){ + return mm_malloc(size); + } + + void operator delete(void *p) { + return mm_free(p); + } +}; + + + + + +template <typename T> +class LinkedLockFreeQueue +{ +private: +// class scope definitions + enum {Q_SIZE = 10}; + + +// private class members + std::atomic<Pointer<T> > Head; // pointer to front of Queue + std::atomic<Pointer<T> > Tail; // pointer to rear of Queue + //std::atomic_uint count; // current number of size in Queue + std::atomic_uint count; + const size_t qsize; // maximum number of size in Queue + // preemptive definitions to prevent public copying + LinkedLockFreeQueue(const LinkedLockFreeQueue & q) : qsize(0) { } + LinkedLockFreeQueue & operator=(const LinkedLockFreeQueue & q) { return *this;} +public: + LinkedLockFreeQueue(size_t qs = Q_SIZE); // create queue with a qs limit + ~LinkedLockFreeQueue(); + bool empty() const; + bool full() const; + unsigned int size() const; + bool push(const T &item); // add item to end + bool pop(T &item); + + + T& operator[](unsigned i); + +}; + + + + +// Queue methods +template <typename T> +LinkedLockFreeQueue<T>::LinkedLockFreeQueue(size_t qs) : count(0), qsize(qs) +{ + Node<T> *node = new Node<T>; + Pointer<T> pointer(node, 0); + + Head.store(pointer, std::memory_order_relaxed); + Tail.store(pointer, std::memory_order_relaxed); + + + +} + +template <typename T> +LinkedLockFreeQueue<T>::~LinkedLockFreeQueue() +{ + std::cerr << "LinkedLockFreeQueue destory" << std::endl; + + Node<T> * nodeptr; + Pointer<T> tmp = Head.load(std::memory_order_relaxed); + while((nodeptr = tmp.ptr) != NULL) { + tmp = (tmp.ptr->next).load(std::memory_order_relaxed); + //std::cerr << "delete " << nodeptr << std::endl; + delete nodeptr; + + } + +} + +template <typename T> +bool LinkedLockFreeQueue<T>::empty() const +{ + return count == 0; +} + +template <typename T> +bool LinkedLockFreeQueue<T>::full() const +{ + return count == qsize; +} + +template <typename T> +unsigned int LinkedLockFreeQueue<T>::size() const +{ + return count; +} + +// Add item to queue +template <typename T> +bool LinkedLockFreeQueue<T>::push(const T & item) +{ + if (full()) + return false; + + Node<T> * node = new Node<T>; + node->value = item; + + + Pointer<T> tail ; + Pointer<T> next ; + + + while(true) { + tail = Tail.load(std::memory_order_relaxed); + next = (tail.ptr->next).load(std::memory_order_relaxed); + if (tail == Tail.load(std::memory_order_relaxed)) { + if (next.ptr == NULL) { + if ((tail.ptr->next).compare_exchange_weak(next, + Pointer<T>(node, next.count+1), + std::memory_order_release, + std::memory_order_relaxed) ) + break; + else + Tail.compare_exchange_weak(tail, + Pointer<T>(next.ptr, tail.count+1), + std::memory_order_release, + std::memory_order_relaxed); + } + + } + } + + Tail.compare_exchange_weak(tail, Pointer<T>(node, tail.count+1), + std::memory_order_release, + std::memory_order_relaxed); + count++; + return true; +} + + + + +// Place front item into item variable and remove from queue +template <typename T> +bool LinkedLockFreeQueue<T>::pop(T & item) +{ + if (empty()) + return false; + + Pointer<T> head; + Pointer<T> tail; + Pointer<T> next; + + while(true) { + head = Head.load(std::memory_order_relaxed); + tail = Tail.load(std::memory_order_relaxed); + next = (head.ptr->next).load(); + if (head == Head.load(std::memory_order_relaxed)) { + if(head.ptr == tail.ptr) { + if (next.ptr == NULL) + return false; + // Tail is falling behind. Try to advance it + Tail.compare_exchange_weak(tail, + Pointer<T>(next.ptr, tail.count+1), + std::memory_order_release, + std::memory_order_relaxed); + } else { + item = next.ptr->value; + if (Head.compare_exchange_weak(head, + Pointer<T>(next.ptr, head.count+1), + std::memory_order_release, + std::memory_order_relaxed)) { + delete head.ptr; + break; + } + + } + } + + } + + count--; + return true; + +} + + +template <class T> +T& LinkedLockFreeQueue<T>::operator[](unsigned int i) +{ + if (i < 0 || i >= count) + { + std::cerr << "Error in array limits: " << i << " is out of range\n"; + std::exit(EXIT_FAILURE); + } + + + Pointer<T> tmp = Head.load(std::memory_order_relaxed); + //Pointer<T> tail = Tail.load(std::memory_order_relaxed); + + while(i > 0) { + //std::cout << i << ":" << std::endl; + tmp = (tmp.ptr->next).load(std::memory_order_relaxed); + i--; + } + + tmp = (tmp.ptr->next).load(std::memory_order_relaxed); + return tmp.ptr->value; +} + + + +#endif diff --git a/squeue/include/lock_free_queue.h b/squeue/include/lock_free_queue.h index a8f7801..c8a672c 100644 --- a/squeue/include/lock_free_queue.h +++ b/squeue/include/lock_free_queue.h @@ -4,9 +4,12 @@ #include <stdint.h> // uint32_t #include <atomic> #include <usg_common.h> +#include <assert.h> // assert() +#include "mm.h" +#include "sem_util.h" // default Queue size -#define LOCK_FREE_Q_DEFAULT_SIZE 65536 // (2^16) +#define LOCK_FREE_Q_DEFAULT_SIZE 16 // define this macro if calls to "size" must return the real size of the // queue. If it is undefined that function will try to take a snapshot of @@ -17,7 +20,7 @@ // template <typename ELEM_T> -class ArrayLockFreeQueueMultipleProducers; +class ArrayLockFreeQueue; /// @brief Lock-free queue based on a circular array @@ -32,7 +35,7 @@ /// ArrayLockFreeQueue<int, 16> q; /// // queue of ints of size (16 - 1) and /// // defaulted to single producer -/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueueMultipleProducers> q; +/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q; /// // queue of ints of size (100 - 1) with support /// // for multiple producers /// @@ -57,13 +60,16 @@ /// last 4 elements of the queue are not used when the counter rolls /// over to 0 /// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and -/// ArrayLockFreeQueueMultipleProducers are supported (single producer +/// ArrayLockFreeQueue are supported (single producer /// by default) template < typename ELEM_T, - template <typename T> class Q_TYPE = ArrayLockFreeQueueMultipleProducers > + template <typename T> class Q_TYPE = ArrayLockFreeQueue > class LockFreeQueue { +private: + int slots; + int items; public: /// @brief constructor of the class LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); @@ -88,7 +94,7 @@ /// environments this function might return bogus values. See help in method /// LockFreeQueue::size inline bool full(); - + inline bool empty(); /// @brief push an element at the tail of the queue @@ -97,13 +103,21 @@ /// structures to be inserted in the queue you should think of instantiate the template /// of the queue as a pointer to that large structure /// @return true if the element was inserted in the queue. False if the queue was full - inline bool push(const ELEM_T &a_data); + bool push(const ELEM_T &a_data); + bool push_nowait(const ELEM_T &a_data); + bool push_timeout(const ELEM_T &a_data, struct timespec * timeout); /// @brief pop the element at the head of the queue /// @param a reference where the element in the head of the queue will be saved to /// Note that the a_data parameter might contain rubbish if the function returns false /// @return true if the element was successfully extracted from the queue. False if the queue was empty - inline bool pop(ELEM_T &a_data); + bool pop(ELEM_T &a_data); + bool pop_nowait(ELEM_T &a_data); + bool pop_timeout(ELEM_T &a_data, struct timespec * timeout); + + + void *operator new(size_t size); + void operator delete(void *p); protected: /// @brief the actual queue. methods are forwarded into the real @@ -117,77 +131,186 @@ -/// @brief implementation of an array based lock free queue with support for -/// multiple producers -/// This class is prevented from being instantiated directly (all members and -/// methods are private). To instantiate a multiple producers lock free queue -/// you must use the ArrayLockFreeQueue fachade: -/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueueMultipleProducers> q; -template <typename ELEM_T> -class ArrayLockFreeQueueMultipleProducers +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +LockFreeQueue<ELEM_T, Q_TYPE>::LockFreeQueue(size_t qsize): + m_qImpl(qsize) { - // ArrayLockFreeQueue will be using this' private members - template < - typename ELEM_T_, - template <typename T> class Q_TYPE > - friend class LockFreeQueue; + slots = SemUtil::get(IPC_PRIVATE, qsize); + items = SemUtil::get(IPC_PRIVATE, 0); +} -private: - /// @brief constructor of the class - ArrayLockFreeQueueMultipleProducers(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - - virtual ~ArrayLockFreeQueueMultipleProducers(); - - inline uint32_t size(); - - inline bool full(); +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +LockFreeQueue<ELEM_T, Q_TYPE>::~LockFreeQueue() +{ + SemUtil::remove(slots); + SemUtil::remove(items); +} - inline bool empty(); - - bool push(const ELEM_T &a_data); - - bool pop(ELEM_T &a_data); - - /// @brief calculate the index in the circular array that corresponds - /// to a particular "count" value - inline uint32_t countToIndex(uint32_t a_count); - -private: - size_t Q_SIZE; - /// @brief array to keep the elements - ELEM_T *m_theQueue; +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +inline uint32_t LockFreeQueue<ELEM_T, Q_TYPE>::size() +{ + return m_qImpl.size(); +} - /// @brief where a new element will be inserted - std::atomic<uint32_t> m_writeIndex; +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +inline bool LockFreeQueue<ELEM_T, Q_TYPE>::full() +{ + return m_qImpl.full(); +} - /// @brief where the next element where be extracted from - std::atomic<uint32_t> m_readIndex; - - /// @brief maximum read index for multiple producer queues - /// If it's not the same as m_writeIndex it means - /// there are writes pending to be "committed" to the queue, that means, - /// the place for the data was reserved (the index in the array) but - /// data is still not in the queue, so the thread trying to read will have - /// to wait for those other threads to save the data into the queue - /// - /// note this is only used for multiple producers - std::atomic<uint32_t> m_maximumReadIndex; - -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - /// @brief number of elements in the queue - std::atomic<uint32_t> m_count; -#endif - static int m_reference; - -private: - /// @brief disable copy constructor declaring it private - ArrayLockFreeQueueMultipleProducers<ELEM_T>(const ArrayLockFreeQueueMultipleProducers<ELEM_T> &a_src); +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +inline bool LockFreeQueue<ELEM_T, Q_TYPE>::empty() +{ + return m_qImpl.empty(); +} -}; +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +bool LockFreeQueue<ELEM_T, Q_TYPE>::push(const ELEM_T &a_data) +{ + if (SemUtil::dec(slots) == -1) { + err_exit(errno, "push"); + } + + if ( m_qImpl.push(a_data) ) { + SemUtil::inc(items); + return true; + } + return false; + +} + +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +bool LockFreeQueue<ELEM_T, Q_TYPE>::push_nowait(const ELEM_T &a_data) +{ + if (SemUtil::dec_nowait(slots) == -1) { + if (errno == EAGAIN) + return false; + else + err_exit(errno, "push_nowait"); + } + + if ( m_qImpl.push(a_data)) { + SemUtil::inc(items); + return true; + } + return false; + +} + +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +bool LockFreeQueue<ELEM_T, Q_TYPE>::push_timeout(const ELEM_T &a_data, struct timespec * timeout) +{ + + if (SemUtil::dec_timeout(slots, timeout) == -1) { + if (errno == EAGAIN) + return false; + else + err_exit(errno, "push_timeout"); + } + + if (m_qImpl.push(a_data)){ + SemUtil::inc(items); + return true; + } + return false; + +} + + + + +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +bool LockFreeQueue<ELEM_T, Q_TYPE>::pop(ELEM_T &a_data) +{ + if (SemUtil::dec(items) == -1) { + err_exit(errno, "remove"); + } + + if (m_qImpl.pop(a_data)) { + SemUtil::inc(slots); + return true; + } + return false; + +} + +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +bool LockFreeQueue<ELEM_T, Q_TYPE>::pop_nowait(ELEM_T &a_data) +{ + if (SemUtil::dec_nowait(items) == -1) { + if (errno == EAGAIN) + return false; + else + err_exit(errno, "remove_nowait"); + } + + if (m_qImpl.pop(a_data)) { + SemUtil::inc(slots); + return true; + } + return false; + +} + + +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +bool LockFreeQueue<ELEM_T, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout) +{ + if (SemUtil::dec_timeout(items, timeout) == -1) { + if (errno == EAGAIN) + return false; + else + err_exit(errno, "remove_timeout"); + } + + if (m_qImpl.pop(a_data)) { + SemUtil::inc(slots); + return true; + } + return false; + +} + + +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +void * LockFreeQueue<ELEM_T, Q_TYPE>::operator new(size_t size){ + return mm_malloc(size); +} + +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +void LockFreeQueue<ELEM_T, Q_TYPE>::operator delete(void *p) { + return mm_free(p); +} // include implementation files -#include "lock_free_queue_impl.h" -#include "lock_free_queue_impl_multiple_producer.h" +#include "linked_lock_free_queue.h" +#include "array_lock_free_queue.h" #endif // _LOCK_FREE_QUEUE_H__ diff --git a/squeue/include/lock_free_queue_impl.h b/squeue/include/lock_free_queue_impl.h deleted file mode 100644 index aa182a5..0000000 --- a/squeue/include/lock_free_queue_impl.h +++ /dev/null @@ -1,63 +0,0 @@ -#ifndef __LOCK_FREE_QUEUE_IMPL_H__ -#define __LOCK_FREE_QUEUE_IMPL_H__ - -#include <assert.h> // assert() -#include "mm.h" -#include "sem_util.h" - -template < - typename ELEM_T, - template <typename T> class Q_TYPE> -LockFreeQueue<ELEM_T, Q_TYPE>::LockFreeQueue(size_t qsize): - m_qImpl(qsize) -{ -} - -template < - typename ELEM_T, - template <typename T> class Q_TYPE> -LockFreeQueue<ELEM_T, Q_TYPE>::~LockFreeQueue() -{ -} - -template < - typename ELEM_T, - template <typename T> class Q_TYPE> -inline uint32_t LockFreeQueue<ELEM_T, Q_TYPE>::size() -{ - return m_qImpl.size(); -} - -template < - typename ELEM_T, - template <typename T> class Q_TYPE> -inline bool LockFreeQueue<ELEM_T, Q_TYPE>::full() -{ - return m_qImpl.full(); -} - -template < - typename ELEM_T, - template <typename T> class Q_TYPE> -inline bool LockFreeQueue<ELEM_T, Q_TYPE>::empty() -{ - return m_qImpl.empty(); -} - -template < - typename ELEM_T, - template <typename T> class Q_TYPE> -inline bool LockFreeQueue<ELEM_T, Q_TYPE>::push(const ELEM_T &a_data) -{ - return m_qImpl.push(a_data); -} - -template < - typename ELEM_T, - template <typename T> class Q_TYPE> -inline bool LockFreeQueue<ELEM_T, Q_TYPE>::pop(ELEM_T &a_data) -{ - return m_qImpl.pop(a_data); -} - -#endif // __LOCK_FREE_QUEUE_IMPL_H__ diff --git a/squeue/include/queue_factory.h b/squeue/include/queue_factory.h index b898a86..23446b0 100644 --- a/squeue/include/queue_factory.h +++ b/squeue/include/queue_factory.h @@ -4,10 +4,11 @@ #include "mm.h" #include "hashtable.h" #include "lock_free_queue.h" -#include "SLinkedLockFreeQueue.h" -namespace QueueFactory{ - hashtable_t * getHashTable() { +class QueueFactory{ +private: + + static hashtable_t * getHashTable() { static hashtable_t *hashtable = NULL; int first; @@ -20,24 +21,9 @@ } - template <typename T> - SLinkedLockFreeQueue<T>* createLinkedLockFreeQueue(int key, size_t size) { - + - SLinkedLockFreeQueue<T> *queue; - hashtable_t *hashtable = getHashTable(); - - - if ((queue = (SLinkedLockFreeQueue<T> *)hashtable_get(hashtable, key)) == NULL ) { - queue = new SLinkedLockFreeQueue<T>(size); - hashtable_put(hashtable, key, (void *)queue); - } - - return queue; - } - - - template <typename T> + template <typename T> static LockFreeQueue<T>* createArrayLockFreeQueue(int key, size_t size=16) { LockFreeQueue<T> *queue; @@ -51,8 +37,9 @@ return queue; } +public: - template <typename T> + template <typename T> static LockFreeQueue<T>* createQueue(int key, size_t size = 16) { return QueueFactory::createArrayLockFreeQueue<T>(key, size); } @@ -60,14 +47,13 @@ /** * destroy queue */ - template <typename T> + template <typename T> static void dropQueue(int key) { - LockFreeQueue<T> *queue = QueueFactory::createQueue<T> (key); delete queue; hashtable_t *hashtable = getHashTable(); hashtable_remove(hashtable, key); } -} +}; #endif diff --git a/squeue/mm.c b/squeue/mm.c index 7b39391..9cf01f0 100644 --- a/squeue/mm.c +++ b/squeue/mm.c @@ -1,5 +1,5 @@ /* - * 绠$悊鍏变韩鍐呭瓨鐨勫垎閰嶏紝涓庨噴鏀� + * 绠$悊鍏变韩鍐呭瓨鐨勫垎閰嶄笌閲婃斁 */ #include "mm.h" #include "sem_util.h" diff --git a/test/test.h b/test/test.h index c80ad70..084a7f1 100644 --- a/test/test.h +++ b/test/test.h @@ -1,9 +1,8 @@ #include "usg_common.h" #include "usg_typedef.h" #include "lock_free_queue.h" -#include "SLinkedLockFreeQueue.h" #include "queue_factory.h" - #include <pthread.h> +#include <pthread.h> #define NTHREADS 3 diff --git a/test/test_queue b/test/test_queue index 51f9055..5f6ee1b 100755 --- a/test/test_queue +++ b/test/test_queue Binary files differ -- Gitblit v1.8.0