squeue/include/SAbstractQueue.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
squeue/include/SLinkedLockFreeQueue.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
squeue/include/array_lock_free_queue.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
squeue/include/linked_lock_free_queue.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
squeue/include/lock_free_queue.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
squeue/include/lock_free_queue_impl.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
squeue/include/queue_factory.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
squeue/mm.c | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
test/test.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
test/test_queue | 补丁 | 查看 | 原始文档 | blame | 历史 |
squeue/include/SAbstractQueue.h
File was deleted squeue/include/SLinkedLockFreeQueue.h
File was deleted squeue/include/array_lock_free_queue.h
File was renamed from squeue/include/lock_free_queue_impl_multiple_producer.h @@ -4,11 +4,82 @@ #include <assert.h> // assert() #include <sched.h> // sched_yield() /// @brief implementation of an array based lock free queue with support for /// multiple producers /// This class is prevented from being instantiated directly (all members and /// methods are private). To instantiate a multiple producers lock free queue /// you must use the ArrayLockFreeQueue fachade: /// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q; template <typename ELEM_T> int ArrayLockFreeQueueMultipleProducers<ELEM_T>::m_reference = 0; class ArrayLockFreeQueue { // ArrayLockFreeQueue will be using this' private members template < typename ELEM_T_, template <typename T> class Q_TYPE > friend class LockFreeQueue; private: /// @brief constructor of the class ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); virtual ~ArrayLockFreeQueue(); inline uint32_t size(); inline bool full(); inline bool empty(); bool push(const ELEM_T &a_data); bool pop(ELEM_T &a_data); /// @brief calculate the index in the circular array that corresponds /// to a particular "count" value inline uint32_t countToIndex(uint32_t a_count); private: size_t Q_SIZE; /// @brief array to keep the elements ELEM_T *m_theQueue; /// @brief where a new element will be inserted std::atomic<uint32_t> m_writeIndex; /// @brief where the next element where be extracted from std::atomic<uint32_t> m_readIndex; /// @brief maximum read index for multiple producer queues /// If it's not the same as m_writeIndex it means /// there are writes pending to be "committed" to the queue, that means, /// the place for the data was reserved (the index in the array) but /// data is still not in the queue, so the thread trying to read will have /// to wait for those other threads to save the data into the queue /// /// note this is only used for multiple producers std::atomic<uint32_t> m_maximumReadIndex; #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE /// @brief number of elements in the queue std::atomic<uint32_t> m_count; #endif static int m_reference; private: /// @brief disable copy constructor declaring it private ArrayLockFreeQueue<ELEM_T>(const ArrayLockFreeQueue<ELEM_T> &a_src); }; template <typename ELEM_T> ArrayLockFreeQueueMultipleProducers<ELEM_T>::ArrayLockFreeQueueMultipleProducers(size_t qsize): int ArrayLockFreeQueue<ELEM_T>::m_reference = 0; template <typename ELEM_T> ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize): Q_SIZE(qsize), m_writeIndex(0), // initialisation is not atomic m_readIndex(0), // @@ -23,9 +94,9 @@ } template <typename ELEM_T> ArrayLockFreeQueueMultipleProducers<ELEM_T>::~ArrayLockFreeQueueMultipleProducers() ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue() { std::cout << "destroy ArrayLockFreeQueueMultipleProducers\n"; std::cout << "destroy ArrayLockFreeQueue\n"; m_reference--; if(m_reference == 0) { mm_free(m_theQueue); @@ -35,7 +106,7 @@ template <typename ELEM_T> inline uint32_t ArrayLockFreeQueueMultipleProducers<ELEM_T>::countToIndex(uint32_t a_count) uint32_t ArrayLockFreeQueue<ELEM_T>::countToIndex(uint32_t a_count) { // if Q_SIZE is a power of 2 this statement could be also written as // return (a_count & (Q_SIZE - 1)); @@ -44,7 +115,7 @@ template <typename ELEM_T> inline uint32_t ArrayLockFreeQueueMultipleProducers<ELEM_T>::size() uint32_t ArrayLockFreeQueue<ELEM_T>::size() { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE @@ -79,7 +150,7 @@ template <typename ELEM_T> inline bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::full() bool ArrayLockFreeQueue<ELEM_T>::full() { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE @@ -104,7 +175,7 @@ template <typename ELEM_T> inline bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::empty() bool ArrayLockFreeQueue<ELEM_T>::empty() { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE @@ -126,7 +197,7 @@ template <typename ELEM_T> bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::push(const ELEM_T &a_data) bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data) { uint32_t currentWriteIndex; @@ -187,7 +258,7 @@ } template <typename ELEM_T> bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::pop(ELEM_T &a_data) bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data) { uint32_t currentReadIndex; squeue/include/linked_lock_free_queue.h
New file @@ -0,0 +1,247 @@ // queue.h -- interface for a queue #ifndef __LINKED_LOCK_FREE_QUEUE_H_ #define __LINKED_LOCK_FREE_QUEUE_H_ #include "mm.h" #include "sem_util.h" template <typename T> class Node; template <typename T> class Pointer { public: Node<T> *ptr; unsigned long count; Pointer( Node<T> *node = NULL, int c=0) noexcept : ptr(node), count(c) {} bool operator == (const Pointer<T> o) const { return (o.ptr == ptr) && (o.count == count); } bool operator != (const Pointer<T> o) const { return !((o.ptr == ptr) && (o.count == count)); } }; template <typename T> class Node { public: alignas(16) std::atomic<Pointer<T> > next; T value; Node() { } void *operator new(size_t size){ return mm_malloc(size); } void operator delete(void *p) { return mm_free(p); } }; template <typename T> class LinkedLockFreeQueue { private: // class scope definitions enum {Q_SIZE = 10}; // private class members std::atomic<Pointer<T> > Head; // pointer to front of Queue std::atomic<Pointer<T> > Tail; // pointer to rear of Queue //std::atomic_uint count; // current number of size in Queue std::atomic_uint count; const size_t qsize; // maximum number of size in Queue // preemptive definitions to prevent public copying LinkedLockFreeQueue(const LinkedLockFreeQueue & q) : qsize(0) { } LinkedLockFreeQueue & operator=(const LinkedLockFreeQueue & q) { return *this;} public: LinkedLockFreeQueue(size_t qs = Q_SIZE); // create queue with a qs limit ~LinkedLockFreeQueue(); bool empty() const; bool full() const; unsigned int size() const; bool push(const T &item); // add item to end bool pop(T &item); T& operator[](unsigned i); }; // Queue methods template <typename T> LinkedLockFreeQueue<T>::LinkedLockFreeQueue(size_t qs) : count(0), qsize(qs) { Node<T> *node = new Node<T>; Pointer<T> pointer(node, 0); Head.store(pointer, std::memory_order_relaxed); Tail.store(pointer, std::memory_order_relaxed); } template <typename T> LinkedLockFreeQueue<T>::~LinkedLockFreeQueue() { std::cerr << "LinkedLockFreeQueue destory" << std::endl; Node<T> * nodeptr; Pointer<T> tmp = Head.load(std::memory_order_relaxed); while((nodeptr = tmp.ptr) != NULL) { tmp = (tmp.ptr->next).load(std::memory_order_relaxed); //std::cerr << "delete " << nodeptr << std::endl; delete nodeptr; } } template <typename T> bool LinkedLockFreeQueue<T>::empty() const { return count == 0; } template <typename T> bool LinkedLockFreeQueue<T>::full() const { return count == qsize; } template <typename T> unsigned int LinkedLockFreeQueue<T>::size() const { return count; } // Add item to queue template <typename T> bool LinkedLockFreeQueue<T>::push(const T & item) { if (full()) return false; Node<T> * node = new Node<T>; node->value = item; Pointer<T> tail ; Pointer<T> next ; while(true) { tail = Tail.load(std::memory_order_relaxed); next = (tail.ptr->next).load(std::memory_order_relaxed); if (tail == Tail.load(std::memory_order_relaxed)) { if (next.ptr == NULL) { if ((tail.ptr->next).compare_exchange_weak(next, Pointer<T>(node, next.count+1), std::memory_order_release, std::memory_order_relaxed) ) break; else Tail.compare_exchange_weak(tail, Pointer<T>(next.ptr, tail.count+1), std::memory_order_release, std::memory_order_relaxed); } } } Tail.compare_exchange_weak(tail, Pointer<T>(node, tail.count+1), std::memory_order_release, std::memory_order_relaxed); count++; return true; } // Place front item into item variable and remove from queue template <typename T> bool LinkedLockFreeQueue<T>::pop(T & item) { if (empty()) return false; Pointer<T> head; Pointer<T> tail; Pointer<T> next; while(true) { head = Head.load(std::memory_order_relaxed); tail = Tail.load(std::memory_order_relaxed); next = (head.ptr->next).load(); if (head == Head.load(std::memory_order_relaxed)) { if(head.ptr == tail.ptr) { if (next.ptr == NULL) return false; // Tail is falling behind. Try to advance it Tail.compare_exchange_weak(tail, Pointer<T>(next.ptr, tail.count+1), std::memory_order_release, std::memory_order_relaxed); } else { item = next.ptr->value; if (Head.compare_exchange_weak(head, Pointer<T>(next.ptr, head.count+1), std::memory_order_release, std::memory_order_relaxed)) { delete head.ptr; break; } } } } count--; return true; } template <class T> T& LinkedLockFreeQueue<T>::operator[](unsigned int i) { if (i < 0 || i >= count) { std::cerr << "Error in array limits: " << i << " is out of range\n"; std::exit(EXIT_FAILURE); } Pointer<T> tmp = Head.load(std::memory_order_relaxed); //Pointer<T> tail = Tail.load(std::memory_order_relaxed); while(i > 0) { //std::cout << i << ":" << std::endl; tmp = (tmp.ptr->next).load(std::memory_order_relaxed); i--; } tmp = (tmp.ptr->next).load(std::memory_order_relaxed); return tmp.ptr->value; } #endif squeue/include/lock_free_queue.h
@@ -4,9 +4,12 @@ #include <stdint.h> // uint32_t #include <atomic> #include <usg_common.h> #include <assert.h> // assert() #include "mm.h" #include "sem_util.h" // default Queue size #define LOCK_FREE_Q_DEFAULT_SIZE 65536 // (2^16) #define LOCK_FREE_Q_DEFAULT_SIZE 16 // define this macro if calls to "size" must return the real size of the // queue. If it is undefined that function will try to take a snapshot of @@ -17,7 +20,7 @@ // template <typename ELEM_T> class ArrayLockFreeQueueMultipleProducers; class ArrayLockFreeQueue; /// @brief Lock-free queue based on a circular array @@ -32,7 +35,7 @@ /// ArrayLockFreeQueue<int, 16> q; /// // queue of ints of size (16 - 1) and /// // defaulted to single producer /// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueueMultipleProducers> q; /// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q; /// // queue of ints of size (100 - 1) with support /// // for multiple producers /// @@ -57,13 +60,16 @@ /// last 4 elements of the queue are not used when the counter rolls /// over to 0 /// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and /// ArrayLockFreeQueueMultipleProducers are supported (single producer /// ArrayLockFreeQueue are supported (single producer /// by default) template < typename ELEM_T, template <typename T> class Q_TYPE = ArrayLockFreeQueueMultipleProducers > template <typename T> class Q_TYPE = ArrayLockFreeQueue > class LockFreeQueue { private: int slots; int items; public: /// @brief constructor of the class LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); @@ -88,7 +94,7 @@ /// environments this function might return bogus values. See help in method /// LockFreeQueue::size inline bool full(); inline bool empty(); /// @brief push an element at the tail of the queue @@ -97,13 +103,21 @@ /// structures to be inserted in the queue you should think of instantiate the template /// of the queue as a pointer to that large structure /// @return true if the element was inserted in the queue. False if the queue was full inline bool push(const ELEM_T &a_data); bool push(const ELEM_T &a_data); bool push_nowait(const ELEM_T &a_data); bool push_timeout(const ELEM_T &a_data, struct timespec * timeout); /// @brief pop the element at the head of the queue /// @param a reference where the element in the head of the queue will be saved to /// Note that the a_data parameter might contain rubbish if the function returns false /// @return true if the element was successfully extracted from the queue. False if the queue was empty inline bool pop(ELEM_T &a_data); bool pop(ELEM_T &a_data); bool pop_nowait(ELEM_T &a_data); bool pop_timeout(ELEM_T &a_data, struct timespec * timeout); void *operator new(size_t size); void operator delete(void *p); protected: /// @brief the actual queue. methods are forwarded into the real @@ -117,77 +131,186 @@ /// @brief implementation of an array based lock free queue with support for /// multiple producers /// This class is prevented from being instantiated directly (all members and /// methods are private). To instantiate a multiple producers lock free queue /// you must use the ArrayLockFreeQueue fachade: /// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueueMultipleProducers> q; template <typename ELEM_T> class ArrayLockFreeQueueMultipleProducers template < typename ELEM_T, template <typename T> class Q_TYPE> LockFreeQueue<ELEM_T, Q_TYPE>::LockFreeQueue(size_t qsize): m_qImpl(qsize) { // ArrayLockFreeQueue will be using this' private members template < typename ELEM_T_, template <typename T> class Q_TYPE > friend class LockFreeQueue; slots = SemUtil::get(IPC_PRIVATE, qsize); items = SemUtil::get(IPC_PRIVATE, 0); } private: /// @brief constructor of the class ArrayLockFreeQueueMultipleProducers(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); virtual ~ArrayLockFreeQueueMultipleProducers(); inline uint32_t size(); inline bool full(); template < typename ELEM_T, template <typename T> class Q_TYPE> LockFreeQueue<ELEM_T, Q_TYPE>::~LockFreeQueue() { SemUtil::remove(slots); SemUtil::remove(items); } inline bool empty(); bool push(const ELEM_T &a_data); bool pop(ELEM_T &a_data); /// @brief calculate the index in the circular array that corresponds /// to a particular "count" value inline uint32_t countToIndex(uint32_t a_count); private: size_t Q_SIZE; /// @brief array to keep the elements ELEM_T *m_theQueue; template < typename ELEM_T, template <typename T> class Q_TYPE> inline uint32_t LockFreeQueue<ELEM_T, Q_TYPE>::size() { return m_qImpl.size(); } /// @brief where a new element will be inserted std::atomic<uint32_t> m_writeIndex; template < typename ELEM_T, template <typename T> class Q_TYPE> inline bool LockFreeQueue<ELEM_T, Q_TYPE>::full() { return m_qImpl.full(); } /// @brief where the next element where be extracted from std::atomic<uint32_t> m_readIndex; /// @brief maximum read index for multiple producer queues /// If it's not the same as m_writeIndex it means /// there are writes pending to be "committed" to the queue, that means, /// the place for the data was reserved (the index in the array) but /// data is still not in the queue, so the thread trying to read will have /// to wait for those other threads to save the data into the queue /// /// note this is only used for multiple producers std::atomic<uint32_t> m_maximumReadIndex; #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE /// @brief number of elements in the queue std::atomic<uint32_t> m_count; #endif static int m_reference; private: /// @brief disable copy constructor declaring it private ArrayLockFreeQueueMultipleProducers<ELEM_T>(const ArrayLockFreeQueueMultipleProducers<ELEM_T> &a_src); template < typename ELEM_T, template <typename T> class Q_TYPE> inline bool LockFreeQueue<ELEM_T, Q_TYPE>::empty() { return m_qImpl.empty(); } }; template < typename ELEM_T, template <typename T> class Q_TYPE> bool LockFreeQueue<ELEM_T, Q_TYPE>::push(const ELEM_T &a_data) { if (SemUtil::dec(slots) == -1) { err_exit(errno, "push"); } if ( m_qImpl.push(a_data) ) { SemUtil::inc(items); return true; } return false; } template < typename ELEM_T, template <typename T> class Q_TYPE> bool LockFreeQueue<ELEM_T, Q_TYPE>::push_nowait(const ELEM_T &a_data) { if (SemUtil::dec_nowait(slots) == -1) { if (errno == EAGAIN) return false; else err_exit(errno, "push_nowait"); } if ( m_qImpl.push(a_data)) { SemUtil::inc(items); return true; } return false; } template < typename ELEM_T, template <typename T> class Q_TYPE> bool LockFreeQueue<ELEM_T, Q_TYPE>::push_timeout(const ELEM_T &a_data, struct timespec * timeout) { if (SemUtil::dec_timeout(slots, timeout) == -1) { if (errno == EAGAIN) return false; else err_exit(errno, "push_timeout"); } if (m_qImpl.push(a_data)){ SemUtil::inc(items); return true; } return false; } template < typename ELEM_T, template <typename T> class Q_TYPE> bool LockFreeQueue<ELEM_T, Q_TYPE>::pop(ELEM_T &a_data) { if (SemUtil::dec(items) == -1) { err_exit(errno, "remove"); } if (m_qImpl.pop(a_data)) { SemUtil::inc(slots); return true; } return false; } template < typename ELEM_T, template <typename T> class Q_TYPE> bool LockFreeQueue<ELEM_T, Q_TYPE>::pop_nowait(ELEM_T &a_data) { if (SemUtil::dec_nowait(items) == -1) { if (errno == EAGAIN) return false; else err_exit(errno, "remove_nowait"); } if (m_qImpl.pop(a_data)) { SemUtil::inc(slots); return true; } return false; } template < typename ELEM_T, template <typename T> class Q_TYPE> bool LockFreeQueue<ELEM_T, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout) { if (SemUtil::dec_timeout(items, timeout) == -1) { if (errno == EAGAIN) return false; else err_exit(errno, "remove_timeout"); } if (m_qImpl.pop(a_data)) { SemUtil::inc(slots); return true; } return false; } template < typename ELEM_T, template <typename T> class Q_TYPE> void * LockFreeQueue<ELEM_T, Q_TYPE>::operator new(size_t size){ return mm_malloc(size); } template < typename ELEM_T, template <typename T> class Q_TYPE> void LockFreeQueue<ELEM_T, Q_TYPE>::operator delete(void *p) { return mm_free(p); } // include implementation files #include "lock_free_queue_impl.h" #include "lock_free_queue_impl_multiple_producer.h" #include "linked_lock_free_queue.h" #include "array_lock_free_queue.h" #endif // _LOCK_FREE_QUEUE_H__ squeue/include/lock_free_queue_impl.h
File was deleted squeue/include/queue_factory.h
@@ -4,10 +4,11 @@ #include "mm.h" #include "hashtable.h" #include "lock_free_queue.h" #include "SLinkedLockFreeQueue.h" namespace QueueFactory{ hashtable_t * getHashTable() { class QueueFactory{ private: static hashtable_t * getHashTable() { static hashtable_t *hashtable = NULL; int first; @@ -20,24 +21,9 @@ } template <typename T> SLinkedLockFreeQueue<T>* createLinkedLockFreeQueue(int key, size_t size) { SLinkedLockFreeQueue<T> *queue; hashtable_t *hashtable = getHashTable(); if ((queue = (SLinkedLockFreeQueue<T> *)hashtable_get(hashtable, key)) == NULL ) { queue = new SLinkedLockFreeQueue<T>(size); hashtable_put(hashtable, key, (void *)queue); } return queue; } template <typename T> template <typename T> static LockFreeQueue<T>* createArrayLockFreeQueue(int key, size_t size=16) { LockFreeQueue<T> *queue; @@ -51,8 +37,9 @@ return queue; } public: template <typename T> template <typename T> static LockFreeQueue<T>* createQueue(int key, size_t size = 16) { return QueueFactory::createArrayLockFreeQueue<T>(key, size); } @@ -60,14 +47,13 @@ /** * destroy queue */ template <typename T> template <typename T> static void dropQueue(int key) { LockFreeQueue<T> *queue = QueueFactory::createQueue<T> (key); delete queue; hashtable_t *hashtable = getHashTable(); hashtable_remove(hashtable, key); } } }; #endif squeue/mm.c
@@ -1,5 +1,5 @@ /* * 管理共享内存的分配,与释放 * 管理共享内存的分配与释放 */ #include "mm.h" #include "sem_util.h" test/test.h
@@ -1,9 +1,8 @@ #include "usg_common.h" #include "usg_typedef.h" #include "lock_free_queue.h" #include "SLinkedLockFreeQueue.h" #include "queue_factory.h" #include <pthread.h> #include <pthread.h> #define NTHREADS 3 test/test_queueBinary files differ