From 082633f08aae8eea19bd7050cbe4a75e5ed1ac6f Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期二, 07 七月 2020 12:07:29 +0800
Subject: [PATCH] update
---
squeue/include/queue_factory.h | 34 --
/dev/null | 63 -----
test/test_queue | 0
squeue/include/array_lock_free_queue.h | 91 ++++++
squeue/include/lock_free_queue.h | 265 ++++++++++++++++-----
test/test.h | 3
squeue/mm.c | 2
squeue/include/linked_lock_free_queue.h | 247 ++++++++++++++++++++
8 files changed, 534 insertions(+), 171 deletions(-)
diff --git a/squeue/include/SAbstractQueue.h b/squeue/include/SAbstractQueue.h
deleted file mode 100644
index a72439f..0000000
--- a/squeue/include/SAbstractQueue.h
+++ /dev/null
@@ -1,22 +0,0 @@
-// queue.h -- interface for a queue
-#ifndef SAbstractQueue_H_
-#define SAbstractQueue_H_
-
-template <typename T>
-class SAbstractQueue
-{
- public:
- SAbstractQueue() {}
- virtual unsigned int size() const = 0;
- virtual bool add(const T &item) = 0;// add item to end
- virtual bool add_nowait(const T &item) = 0;
- virtual bool add_timeout(const T &item, struct timespec *timeout) = 0;
- virtual bool remove(T &item) = 0;
- virtual bool remove_nowait(T &item) = 0;
- virtual bool remove_timeout(T &item, struct timespec * timeout) = 0;
-
-
- virtual T& operator[](unsigned i) = 0;
- virtual ~SAbstractQueue() {}
-};
-#endif
diff --git a/squeue/include/SLinkedLockFreeQueue.h b/squeue/include/SLinkedLockFreeQueue.h
deleted file mode 100644
index 8225920..0000000
--- a/squeue/include/SLinkedLockFreeQueue.h
+++ /dev/null
@@ -1,370 +0,0 @@
-// queue.h -- interface for a queue
-#ifndef SLinkedLockFreeQueue_H_
-#define SLinkedLockFreeQueue_H_
-#include "mm.h"
-#include "sem_util.h"
-#include "SAbstractQueue.h"
-
-
-template <typename T> class Node;
-
-template <typename T>
-class Pointer {
-public:
-
- Node<T> *ptr;
- unsigned long count;
- Pointer( Node<T> *node = NULL, int c=0) noexcept : ptr(node), count(c) {}
-
- bool operator == (const Pointer<T> o) const {
- return (o.ptr == ptr) && (o.count == count);
- }
- bool operator != (const Pointer<T> o) const {
- return !((o.ptr == ptr) && (o.count == count));
- }
-
-
-
-};
-
-template <typename T>
-class Node {
-public:
- alignas(16) std::atomic<Pointer<T> > next;
- T value;
-
- Node() {
- }
-
- void *operator new(size_t size){
- return mm_malloc(size);
- }
-
- void operator delete(void *p) {
- return mm_free(p);
- }
-};
-
-
-
-
-
-template <typename T>
-class SLinkedLockFreeQueue
-{
-private:
-// class scope definitions
- enum {Q_SIZE = 10};
-
- int slots;
- int items;
-// private class members
- std::atomic<Pointer<T> > Head; // pointer to front of Queue
- std::atomic<Pointer<T> > Tail; // pointer to rear of Queue
- //std::atomic_uint count; // current number of size in Queue
- std::atomic_uint count;
- const size_t qsize; // maximum number of size in Queue
- // preemptive definitions to prevent public copying
- SLinkedLockFreeQueue(const SLinkedLockFreeQueue & q) : qsize(0) { }
- SLinkedLockFreeQueue & operator=(const SLinkedLockFreeQueue & q) { return *this;}
- bool _add(const T &item); // add item to end
- bool _remove(T &item); // remove item from front
-public:
- SLinkedLockFreeQueue(size_t qs = Q_SIZE); // create queue with a qs limit
- ~SLinkedLockFreeQueue();
- bool isempty() const;
- bool isfull() const;
- unsigned int size() const;
- bool add(const T &item); // add item to end
- bool add_nowait(const T &item);
- bool add_timeout(const T &item, struct timespec *timeout);
- bool remove(T &item);
- bool remove_nowait(T &item);
- bool remove_timeout(T &item, struct timespec * timeout);
-
-
- T& operator[](unsigned i);
-
- void *operator new(size_t size);
- void operator delete(void *p);
-};
-
-
-
-
-// Queue methods
-template <typename T>
-SLinkedLockFreeQueue<T>::SLinkedLockFreeQueue(size_t qs) : count(0), qsize(qs)
-{
- Node<T> *node = new Node<T>;
- Pointer<T> pointer(node, 0);
-
- Head.store(pointer, std::memory_order_relaxed);
- Tail.store(pointer, std::memory_order_relaxed);
-
- slots = SemUtil::get(IPC_PRIVATE, qsize);
- items = SemUtil::get(IPC_PRIVATE, 0);
-
-}
-
-template <typename T>
-SLinkedLockFreeQueue<T>::~SLinkedLockFreeQueue()
-{
- std::cerr << "SLinkedLockFreeQueue destory" << std::endl;
- SemUtil::remove(slots);
- SemUtil::remove(items);
-
-
- Node<T> * nodeptr;
- Pointer<T> tmp = Head.load(std::memory_order_relaxed);
- while((nodeptr = tmp.ptr) != NULL) {
- tmp = (tmp.ptr->next).load(std::memory_order_relaxed);
- //std::cerr << "delete " << nodeptr << std::endl;
- delete nodeptr;
-
- }
-
-}
-
-template <typename T>
-bool SLinkedLockFreeQueue<T>::isempty() const
-{
- return count == 0;
-}
-
-template <typename T>
-bool SLinkedLockFreeQueue<T>::isfull() const
-{
- return count == qsize;
-}
-
-template <typename T>
-unsigned int SLinkedLockFreeQueue<T>::size() const
-{
- return count;
-}
-
-// Add item to queue
-template <typename T>
-bool SLinkedLockFreeQueue<T>::_add(const T & item)
-{
- if (isfull())
- return false;
-
- Node<T> * node = new Node<T>;
- node->value = item;
-
-
- Pointer<T> tail ;
- Pointer<T> next ;
-
-
- while(true) {
- tail = Tail.load(std::memory_order_relaxed);
- next = (tail.ptr->next).load(std::memory_order_relaxed);
- if (tail == Tail.load(std::memory_order_relaxed)) {
- if (next.ptr == NULL) {
- if ((tail.ptr->next).compare_exchange_weak(next,
- Pointer<T>(node, next.count+1),
- std::memory_order_release,
- std::memory_order_relaxed) )
- break;
- else
- Tail.compare_exchange_weak(tail,
- Pointer<T>(next.ptr, tail.count+1),
- std::memory_order_release,
- std::memory_order_relaxed);
- }
-
- }
- }
-
- Tail.compare_exchange_weak(tail, Pointer<T>(node, tail.count+1),
- std::memory_order_release,
- std::memory_order_relaxed);
- count++;
- return true;
-}
-
-template <typename T>
-bool SLinkedLockFreeQueue<T>::add(const T & item)
-{
- if (SemUtil::dec(slots) == -1) {
- err_exit(errno, "add");
- }
-
- if (SLinkedLockFreeQueue<T>::_add(item)) {
- SemUtil::inc(items);
- return true;
- }
- return false;
-
-}
-
-template <typename T>
-bool SLinkedLockFreeQueue<T>::add_nowait(const T & item)
-{
- if (SemUtil::dec_nowait(slots) == -1) {
- if (errno == EAGAIN)
- return false;
- else
- err_exit(errno, "add_nowait");
- }
-
- if (SLinkedLockFreeQueue<T>::_add(item)) {
- SemUtil::inc(items);
- return true;
- }
- return false;
-
-}
-
-template <typename T>
-bool SLinkedLockFreeQueue<T>::add_timeout(const T & item, struct timespec * timeout)
-{
- if (SemUtil::dec_timeout(slots, timeout) == -1) {
- if (errno == EAGAIN)
- return false;
- else
- err_exit(errno, "add_timeout");
- }
-
- if (SLinkedLockFreeQueue<T>::_add(item)){
- SemUtil::inc(items);
- return true;
- }
- return false;
-
-}
-
-
-// Place front item into item variable and remove from queue
-template <typename T>
-bool SLinkedLockFreeQueue<T>::_remove(T & item)
-{
- if (isempty())
- return false;
-
- Pointer<T> head;
- Pointer<T> tail;
- Pointer<T> next;
-
- while(true) {
- head = Head.load(std::memory_order_relaxed);
- tail = Tail.load(std::memory_order_relaxed);
- next = (head.ptr->next).load();
- if (head == Head.load(std::memory_order_relaxed)) {
- if(head.ptr == tail.ptr) {
- if (next.ptr == NULL)
- return false;
- // Tail is falling behind. Try to advance it
- Tail.compare_exchange_weak(tail,
- Pointer<T>(next.ptr, tail.count+1),
- std::memory_order_release,
- std::memory_order_relaxed);
- } else {
- item = next.ptr->value;
- if (Head.compare_exchange_weak(head,
- Pointer<T>(next.ptr, head.count+1),
- std::memory_order_release,
- std::memory_order_relaxed)) {
- delete head.ptr;
- break;
- }
-
- }
- }
-
- }
-
- count--;
- return true;
-
-}
-
-template <typename T>
-bool SLinkedLockFreeQueue<T>::remove(T & item)
-{
- if (SemUtil::dec(items) == -1) {
- err_exit(errno, "remove");
- }
-
- if (SLinkedLockFreeQueue<T>::_remove(item)) {
- SemUtil::inc(slots);
- return true;
- }
- return false;
-
-}
-
-template <typename T>
-bool SLinkedLockFreeQueue<T>::remove_nowait(T & item)
-{
- if (SemUtil::dec_nowait(items) == -1) {
- if (errno == EAGAIN)
- return false;
- else
- err_exit(errno, "remove_nowait");
- }
-
- if (SLinkedLockFreeQueue<T>::_remove(item)) {
- SemUtil::inc(slots);
- return true;
- }
- return false;
-
-}
-
-template <typename T>
-bool SLinkedLockFreeQueue<T>::remove_timeout(T & item, struct timespec * timeout)
-{
- if (SemUtil::dec_timeout(items, timeout) == -1) {
- if (errno == EAGAIN)
- return false;
- else
- err_exit(errno, "remove_timeout");
- }
-
- if (SLinkedLockFreeQueue<T>::_remove(item)) {
- SemUtil::inc(slots);
- return true;
- }
- return false;
-
-}
-
-
-template <class T>
-T& SLinkedLockFreeQueue<T>::operator[](unsigned int i)
-{
- if (i < 0 || i >= count)
- {
- std::cerr << "Error in array limits: " << i << " is out of range\n";
- std::exit(EXIT_FAILURE);
- }
-
-
- Pointer<T> tmp = Head.load(std::memory_order_relaxed);
- //Pointer<T> tail = Tail.load(std::memory_order_relaxed);
-
- while(i > 0) {
- //std::cout << i << ":" << std::endl;
- tmp = (tmp.ptr->next).load(std::memory_order_relaxed);
- i--;
- }
-
- tmp = (tmp.ptr->next).load(std::memory_order_relaxed);
- return tmp.ptr->value;
-}
-
-template <class T>
-void * SLinkedLockFreeQueue<T>::operator new(size_t size){
- return mm_malloc(size);
-}
-
-template <class T>
-void SLinkedLockFreeQueue<T>::operator delete(void *p) {
- return mm_free(p);
-}
-
-#endif
diff --git a/squeue/include/lock_free_queue_impl_multiple_producer.h b/squeue/include/array_lock_free_queue.h
similarity index 71%
rename from squeue/include/lock_free_queue_impl_multiple_producer.h
rename to squeue/include/array_lock_free_queue.h
index 959cca8..8bcf579 100644
--- a/squeue/include/lock_free_queue_impl_multiple_producer.h
+++ b/squeue/include/array_lock_free_queue.h
@@ -4,11 +4,82 @@
#include <assert.h> // assert()
#include <sched.h> // sched_yield()
+
+/// @brief implementation of an array based lock free queue with support for
+/// multiple producers
+/// This class is prevented from being instantiated directly (all members and
+/// methods are private). To instantiate a multiple producers lock free queue
+/// you must use the ArrayLockFreeQueue fachade:
+/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q;
template <typename ELEM_T>
-int ArrayLockFreeQueueMultipleProducers<ELEM_T>::m_reference = 0;
+class ArrayLockFreeQueue
+{
+ // ArrayLockFreeQueue will be using this' private members
+ template <
+ typename ELEM_T_,
+ template <typename T> class Q_TYPE >
+ friend class LockFreeQueue;
+
+private:
+ /// @brief constructor of the class
+ ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
+
+ virtual ~ArrayLockFreeQueue();
+
+ inline uint32_t size();
+
+ inline bool full();
+
+ inline bool empty();
+
+ bool push(const ELEM_T &a_data);
+
+ bool pop(ELEM_T &a_data);
+
+ /// @brief calculate the index in the circular array that corresponds
+ /// to a particular "count" value
+ inline uint32_t countToIndex(uint32_t a_count);
+
+private:
+ size_t Q_SIZE;
+ /// @brief array to keep the elements
+ ELEM_T *m_theQueue;
+
+ /// @brief where a new element will be inserted
+ std::atomic<uint32_t> m_writeIndex;
+
+ /// @brief where the next element where be extracted from
+ std::atomic<uint32_t> m_readIndex;
+
+ /// @brief maximum read index for multiple producer queues
+ /// If it's not the same as m_writeIndex it means
+ /// there are writes pending to be "committed" to the queue, that means,
+ /// the place for the data was reserved (the index in the array) but
+ /// data is still not in the queue, so the thread trying to read will have
+ /// to wait for those other threads to save the data into the queue
+ ///
+ /// note this is only used for multiple producers
+ std::atomic<uint32_t> m_maximumReadIndex;
+
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+ /// @brief number of elements in the queue
+ std::atomic<uint32_t> m_count;
+#endif
+ static int m_reference;
+
+private:
+ /// @brief disable copy constructor declaring it private
+ ArrayLockFreeQueue<ELEM_T>(const ArrayLockFreeQueue<ELEM_T> &a_src);
+
+
+};
+
template <typename ELEM_T>
-ArrayLockFreeQueueMultipleProducers<ELEM_T>::ArrayLockFreeQueueMultipleProducers(size_t qsize):
+int ArrayLockFreeQueue<ELEM_T>::m_reference = 0;
+
+template <typename ELEM_T>
+ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize):
Q_SIZE(qsize),
m_writeIndex(0), // initialisation is not atomic
m_readIndex(0), //
@@ -23,9 +94,9 @@
}
template <typename ELEM_T>
-ArrayLockFreeQueueMultipleProducers<ELEM_T>::~ArrayLockFreeQueueMultipleProducers()
+ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue()
{
- std::cout << "destroy ArrayLockFreeQueueMultipleProducers\n";
+ std::cout << "destroy ArrayLockFreeQueue\n";
m_reference--;
if(m_reference == 0) {
mm_free(m_theQueue);
@@ -35,7 +106,7 @@
template <typename ELEM_T>
inline
-uint32_t ArrayLockFreeQueueMultipleProducers<ELEM_T>::countToIndex(uint32_t a_count)
+uint32_t ArrayLockFreeQueue<ELEM_T>::countToIndex(uint32_t a_count)
{
// if Q_SIZE is a power of 2 this statement could be also written as
// return (a_count & (Q_SIZE - 1));
@@ -44,7 +115,7 @@
template <typename ELEM_T>
inline
-uint32_t ArrayLockFreeQueueMultipleProducers<ELEM_T>::size()
+uint32_t ArrayLockFreeQueue<ELEM_T>::size()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
@@ -79,7 +150,7 @@
template <typename ELEM_T>
inline
-bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::full()
+bool ArrayLockFreeQueue<ELEM_T>::full()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
@@ -104,7 +175,7 @@
template <typename ELEM_T>
inline
-bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::empty()
+bool ArrayLockFreeQueue<ELEM_T>::empty()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
@@ -126,7 +197,7 @@
template <typename ELEM_T>
-bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::push(const ELEM_T &a_data)
+bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data)
{
uint32_t currentWriteIndex;
@@ -187,7 +258,7 @@
}
template <typename ELEM_T>
-bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::pop(ELEM_T &a_data)
+bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data)
{
uint32_t currentReadIndex;
diff --git a/squeue/include/linked_lock_free_queue.h b/squeue/include/linked_lock_free_queue.h
new file mode 100644
index 0000000..58cdc00
--- /dev/null
+++ b/squeue/include/linked_lock_free_queue.h
@@ -0,0 +1,247 @@
+// queue.h -- interface for a queue
+#ifndef __LINKED_LOCK_FREE_QUEUE_H_
+#define __LINKED_LOCK_FREE_QUEUE_H_
+#include "mm.h"
+#include "sem_util.h"
+
+template <typename T> class Node;
+
+template <typename T>
+class Pointer {
+public:
+
+ Node<T> *ptr;
+ unsigned long count;
+ Pointer( Node<T> *node = NULL, int c=0) noexcept : ptr(node), count(c) {}
+
+ bool operator == (const Pointer<T> o) const {
+ return (o.ptr == ptr) && (o.count == count);
+ }
+ bool operator != (const Pointer<T> o) const {
+ return !((o.ptr == ptr) && (o.count == count));
+ }
+
+
+
+};
+
+template <typename T>
+class Node {
+public:
+ alignas(16) std::atomic<Pointer<T> > next;
+ T value;
+
+ Node() {
+ }
+
+ void *operator new(size_t size){
+ return mm_malloc(size);
+ }
+
+ void operator delete(void *p) {
+ return mm_free(p);
+ }
+};
+
+
+
+
+
+template <typename T>
+class LinkedLockFreeQueue
+{
+private:
+// class scope definitions
+ enum {Q_SIZE = 10};
+
+
+// private class members
+ std::atomic<Pointer<T> > Head; // pointer to front of Queue
+ std::atomic<Pointer<T> > Tail; // pointer to rear of Queue
+ //std::atomic_uint count; // current number of size in Queue
+ std::atomic_uint count;
+ const size_t qsize; // maximum number of size in Queue
+ // preemptive definitions to prevent public copying
+ LinkedLockFreeQueue(const LinkedLockFreeQueue & q) : qsize(0) { }
+ LinkedLockFreeQueue & operator=(const LinkedLockFreeQueue & q) { return *this;}
+public:
+ LinkedLockFreeQueue(size_t qs = Q_SIZE); // create queue with a qs limit
+ ~LinkedLockFreeQueue();
+ bool empty() const;
+ bool full() const;
+ unsigned int size() const;
+ bool push(const T &item); // add item to end
+ bool pop(T &item);
+
+
+ T& operator[](unsigned i);
+
+};
+
+
+
+
+// Queue methods
+template <typename T>
+LinkedLockFreeQueue<T>::LinkedLockFreeQueue(size_t qs) : count(0), qsize(qs)
+{
+ Node<T> *node = new Node<T>;
+ Pointer<T> pointer(node, 0);
+
+ Head.store(pointer, std::memory_order_relaxed);
+ Tail.store(pointer, std::memory_order_relaxed);
+
+
+
+}
+
+template <typename T>
+LinkedLockFreeQueue<T>::~LinkedLockFreeQueue()
+{
+ std::cerr << "LinkedLockFreeQueue destory" << std::endl;
+
+ Node<T> * nodeptr;
+ Pointer<T> tmp = Head.load(std::memory_order_relaxed);
+ while((nodeptr = tmp.ptr) != NULL) {
+ tmp = (tmp.ptr->next).load(std::memory_order_relaxed);
+ //std::cerr << "delete " << nodeptr << std::endl;
+ delete nodeptr;
+
+ }
+
+}
+
+template <typename T>
+bool LinkedLockFreeQueue<T>::empty() const
+{
+ return count == 0;
+}
+
+template <typename T>
+bool LinkedLockFreeQueue<T>::full() const
+{
+ return count == qsize;
+}
+
+template <typename T>
+unsigned int LinkedLockFreeQueue<T>::size() const
+{
+ return count;
+}
+
+// Add item to queue
+template <typename T>
+bool LinkedLockFreeQueue<T>::push(const T & item)
+{
+ if (full())
+ return false;
+
+ Node<T> * node = new Node<T>;
+ node->value = item;
+
+
+ Pointer<T> tail ;
+ Pointer<T> next ;
+
+
+ while(true) {
+ tail = Tail.load(std::memory_order_relaxed);
+ next = (tail.ptr->next).load(std::memory_order_relaxed);
+ if (tail == Tail.load(std::memory_order_relaxed)) {
+ if (next.ptr == NULL) {
+ if ((tail.ptr->next).compare_exchange_weak(next,
+ Pointer<T>(node, next.count+1),
+ std::memory_order_release,
+ std::memory_order_relaxed) )
+ break;
+ else
+ Tail.compare_exchange_weak(tail,
+ Pointer<T>(next.ptr, tail.count+1),
+ std::memory_order_release,
+ std::memory_order_relaxed);
+ }
+
+ }
+ }
+
+ Tail.compare_exchange_weak(tail, Pointer<T>(node, tail.count+1),
+ std::memory_order_release,
+ std::memory_order_relaxed);
+ count++;
+ return true;
+}
+
+
+
+
+// Place front item into item variable and remove from queue
+template <typename T>
+bool LinkedLockFreeQueue<T>::pop(T & item)
+{
+ if (empty())
+ return false;
+
+ Pointer<T> head;
+ Pointer<T> tail;
+ Pointer<T> next;
+
+ while(true) {
+ head = Head.load(std::memory_order_relaxed);
+ tail = Tail.load(std::memory_order_relaxed);
+ next = (head.ptr->next).load();
+ if (head == Head.load(std::memory_order_relaxed)) {
+ if(head.ptr == tail.ptr) {
+ if (next.ptr == NULL)
+ return false;
+ // Tail is falling behind. Try to advance it
+ Tail.compare_exchange_weak(tail,
+ Pointer<T>(next.ptr, tail.count+1),
+ std::memory_order_release,
+ std::memory_order_relaxed);
+ } else {
+ item = next.ptr->value;
+ if (Head.compare_exchange_weak(head,
+ Pointer<T>(next.ptr, head.count+1),
+ std::memory_order_release,
+ std::memory_order_relaxed)) {
+ delete head.ptr;
+ break;
+ }
+
+ }
+ }
+
+ }
+
+ count--;
+ return true;
+
+}
+
+
+template <class T>
+T& LinkedLockFreeQueue<T>::operator[](unsigned int i)
+{
+ if (i < 0 || i >= count)
+ {
+ std::cerr << "Error in array limits: " << i << " is out of range\n";
+ std::exit(EXIT_FAILURE);
+ }
+
+
+ Pointer<T> tmp = Head.load(std::memory_order_relaxed);
+ //Pointer<T> tail = Tail.load(std::memory_order_relaxed);
+
+ while(i > 0) {
+ //std::cout << i << ":" << std::endl;
+ tmp = (tmp.ptr->next).load(std::memory_order_relaxed);
+ i--;
+ }
+
+ tmp = (tmp.ptr->next).load(std::memory_order_relaxed);
+ return tmp.ptr->value;
+}
+
+
+
+#endif
diff --git a/squeue/include/lock_free_queue.h b/squeue/include/lock_free_queue.h
index a8f7801..c8a672c 100644
--- a/squeue/include/lock_free_queue.h
+++ b/squeue/include/lock_free_queue.h
@@ -4,9 +4,12 @@
#include <stdint.h> // uint32_t
#include <atomic>
#include <usg_common.h>
+#include <assert.h> // assert()
+#include "mm.h"
+#include "sem_util.h"
// default Queue size
-#define LOCK_FREE_Q_DEFAULT_SIZE 65536 // (2^16)
+#define LOCK_FREE_Q_DEFAULT_SIZE 16
// define this macro if calls to "size" must return the real size of the
// queue. If it is undefined that function will try to take a snapshot of
@@ -17,7 +20,7 @@
//
template <typename ELEM_T>
-class ArrayLockFreeQueueMultipleProducers;
+class ArrayLockFreeQueue;
/// @brief Lock-free queue based on a circular array
@@ -32,7 +35,7 @@
/// ArrayLockFreeQueue<int, 16> q;
/// // queue of ints of size (16 - 1) and
/// // defaulted to single producer
-/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueueMultipleProducers> q;
+/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q;
/// // queue of ints of size (100 - 1) with support
/// // for multiple producers
///
@@ -57,13 +60,16 @@
/// last 4 elements of the queue are not used when the counter rolls
/// over to 0
/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and
-/// ArrayLockFreeQueueMultipleProducers are supported (single producer
+/// ArrayLockFreeQueue are supported (single producer
/// by default)
template <
typename ELEM_T,
- template <typename T> class Q_TYPE = ArrayLockFreeQueueMultipleProducers >
+ template <typename T> class Q_TYPE = ArrayLockFreeQueue >
class LockFreeQueue
{
+private:
+ int slots;
+ int items;
public:
/// @brief constructor of the class
LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
@@ -88,7 +94,7 @@
/// environments this function might return bogus values. See help in method
/// LockFreeQueue::size
inline bool full();
-
+
inline bool empty();
/// @brief push an element at the tail of the queue
@@ -97,13 +103,21 @@
/// structures to be inserted in the queue you should think of instantiate the template
/// of the queue as a pointer to that large structure
/// @return true if the element was inserted in the queue. False if the queue was full
- inline bool push(const ELEM_T &a_data);
+ bool push(const ELEM_T &a_data);
+ bool push_nowait(const ELEM_T &a_data);
+ bool push_timeout(const ELEM_T &a_data, struct timespec * timeout);
/// @brief pop the element at the head of the queue
/// @param a reference where the element in the head of the queue will be saved to
/// Note that the a_data parameter might contain rubbish if the function returns false
/// @return true if the element was successfully extracted from the queue. False if the queue was empty
- inline bool pop(ELEM_T &a_data);
+ bool pop(ELEM_T &a_data);
+ bool pop_nowait(ELEM_T &a_data);
+ bool pop_timeout(ELEM_T &a_data, struct timespec * timeout);
+
+
+ void *operator new(size_t size);
+ void operator delete(void *p);
protected:
/// @brief the actual queue. methods are forwarded into the real
@@ -117,77 +131,186 @@
-/// @brief implementation of an array based lock free queue with support for
-/// multiple producers
-/// This class is prevented from being instantiated directly (all members and
-/// methods are private). To instantiate a multiple producers lock free queue
-/// you must use the ArrayLockFreeQueue fachade:
-/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueueMultipleProducers> q;
-template <typename ELEM_T>
-class ArrayLockFreeQueueMultipleProducers
+template <
+ typename ELEM_T,
+ template <typename T> class Q_TYPE>
+LockFreeQueue<ELEM_T, Q_TYPE>::LockFreeQueue(size_t qsize):
+ m_qImpl(qsize)
{
- // ArrayLockFreeQueue will be using this' private members
- template <
- typename ELEM_T_,
- template <typename T> class Q_TYPE >
- friend class LockFreeQueue;
+ slots = SemUtil::get(IPC_PRIVATE, qsize);
+ items = SemUtil::get(IPC_PRIVATE, 0);
+}
-private:
- /// @brief constructor of the class
- ArrayLockFreeQueueMultipleProducers(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
-
- virtual ~ArrayLockFreeQueueMultipleProducers();
-
- inline uint32_t size();
-
- inline bool full();
+template <
+ typename ELEM_T,
+ template <typename T> class Q_TYPE>
+LockFreeQueue<ELEM_T, Q_TYPE>::~LockFreeQueue()
+{
+ SemUtil::remove(slots);
+ SemUtil::remove(items);
+}
- inline bool empty();
-
- bool push(const ELEM_T &a_data);
-
- bool pop(ELEM_T &a_data);
-
- /// @brief calculate the index in the circular array that corresponds
- /// to a particular "count" value
- inline uint32_t countToIndex(uint32_t a_count);
-
-private:
- size_t Q_SIZE;
- /// @brief array to keep the elements
- ELEM_T *m_theQueue;
+template <
+ typename ELEM_T,
+ template <typename T> class Q_TYPE>
+inline uint32_t LockFreeQueue<ELEM_T, Q_TYPE>::size()
+{
+ return m_qImpl.size();
+}
- /// @brief where a new element will be inserted
- std::atomic<uint32_t> m_writeIndex;
+template <
+ typename ELEM_T,
+ template <typename T> class Q_TYPE>
+inline bool LockFreeQueue<ELEM_T, Q_TYPE>::full()
+{
+ return m_qImpl.full();
+}
- /// @brief where the next element where be extracted from
- std::atomic<uint32_t> m_readIndex;
-
- /// @brief maximum read index for multiple producer queues
- /// If it's not the same as m_writeIndex it means
- /// there are writes pending to be "committed" to the queue, that means,
- /// the place for the data was reserved (the index in the array) but
- /// data is still not in the queue, so the thread trying to read will have
- /// to wait for those other threads to save the data into the queue
- ///
- /// note this is only used for multiple producers
- std::atomic<uint32_t> m_maximumReadIndex;
-
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- /// @brief number of elements in the queue
- std::atomic<uint32_t> m_count;
-#endif
- static int m_reference;
-
-private:
- /// @brief disable copy constructor declaring it private
- ArrayLockFreeQueueMultipleProducers<ELEM_T>(const ArrayLockFreeQueueMultipleProducers<ELEM_T> &a_src);
+template <
+ typename ELEM_T,
+ template <typename T> class Q_TYPE>
+inline bool LockFreeQueue<ELEM_T, Q_TYPE>::empty()
+{
+ return m_qImpl.empty();
+}
-};
+template <
+ typename ELEM_T,
+ template <typename T> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Q_TYPE>::push(const ELEM_T &a_data)
+{
+ if (SemUtil::dec(slots) == -1) {
+ err_exit(errno, "push");
+ }
+
+ if ( m_qImpl.push(a_data) ) {
+ SemUtil::inc(items);
+ return true;
+ }
+ return false;
+
+}
+
+template <
+ typename ELEM_T,
+ template <typename T> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Q_TYPE>::push_nowait(const ELEM_T &a_data)
+{
+ if (SemUtil::dec_nowait(slots) == -1) {
+ if (errno == EAGAIN)
+ return false;
+ else
+ err_exit(errno, "push_nowait");
+ }
+
+ if ( m_qImpl.push(a_data)) {
+ SemUtil::inc(items);
+ return true;
+ }
+ return false;
+
+}
+
+template <
+ typename ELEM_T,
+ template <typename T> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Q_TYPE>::push_timeout(const ELEM_T &a_data, struct timespec * timeout)
+{
+
+ if (SemUtil::dec_timeout(slots, timeout) == -1) {
+ if (errno == EAGAIN)
+ return false;
+ else
+ err_exit(errno, "push_timeout");
+ }
+
+ if (m_qImpl.push(a_data)){
+ SemUtil::inc(items);
+ return true;
+ }
+ return false;
+
+}
+
+
+
+
+template <
+ typename ELEM_T,
+ template <typename T> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Q_TYPE>::pop(ELEM_T &a_data)
+{
+ if (SemUtil::dec(items) == -1) {
+ err_exit(errno, "remove");
+ }
+
+ if (m_qImpl.pop(a_data)) {
+ SemUtil::inc(slots);
+ return true;
+ }
+ return false;
+
+}
+
+template <
+ typename ELEM_T,
+ template <typename T> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Q_TYPE>::pop_nowait(ELEM_T &a_data)
+{
+ if (SemUtil::dec_nowait(items) == -1) {
+ if (errno == EAGAIN)
+ return false;
+ else
+ err_exit(errno, "remove_nowait");
+ }
+
+ if (m_qImpl.pop(a_data)) {
+ SemUtil::inc(slots);
+ return true;
+ }
+ return false;
+
+}
+
+
+template <
+ typename ELEM_T,
+ template <typename T> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout)
+{
+ if (SemUtil::dec_timeout(items, timeout) == -1) {
+ if (errno == EAGAIN)
+ return false;
+ else
+ err_exit(errno, "remove_timeout");
+ }
+
+ if (m_qImpl.pop(a_data)) {
+ SemUtil::inc(slots);
+ return true;
+ }
+ return false;
+
+}
+
+
+template <
+ typename ELEM_T,
+ template <typename T> class Q_TYPE>
+void * LockFreeQueue<ELEM_T, Q_TYPE>::operator new(size_t size){
+ return mm_malloc(size);
+}
+
+template <
+ typename ELEM_T,
+ template <typename T> class Q_TYPE>
+void LockFreeQueue<ELEM_T, Q_TYPE>::operator delete(void *p) {
+ return mm_free(p);
+}
// include implementation files
-#include "lock_free_queue_impl.h"
-#include "lock_free_queue_impl_multiple_producer.h"
+#include "linked_lock_free_queue.h"
+#include "array_lock_free_queue.h"
#endif // _LOCK_FREE_QUEUE_H__
diff --git a/squeue/include/lock_free_queue_impl.h b/squeue/include/lock_free_queue_impl.h
deleted file mode 100644
index aa182a5..0000000
--- a/squeue/include/lock_free_queue_impl.h
+++ /dev/null
@@ -1,63 +0,0 @@
-#ifndef __LOCK_FREE_QUEUE_IMPL_H__
-#define __LOCK_FREE_QUEUE_IMPL_H__
-
-#include <assert.h> // assert()
-#include "mm.h"
-#include "sem_util.h"
-
-template <
- typename ELEM_T,
- template <typename T> class Q_TYPE>
-LockFreeQueue<ELEM_T, Q_TYPE>::LockFreeQueue(size_t qsize):
- m_qImpl(qsize)
-{
-}
-
-template <
- typename ELEM_T,
- template <typename T> class Q_TYPE>
-LockFreeQueue<ELEM_T, Q_TYPE>::~LockFreeQueue()
-{
-}
-
-template <
- typename ELEM_T,
- template <typename T> class Q_TYPE>
-inline uint32_t LockFreeQueue<ELEM_T, Q_TYPE>::size()
-{
- return m_qImpl.size();
-}
-
-template <
- typename ELEM_T,
- template <typename T> class Q_TYPE>
-inline bool LockFreeQueue<ELEM_T, Q_TYPE>::full()
-{
- return m_qImpl.full();
-}
-
-template <
- typename ELEM_T,
- template <typename T> class Q_TYPE>
-inline bool LockFreeQueue<ELEM_T, Q_TYPE>::empty()
-{
- return m_qImpl.empty();
-}
-
-template <
- typename ELEM_T,
- template <typename T> class Q_TYPE>
-inline bool LockFreeQueue<ELEM_T, Q_TYPE>::push(const ELEM_T &a_data)
-{
- return m_qImpl.push(a_data);
-}
-
-template <
- typename ELEM_T,
- template <typename T> class Q_TYPE>
-inline bool LockFreeQueue<ELEM_T, Q_TYPE>::pop(ELEM_T &a_data)
-{
- return m_qImpl.pop(a_data);
-}
-
-#endif // __LOCK_FREE_QUEUE_IMPL_H__
diff --git a/squeue/include/queue_factory.h b/squeue/include/queue_factory.h
index b898a86..23446b0 100644
--- a/squeue/include/queue_factory.h
+++ b/squeue/include/queue_factory.h
@@ -4,10 +4,11 @@
#include "mm.h"
#include "hashtable.h"
#include "lock_free_queue.h"
-#include "SLinkedLockFreeQueue.h"
-namespace QueueFactory{
- hashtable_t * getHashTable() {
+class QueueFactory{
+private:
+
+ static hashtable_t * getHashTable() {
static hashtable_t *hashtable = NULL;
int first;
@@ -20,24 +21,9 @@
}
- template <typename T>
- SLinkedLockFreeQueue<T>* createLinkedLockFreeQueue(int key, size_t size) {
-
+
- SLinkedLockFreeQueue<T> *queue;
- hashtable_t *hashtable = getHashTable();
-
-
- if ((queue = (SLinkedLockFreeQueue<T> *)hashtable_get(hashtable, key)) == NULL ) {
- queue = new SLinkedLockFreeQueue<T>(size);
- hashtable_put(hashtable, key, (void *)queue);
- }
-
- return queue;
- }
-
-
- template <typename T>
+ template <typename T> static
LockFreeQueue<T>* createArrayLockFreeQueue(int key, size_t size=16) {
LockFreeQueue<T> *queue;
@@ -51,8 +37,9 @@
return queue;
}
+public:
- template <typename T>
+ template <typename T> static
LockFreeQueue<T>* createQueue(int key, size_t size = 16) {
return QueueFactory::createArrayLockFreeQueue<T>(key, size);
}
@@ -60,14 +47,13 @@
/**
* destroy queue
*/
- template <typename T>
+ template <typename T> static
void dropQueue(int key) {
-
LockFreeQueue<T> *queue = QueueFactory::createQueue<T> (key);
delete queue;
hashtable_t *hashtable = getHashTable();
hashtable_remove(hashtable, key);
}
-}
+};
#endif
diff --git a/squeue/mm.c b/squeue/mm.c
index 7b39391..9cf01f0 100644
--- a/squeue/mm.c
+++ b/squeue/mm.c
@@ -1,5 +1,5 @@
/*
- * 绠$悊鍏变韩鍐呭瓨鐨勫垎閰嶏紝涓庨噴鏀�
+ * 绠$悊鍏变韩鍐呭瓨鐨勫垎閰嶄笌閲婃斁
*/
#include "mm.h"
#include "sem_util.h"
diff --git a/test/test.h b/test/test.h
index c80ad70..084a7f1 100644
--- a/test/test.h
+++ b/test/test.h
@@ -1,9 +1,8 @@
#include "usg_common.h"
#include "usg_typedef.h"
#include "lock_free_queue.h"
-#include "SLinkedLockFreeQueue.h"
#include "queue_factory.h"
- #include <pthread.h>
+#include <pthread.h>
#define NTHREADS 3
diff --git a/test/test_queue b/test/test_queue
index 51f9055..5f6ee1b 100755
--- a/test/test_queue
+++ b/test/test_queue
Binary files differ
--
Gitblit v1.8.0