From 082633f08aae8eea19bd7050cbe4a75e5ed1ac6f Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期二, 07 七月 2020 12:07:29 +0800
Subject: [PATCH] update

---
 squeue/include/lock_free_queue.h |  265 ++++++++++++++++++++++++++++++++++++++--------------
 1 files changed, 194 insertions(+), 71 deletions(-)

diff --git a/squeue/include/lock_free_queue.h b/squeue/include/lock_free_queue.h
index a8f7801..c8a672c 100644
--- a/squeue/include/lock_free_queue.h
+++ b/squeue/include/lock_free_queue.h
@@ -4,9 +4,12 @@
 #include <stdint.h>     // uint32_t
 #include <atomic>
 #include <usg_common.h>
+#include <assert.h> // assert()
+#include "mm.h" 
+#include "sem_util.h"
 
 // default Queue size
-#define LOCK_FREE_Q_DEFAULT_SIZE 65536 // (2^16)
+#define LOCK_FREE_Q_DEFAULT_SIZE 16
 
 // define this macro if calls to "size" must return the real size of the 
 // queue. If it is undefined  that function will try to take a snapshot of 
@@ -17,7 +20,7 @@
 //
 
 template <typename ELEM_T>
-class ArrayLockFreeQueueMultipleProducers;
+class ArrayLockFreeQueue;
 
 
 /// @brief Lock-free queue based on a circular array
@@ -32,7 +35,7 @@
 ///   ArrayLockFreeQueue<int, 16> q;
 ///                              // queue of ints of size (16 - 1) and
 ///                              // defaulted to single producer
-///   ArrayLockFreeQueue<int, 100, ArrayLockFreeQueueMultipleProducers> q;
+///   ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q;
 ///                              // queue of ints of size (100 - 1) with support
 ///                              // for multiple producers
 ///
@@ -57,13 +60,16 @@
 ///        last 4 elements of the queue are not used when the counter rolls
 ///        over to 0
 /// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and 
-///        ArrayLockFreeQueueMultipleProducers are supported (single producer
+///        ArrayLockFreeQueue are supported (single producer
 ///        by default)
 template <
     typename ELEM_T, 
-    template <typename T> class Q_TYPE = ArrayLockFreeQueueMultipleProducers >
+    template <typename T> class Q_TYPE = ArrayLockFreeQueue >
 class LockFreeQueue
 {
+private:
+    int slots;
+    int items;
 public:    
     /// @brief constructor of the class
     LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
@@ -88,7 +94,7 @@
     /// environments this function might return bogus values. See help in method
     /// LockFreeQueue::size
     inline bool full();
-    
+
     inline bool empty();
 
     /// @brief push an element at the tail of the queue
@@ -97,13 +103,21 @@
     /// structures to be inserted in the queue you should think of instantiate the template
     /// of the queue as a pointer to that large structure
     /// @return true if the element was inserted in the queue. False if the queue was full
-    inline bool push(const ELEM_T &a_data);
+    bool push(const ELEM_T &a_data);
+    bool push_nowait(const ELEM_T &a_data);
+    bool push_timeout(const ELEM_T &a_data, struct timespec * timeout);
 
     /// @brief pop the element at the head of the queue
     /// @param a reference where the element in the head of the queue will be saved to
     /// Note that the a_data parameter might contain rubbish if the function returns false
     /// @return true if the element was successfully extracted from the queue. False if the queue was empty
-    inline bool pop(ELEM_T &a_data);
+    bool pop(ELEM_T &a_data);
+    bool pop_nowait(ELEM_T &a_data);
+    bool pop_timeout(ELEM_T &a_data, struct timespec * timeout);
+
+
+    void *operator new(size_t size);
+    void operator delete(void *p);
 
 protected:
     /// @brief the actual queue. methods are forwarded into the real 
@@ -117,77 +131,186 @@
 
  
 
-/// @brief implementation of an array based lock free queue with support for 
-///        multiple producers
-/// This class is prevented from being instantiated directly (all members and
-/// methods are private). To instantiate a multiple producers lock free queue 
-/// you must use the ArrayLockFreeQueue fachade:
-///   ArrayLockFreeQueue<int, 100, ArrayLockFreeQueueMultipleProducers> q;
-template <typename ELEM_T>
-class ArrayLockFreeQueueMultipleProducers
+template <
+    typename ELEM_T, 
+    template <typename T> class Q_TYPE>
+LockFreeQueue<ELEM_T, Q_TYPE>::LockFreeQueue(size_t qsize):
+    m_qImpl(qsize)
 {
-    // ArrayLockFreeQueue will be using this' private members
-    template <
-        typename ELEM_T_, 
-        template <typename T> class Q_TYPE >
-    friend class LockFreeQueue;
+    slots = SemUtil::get(IPC_PRIVATE, qsize);
+    items = SemUtil::get(IPC_PRIVATE, 0);
+}
 
-private:
-    /// @brief constructor of the class
-    ArrayLockFreeQueueMultipleProducers(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
-    
-    virtual ~ArrayLockFreeQueueMultipleProducers();
-    
-    inline uint32_t size();
-    
-    inline bool full();
+template <
+    typename ELEM_T, 
+    template <typename T> class Q_TYPE>
+LockFreeQueue<ELEM_T, Q_TYPE>::~LockFreeQueue()
+{
+    SemUtil::remove(slots);
+    SemUtil::remove(items);
+}
 
-    inline bool empty();
-    
-    bool push(const ELEM_T &a_data);   
-    
-    bool pop(ELEM_T &a_data);
-    
-    /// @brief calculate the index in the circular array that corresponds
-    /// to a particular "count" value
-    inline uint32_t countToIndex(uint32_t a_count);
-    
-private:    
-    size_t Q_SIZE;
-    /// @brief array to keep the elements
-    ELEM_T *m_theQueue;
+template <
+    typename ELEM_T, 
+    template <typename T> class Q_TYPE>
+inline uint32_t LockFreeQueue<ELEM_T, Q_TYPE>::size()
+{
+    return m_qImpl.size();
+}  
 
-    /// @brief where a new element will be inserted
-    std::atomic<uint32_t> m_writeIndex;
+template <
+    typename ELEM_T, 
+    template <typename T> class Q_TYPE>
+inline bool LockFreeQueue<ELEM_T, Q_TYPE>::full()
+{
+    return m_qImpl.full();
+}
 
-    /// @brief where the next element where be extracted from
-    std::atomic<uint32_t> m_readIndex;
-    
-    /// @brief maximum read index for multiple producer queues
-    /// If it's not the same as m_writeIndex it means
-    /// there are writes pending to be "committed" to the queue, that means,
-    /// the place for the data was reserved (the index in the array) but  
-    /// data is still not in the queue, so the thread trying to read will have 
-    /// to wait for those other threads to save the data into the queue
-    ///
-    /// note this is only used for multiple producers
-    std::atomic<uint32_t> m_maximumReadIndex;
-
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-    /// @brief number of elements in the queue
-    std::atomic<uint32_t> m_count;
-#endif
-    static int m_reference;
-
-private:
-    /// @brief disable copy constructor declaring it private
-    ArrayLockFreeQueueMultipleProducers<ELEM_T>(const ArrayLockFreeQueueMultipleProducers<ELEM_T> &a_src);
+template <
+    typename ELEM_T, 
+    template <typename T> class Q_TYPE>
+inline bool LockFreeQueue<ELEM_T, Q_TYPE>::empty()
+{
+    return m_qImpl.empty();
+}  
 
 
-};
+template <
+    typename ELEM_T, 
+    template <typename T> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Q_TYPE>::push(const ELEM_T &a_data)
+{
+    if (SemUtil::dec(slots) == -1) {
+        err_exit(errno, "push");
+    }
+
+    if ( m_qImpl.push(a_data) ) {
+        SemUtil::inc(items);      
+        return true;
+    }
+    return false;
+    
+}
+
+template <
+    typename ELEM_T, 
+    template <typename T> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Q_TYPE>::push_nowait(const ELEM_T &a_data)
+{
+    if (SemUtil::dec_nowait(slots) == -1) {
+        if (errno == EAGAIN)
+            return false;
+        else
+            err_exit(errno, "push_nowait");
+    }
+
+    if ( m_qImpl.push(a_data)) {
+        SemUtil::inc(items);     
+        return true;
+    }
+    return false;
+    
+}
+
+template <
+    typename ELEM_T, 
+    template <typename T> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Q_TYPE>::push_timeout(const ELEM_T &a_data, struct timespec * timeout)
+{
+
+    if (SemUtil::dec_timeout(slots, timeout) == -1) {
+        if (errno == EAGAIN)
+            return false;
+        else 
+            err_exit(errno, "push_timeout");
+    }
+
+    if (m_qImpl.push(a_data)){
+        SemUtil::inc(items);       
+        return true;
+    }
+    return false;
+    
+}
+
+
+
+
+template <
+    typename ELEM_T, 
+    template <typename T> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Q_TYPE>::pop(ELEM_T &a_data)
+{
+    if (SemUtil::dec(items) == -1) {
+        err_exit(errno, "remove");
+    }
+
+    if (m_qImpl.pop(a_data)) {
+        SemUtil::inc(slots);      
+        return true;
+    }
+    return false;
+    
+}
+
+template <
+    typename ELEM_T, 
+    template <typename T> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Q_TYPE>::pop_nowait(ELEM_T &a_data)
+{
+    if (SemUtil::dec_nowait(items) == -1) {
+        if (errno == EAGAIN)
+            return false;
+        else
+            err_exit(errno, "remove_nowait");
+    }
+
+    if (m_qImpl.pop(a_data)) {
+        SemUtil::inc(slots);     
+        return true;
+    }
+    return false;
+    
+}
+
+ 
+template <
+    typename ELEM_T, 
+    template <typename T> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout)
+{
+    if (SemUtil::dec_timeout(items, timeout) == -1) {
+        if (errno == EAGAIN)
+            return false;
+        else 
+            err_exit(errno, "remove_timeout");
+    }
+
+    if (m_qImpl.pop(a_data)) {
+        SemUtil::inc(slots);       
+        return true;
+    }
+    return false;
+    
+}
+
+
+template <
+    typename ELEM_T, 
+    template <typename T> class Q_TYPE>
+void * LockFreeQueue<ELEM_T, Q_TYPE>::operator new(size_t size){
+        return mm_malloc(size);
+}
+
+template <
+    typename ELEM_T, 
+    template <typename T> class Q_TYPE>
+void LockFreeQueue<ELEM_T, Q_TYPE>::operator delete(void *p) {
+    return mm_free(p);
+}
 
 // include implementation files
-#include "lock_free_queue_impl.h"
-#include "lock_free_queue_impl_multiple_producer.h"
+#include "linked_lock_free_queue.h"
+#include "array_lock_free_queue.h"
 
 #endif // _LOCK_FREE_QUEUE_H__

--
Gitblit v1.8.0