From 2c65db46500207f8445aa4baa53bfbb6602e0e18 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 21 一月 2021 16:37:03 +0800
Subject: [PATCH] restructure

---
 test/futex_demo.cpp                   |    3 
 src/time_util.h                       |   14 
 src/queue/lock_free_queue.h           |   39 -
 src/socket/shm_socket.h               |    1 
 src/futex_sem.cpp                     |    8 
 CMakeLists.txt                        |    1 
 src/socket/shm_socket.cpp             |    8 
 src/futex_sem.h                       |   15 
 /dev/null                             |  322 ------------------
 src/queue/array_lock_free_queue.h     |    4 
 src/queue/shm_queue.h                 |  123 ++++--
 src/queue/array_lock_free_sem_queue.h |  367 +++++++++++++++++++++
 src/psem.cpp                          |   19 -
 src/CMakeLists.txt                    |   44 +-
 src/time_util.cpp                     |   26 +
 15 files changed, 545 insertions(+), 449 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index ef9b64a..8310bfa 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -16,6 +16,7 @@
 option(BUILD_SHARED_LIBS "Build using shared libraries" ON)
 option(BUILD_DOC "Build doc" OFF)
 
+
 list(APPEND EXTRA_INCLUDES "${PROJECT_SOURCE_DIR}/include/usgcommon")
 list(APPEND EXTRA_LIBS ${PROJECT_SOURCE_DIR}/lib/libusgcommon.a pthread rt)
 
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index cfd7f89..8343eb2 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -8,24 +8,27 @@
 configure_file(bus_config.h.in bus_config.h)
 
 add_library(shm_queue 
-  ./logger_factory.cpp
-  ./socket/bus_server_socket.cpp
-  ./socket/bus_server_socket_wrapper.cpp
-  ./socket/shm_stream_mod_socket.cpp
-  ./socket/shm_socket.cpp
-  ./socket/shm_mod_socket.cpp
-  ./psem.cpp
-  ./svsem_util.cpp
-  ./bus_error.cpp
-  ./net/net_conn_pool.cpp
-  ./net/net_mod_server_socket_wrapper.cpp
-  ./net/net_mod_socket_wrapper.cpp
-  ./net/net_mod_socket.cpp
-  ./net/net_mod_socket_io.cpp
-  ./net/net_mod_server_socket.cpp
-  ./shm/shm_mm_wrapper.cpp
-  ./shm/mm.cpp
-  ./shm/hashtable.cpp
+ ./logger_factory.cpp
+./socket/bus_server_socket.cpp
+./socket/bus_server_socket_wrapper.cpp
+./socket/shm_stream_mod_socket.cpp
+./socket/shm_socket.cpp
+./socket/shm_mod_socket.cpp
+./time_util.cpp
+./psem.cpp
+./svsem_util.cpp
+./bus_error.cpp
+./futex_sem.cpp
+./net/net_conn_pool.cpp
+./net/net_mod_server_socket_wrapper.cpp
+./net/net_mod_socket_wrapper.cpp
+./net/net_mod_socket.cpp
+./net/net_mod_socket_io.cpp
+./net/net_mod_server_socket.cpp
+./shm/shm_mm_wrapper.cpp
+./shm/mm.cpp
+./shm/hashtable.cpp
+
 
 	)
 
@@ -41,7 +44,8 @@
                            ${CMAKE_CURRENT_SOURCE_DIR}/socket
                            ${CMAKE_CURRENT_SOURCE_DIR}/net
                            )
- 
+
+
 target_link_libraries(shm_queue PUBLIC  ${EXTRA_LIBS} )
 
 # install rules
@@ -55,6 +59,8 @@
 ./socket/bus_server_socket_wrapper.h
 ./psem.h
 ./key_def.h
+./time_util.h
+./futex_sem.h
 ./bus_error.h
 ./svsem_util.h
 ./logger_factory.h
diff --git a/src/futex_sem.cpp b/src/futex_sem.cpp
new file mode 100644
index 0000000..226145f
--- /dev/null
+++ b/src/futex_sem.cpp
@@ -0,0 +1,8 @@
+#include "futex_sem.h"
+
+
+int futex(int *uaddr, int futex_op, int val,
+    const struct timespec *timeout, int *uaddr2, int val3)
+{
+  return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr, val3);
+}
\ No newline at end of file
diff --git a/src/futex_sem.h b/src/futex_sem.h
new file mode 100644
index 0000000..787fca6
--- /dev/null
+++ b/src/futex_sem.h
@@ -0,0 +1,15 @@
+#ifndef _FUTEXT_SEM_H_
+#define _FUTEXT_SEM_H_  
+#include "usg_common.h"
+#include <sys/wait.h>
+#include <sys/mman.h>
+#include <sys/syscall.h>
+#include <linux/futex.h>
+#include <sys/time.h>
+#include <sys/mman.h>
+#include <sys/stat.h>        /* For mode constants */
+#include <fcntl.h> 
+int futex(int *uaddr, int futex_op, int val,
+    const struct timespec *timeout, int *uaddr2, int val3);
+
+#endif
\ No newline at end of file
diff --git a/src/psem.cpp b/src/psem.cpp
index 8d9333f..d0eb2c2 100644
--- a/src/psem.cpp
+++ b/src/psem.cpp
@@ -1,26 +1,11 @@
 #include "psem.h"
 #include <semaphore.h>
+#include "time_util.h"
 
-#define NANO 1000000000
-
-
-static struct timespec psem_calc_abs_timeout(const struct timespec *ts) {
- 
-	struct timespec res;
-  struct timespec timeout;
-  if (clock_gettime(CLOCK_REALTIME, &timeout) == -1)
-      err_exit(errno, "clock_gettime");
-
-  res.tv_sec = timeout.tv_sec + ts->tv_sec;
-  res.tv_nsec = timeout.tv_nsec + ts->tv_nsec;
-  res.tv_sec = res.tv_sec + floor(res.tv_nsec / NANO);
-  res.tv_nsec = res.tv_nsec % NANO;
-  return res;
-}
 
 
 int psem_timedwait(sem_t *sem, const struct timespec *ts) {
-	struct timespec abs_timeout = psem_calc_abs_timeout(ts);
+	struct timespec abs_timeout = TimeUtil::calc_abs_time(ts);
 
   int rv ;
   while ( (rv = sem_timedwait(sem, &abs_timeout)) == -1) {
diff --git a/src/queue/array_lock_free_queue.h b/src/queue/array_lock_free_queue.h
index 233bc6a..ae1506d 100644
--- a/src/queue/array_lock_free_queue.h
+++ b/src/queue/array_lock_free_queue.h
@@ -1,5 +1,5 @@
-#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
-#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
+#ifndef __ARRAY_LOCK_FREE_QUEUE_H__
+#define __ARRAY_LOCK_FREE_QUEUE_H__
 #include "atomic_ops.h"
 #include <assert.h> // assert()
 #include <sched.h>  // sched_yield()
diff --git a/src/queue/array_lock_free_queue2.h b/src/queue/array_lock_free_queue2.h
deleted file mode 100644
index 233bc6a..0000000
--- a/src/queue/array_lock_free_queue2.h
+++ /dev/null
@@ -1,322 +0,0 @@
-#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
-#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
-#include "atomic_ops.h"
-#include <assert.h> // assert()
-#include <sched.h>  // sched_yield()
-#include "logger_factory.h"
-#include "mem_pool.h"
-#include "shm_allocator.h"
-
-/// @brief implementation of an array based lock free queue with support for 
-///        multiple producers
-/// This class is prevented from being instantiated directly (all members and
-/// methods are private). To instantiate a multiple producers lock free queue 
-/// you must use the ArrayLockFreeQueue fachade:
-///   ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q;
-
-
-#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
-template <typename ELEM_T, typename Allocator = SHM_Allocator>
-class ArrayLockFreeQueue
-{
-    // ArrayLockFreeQueue will be using this' private members
-    template <
-        typename ELEM_T_, 
-        typename Allocator_,
-        template <typename T, typename AT> class Q_TYPE
-        >
-    friend class LockFreeQueue;
-
-private:
-    /// @brief constructor of the class
-    ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
-    
-    virtual ~ArrayLockFreeQueue();
-    
-    inline uint32_t size();
-    
-    inline bool full();
-
-    inline bool empty();
-    
-    bool push(const ELEM_T &a_data);   
-    
-    bool pop(ELEM_T &a_data);
-    
-    /// @brief calculate the index in the circular array that corresponds
-    /// to a particular "count" value
-    inline uint32_t countToIndex(uint32_t a_count);
-
-    ELEM_T& operator[](unsigned i);
-    
-private:    
-    size_t Q_SIZE;
-    /// @brief array to keep the elements
-    ELEM_T *m_theQueue;
-
-    /// @brief where a new element will be inserted
-    uint32_t m_writeIndex;
-
-    /// @brief where the next element where be extracted from
-    uint32_t m_readIndex;
-    
-    /// @brief maximum read index for multiple producer queues
-    /// If it's not the same as m_writeIndex it means
-    /// there are writes pending to be "committed" to the queue, that means,
-    /// the place for the data was reserved (the index in the array) but  
-    /// data is still not in the queue, so the thread trying to read will have 
-    /// to wait for those other threads to save the data into the queue
-    ///
-    /// note this is only used for multiple producers
-    uint32_t m_maximumReadIndex;
-
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-    /// @brief number of elements in the queue
-    uint32_t m_count;
-#endif
-   
-    
-private:
-    /// @brief disable copy constructor declaring it private
-    ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
-
-};
-
-
-template <typename ELEM_T, typename Allocator>
-ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize):
-    Q_SIZE(qsize),
-    m_writeIndex(0),      // initialisation is not atomic
-    m_readIndex(0),       //
-    m_maximumReadIndex(0) //
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-    ,m_count(0)           //
-#endif
-{
-    m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
-
-}
-
-template <typename ELEM_T, typename Allocator>
-ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue()
-{
-    // std::cout << "destroy ArrayLockFreeQueue\n";
-    Allocator::deallocate(m_theQueue);
-    
-}
-
-template <typename ELEM_T, typename Allocator>
-inline 
-uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
-{
-    // if Q_SIZE is a power of 2 this statement could be also written as 
-    // return (a_count & (Q_SIZE - 1));
-    return (a_count % Q_SIZE);
-}
-
-template <typename ELEM_T, typename Allocator>
-inline 
-uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size()
-{
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
-    return m_count;
-#else
-
-    uint32_t currentWriteIndex = m_maximumReadIndex;
-    uint32_t currentReadIndex  = m_readIndex;
-
-    // let's think of a scenario where this function returns bogus data
-    // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
-    // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
-    // 2. afterwards this thread is preemted. While this thread is inactive 2 
-    // elements are inserted and removed from the queue, so m_maximumReadIndex 
-    // is 5 and m_readIndex 4. Real size is still 1
-    // 3. Now the current thread comes back from preemption and reads m_readIndex.
-    // currentReadIndex is 4
-    // 4. currentReadIndex is bigger than currentWriteIndex, so
-    // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
-    // it returns that the queue is almost full, when it is almost empty
-    //
-    if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
-    {
-        return (currentWriteIndex - currentReadIndex);
-    }
-    else
-    {
-        return (Q_SIZE + currentWriteIndex - currentReadIndex);
-    }
-#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-}
-
-template <typename ELEM_T, typename Allocator>
-inline 
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::full()
-{
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
-    return (m_count == (Q_SIZE));
-#else
-
-    uint32_t currentWriteIndex = m_writeIndex;
-    uint32_t currentReadIndex  = m_readIndex;
-    
-    if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
-    {
-        // the queue is full
-        return true;
-    }
-    else
-    {
-        // not full!
-        return false;
-    }
-#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-}
-
-template <typename ELEM_T, typename Allocator>
-inline 
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty()
-{
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
-    return (m_count == 0);
-#else
-
-    if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
-    {
-        // the queue is full
-        return true;
-    }
-    else
-    {
-        // not full!
-        return false;
-    }
-#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-}
-
-
-
-
-
-
-template <typename ELEM_T, typename Allocator>
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data)
-{
-    uint32_t currentReadIndex;
-    uint32_t currentWriteIndex;
-
-    do
-    {
-
-        currentWriteIndex = m_writeIndex;
-        currentReadIndex  = m_readIndex;
-    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
-        if (m_count == Q_SIZE) {
-            return false;
-        }
-    #else
-        if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
-        {
-            // the queue is full
-            return false;
-        }
-    #endif
-
-    } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
-
-    // We know now that this index is reserved for us. Use it to save the data
-    m_theQueue[countToIndex(currentWriteIndex)] = a_data;
-
-    // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
-    // inserting in the queue. It might fail if there are more than 1 producer threads because this
-    // operation has to be done in the same order as the previous CAS
-
-    while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
-    {
-        // this is a good place to yield the thread in case there are more
-        // software threads than hardware processors and you have more
-        // than 1 producer thread
-        // have a look at sched_yield (POSIX.1b)
-        sched_yield();
-    }
-
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-    AtomicAdd(&m_count, 1);
-#endif
-    return true;
-}
-
-
-template <typename ELEM_T, typename Allocator>
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data)
-{
-    uint32_t currentMaximumReadIndex;
-    uint32_t currentReadIndex;
-
-    do
-    {
-        // to ensure thread-safety when there is more than 1 producer thread
-        // a second index is defined (m_maximumReadIndex)
-        currentReadIndex        = m_readIndex;
-        currentMaximumReadIndex = m_maximumReadIndex;
-    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
-        if (m_count == 0) {
-            return false;
-        }
-    #else
-        if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
-        {
-            // the queue is empty or
-            // a producer thread has allocate space in the queue but is
-            // waiting to commit the data into it
-            return false;
-        }
-    #endif
-
-        // retrieve the data from the queue
-        a_data = m_theQueue[countToIndex(currentReadIndex)];
-
-        // try to perfrom now the CAS operation on the read index. If we succeed
-        // a_data already contains what m_readIndex pointed to before we
-        // increased it
-        if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
-        {
-        #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-            // m_count.fetch_sub(1);
-            AtomicSub(&m_count, 1);
-        #endif
-            return true;
-        }
-
-        // it failed retrieving the element off the queue. Someone else must
-        // have read the element stored at countToIndex(currentReadIndex)
-        // before we could perform the CAS operation
-
-    } while(1); // keep looping to try again!
-
-    // Something went wrong. it shouldn't be possible to reach here
-    assert(0);
-
-    // Add this return statement to avoid compiler warnings
-    return false;
-}
-
-template <typename ELEM_T, typename Allocator>
-ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i)
-{
-    int currentCount = m_count;
-    uint32_t currentReadIndex = m_readIndex;
-    if (i >= currentCount)
-    {
-        std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
-        std::exit(EXIT_FAILURE);
-    }
-    return m_theQueue[countToIndex(currentReadIndex+i)];
-}
-
-#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
diff --git a/src/queue/array_lock_free_sem_queue.h b/src/queue/array_lock_free_sem_queue.h
new file mode 100644
index 0000000..bb213e8
--- /dev/null
+++ b/src/queue/array_lock_free_sem_queue.h
@@ -0,0 +1,367 @@
+#ifndef __ARRAY_LOCK_FREE_SEM_QUEUE_H__
+#define __ARRAY_LOCK_FREE_SEM_QUEUE_H__
+#include "atomic_ops.h"
+#include <assert.h> // assert()
+#include <sched.h>  // sched_yield()
+#include "logger_factory.h"
+#include "mem_pool.h"
+#include "shm_allocator.h"
+#include "futex_sem.h"
+#include "time_util.h"
+
+
+/// @brief implementation of an array based lock free queue with support for
+///        multiple producers
+/// This class is prevented from being instantiated directly (all members and
+/// methods are private). To instantiate a multiple producers lock free queue
+/// you must use the ArrayLockFreeSemQueue fachade:
+///   ArrayLockFreeSemQueue<int, 100, ArrayLockFreeSemQueue> q;
+
+#define LOCK_FREE_QUEUE_TIMEOUT  1
+#define LOCK_FREE_QUEUE_NOWAIT  1 << 1
+
+#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+
+template <typename ELEM_T, typename Allocator = SHM_Allocator>
+class ArrayLockFreeSemQueue
+{
+public:
+  /// @brief constructor of the class
+  ArrayLockFreeSemQueue(size_t qsize = 16);
+
+  virtual ~ArrayLockFreeSemQueue();
+
+  inline uint32_t size();
+
+  inline bool full();
+
+  inline bool empty();
+
+  int push(const ELEM_T &a_data,  const struct timespec *timeout = NULL, int flag = 0);
+
+  int pop(ELEM_T &a_data,  const struct timespec *timeout = NULL, int flag = 0);
+
+  /// @brief calculate the index in the circular array that corresponds
+  /// to a particular "count" value
+  inline uint32_t countToIndex(uint32_t a_count);
+
+  ELEM_T& operator[](unsigned i);
+
+public:
+  void *operator new(size_t size);
+  void operator delete(void *p);
+  
+private:
+  size_t Q_SIZE;
+  /// @brief array to keep the elements
+  ELEM_T *m_theQueue;
+
+  /// @brief where a new element will be inserted
+  uint32_t m_writeIndex;
+
+  /// @brief where the next element where be extracted from
+  uint32_t m_readIndex;
+
+  /// @brief maximum read index for multiple producer queues
+  /// If it's not the same as m_writeIndex it means
+  /// there are writes pending to be "committed" to the queue, that means,
+  /// the place for the data was reserved (the index in the array) but
+  /// data is still not in the queue, so the thread trying to read will have
+  /// to wait for those other threads to save the data into the queue
+  ///
+  /// note this is only used for multiple producers
+  uint32_t m_maximumReadIndex;
+
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+  /// @brief number of elements in the queue
+  int m_count;
+#endif
+
+
+  private:
+  /// @brief disable copy constructor declaring it private
+  ArrayLockFreeSemQueue<ELEM_T, Allocator>(const ArrayLockFreeSemQueue<ELEM_T> &a_src);
+
+};
+
+
+template <typename ELEM_T, typename Allocator>
+ArrayLockFreeSemQueue<ELEM_T, Allocator>::ArrayLockFreeSemQueue(size_t qsize):
+  Q_SIZE(qsize),
+  m_writeIndex(0),      // initialisation is not atomic
+  m_readIndex(0),       //
+  m_maximumReadIndex(0) //
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+  ,m_count(0)           //
+#endif
+{
+  m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
+
+}
+
+  template <typename ELEM_T, typename Allocator>
+ArrayLockFreeSemQueue<ELEM_T, Allocator>::~ArrayLockFreeSemQueue()
+{
+  // std::cout << "destroy ArrayLockFreeSemQueue\n";
+  Allocator::deallocate(m_theQueue);
+
+}
+
+template <typename ELEM_T, typename Allocator>
+  inline
+uint32_t ArrayLockFreeSemQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
+{
+  // if Q_SIZE is a power of 2 this statement could be also written as
+  // return (a_count & (Q_SIZE - 1));
+  return (a_count % Q_SIZE);
+}
+
+template <typename ELEM_T, typename Allocator>
+  inline
+uint32_t ArrayLockFreeSemQueue<ELEM_T, Allocator>::size()
+{
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+  return m_count;
+#else
+
+  uint32_t currentWriteIndex = m_maximumReadIndex;
+  uint32_t currentReadIndex  = m_readIndex;
+
+  // let's think of a scenario where this function returns bogus data
+  // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
+  // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
+  // 2. afterwards this thread is preemted. While this thread is inactive 2
+  // elements are inserted and removed from the queue, so m_maximumReadIndex
+  // is 5 and m_readIndex 4. Real size is still 1
+  // 3. Now the current thread comes back from preemption and reads m_readIndex.
+  // currentReadIndex is 4
+  // 4. currentReadIndex is bigger than currentWriteIndex, so
+  // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
+  // it returns that the queue is almost full, when it is almost empty
+  //
+  if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
+  {
+    return (currentWriteIndex - currentReadIndex);
+  }
+  else
+  {
+    return (Q_SIZE + currentWriteIndex - currentReadIndex);
+  }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template <typename ELEM_T, typename Allocator>
+  inline
+bool ArrayLockFreeSemQueue<ELEM_T, Allocator>::full()
+{
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+  return (m_count == (Q_SIZE));
+#else
+
+  uint32_t currentWriteIndex = m_writeIndex;
+  uint32_t currentReadIndex  = m_readIndex;
+
+  if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
+  {
+    // the queue is full
+    return true;
+  }
+  else
+  {
+    // not full!
+    return false;
+  }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template <typename ELEM_T, typename Allocator>
+  inline
+bool ArrayLockFreeSemQueue<ELEM_T, Allocator>::empty()
+{
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+  return (m_count == 0);
+#else
+
+  if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
+  {
+    // the queue is full
+    return true;
+  }
+  else
+  {
+    // not full!
+    return false;
+  }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+
+
+
+
+
+  template <typename ELEM_T, typename Allocator>
+int ArrayLockFreeSemQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data,  const struct timespec *timeout, int flag)
+{
+  uint32_t currentReadIndex;
+  uint32_t currentWriteIndex;
+  int s;
+
+  do
+  {
+    currentWriteIndex = m_writeIndex;
+    currentReadIndex  = m_readIndex;
+
+    if (m_count == Q_SIZE) {
+      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
+        return -1;
+      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
+        const struct timespec ts = TimeUtil::trim_time(timeout);
+        s = futex(&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
+        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
+          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
+          return -1;
+        }
+            
+      } else {
+        s = futex(&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0);
+        if (s == -1 && errno != EAGAIN && errno != EINTR) {
+          return -1;
+        }
+      }
+
+    }
+
+
+  } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
+
+  // We know now that this index is reserved for us. Use it to save the data
+  m_theQueue[countToIndex(currentWriteIndex)] = a_data;
+
+  // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
+  // inserting in the queue. It might fail if there are more than 1 producer threads because this
+  // operation has to be done in the same order as the previous CAS
+
+  while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
+  {
+    // this is a good place to yield the thread in case there are more
+    // software threads than hardware processors and you have more
+    // than 1 producer thread
+    // have a look at sched_yield (POSIX.1b)
+    sched_yield();
+  }
+
+  AtomicAdd(&m_count, 1);
+  s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
+  if (s  == -1)
+      err_exit(errno, "futex-FUTEX_WAKE");
+  return 0;
+}
+
+
+  template <typename ELEM_T, typename Allocator>
+int ArrayLockFreeSemQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag)
+{
+  uint32_t currentMaximumReadIndex;
+  uint32_t currentReadIndex;
+
+  int s;
+  do
+  {
+    // to ensure thread-safety when there is more than 1 producer thread
+    // a second index is defined (m_maximumReadIndex)
+    currentReadIndex        = m_readIndex;
+    currentMaximumReadIndex = m_maximumReadIndex;
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+    if (m_count == 0) {
+
+      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
+        return -1;
+      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
+        const struct timespec ts = TimeUtil::trim_time(timeout);
+        s = futex(&m_count, FUTEX_WAIT, 0, &ts, NULL, 0);
+        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
+          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
+          return -1;
+        }
+            
+      } else {
+        s = futex(&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
+        if (s == -1 && errno != EAGAIN && errno != EINTR) {
+          return -1;
+        }
+      }
+    }
+#else
+    if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
+    {
+      // the queue is empty or
+      // a producer thread has allocate space in the queue but is
+      // waiting to commit the data into it
+      return -1;
+    }
+#endif
+
+    // retrieve the data from the queue
+    a_data = m_theQueue[countToIndex(currentReadIndex)];
+
+    // try to perfrom now the CAS operation on the read index. If we succeed
+    // a_data already contains what m_readIndex pointed to before we
+    // increased it
+    if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
+    {
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+      // m_count.fetch_sub(1);
+      AtomicSub(&m_count, 1);
+#endif
+
+      s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
+      if (s  == -1)
+        err_exit(errno, "futex-FUTEX_WAKE");
+      return 0;
+    }
+
+    // it failed retrieving the element off the queue. Someone else must
+    // have read the element stored at countToIndex(currentReadIndex)
+    // before we could perform the CAS operation
+
+  } while(1); // keep looping to try again!
+
+  // Something went wrong. it shouldn't be possible to reach here
+  assert(0);
+
+  // Add this return statement to avoid compiler warnings
+  return -1;
+}
+
+  template <typename ELEM_T, typename Allocator>
+ELEM_T& ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[](unsigned int i)
+{
+  int currentCount = m_count;
+  uint32_t currentReadIndex = m_readIndex;
+  if (i >= currentCount)
+  {
+    std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
+    std::exit(EXIT_FAILURE);
+  }
+  return m_theQueue[countToIndex(currentReadIndex+i)];
+}
+
+
+
+template <typename ELEM_T, typename Allocator>
+void * ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator new(size_t size){
+        return Allocator::allocate(size);
+}
+
+template <typename ELEM_T, typename Allocator>
+void ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator delete(void *p) {
+    return Allocator::deallocate(p);
+}
+
+#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index b7dfd9f..01e597c 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -221,8 +221,7 @@
 {
  LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");   
     if (psem_wait(&slots) == -1) {
-        err_msg(errno, "LockFreeQueue push");
-        return errno;
+        return -1;
     }
     
     if ( m_qImpl.push(a_data) ) {
@@ -241,13 +240,7 @@
 int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
 {
     if (psem_trywait(&slots) == -1) {
-        if (errno == EAGAIN)
-            return EAGAIN;
-        else {
-            err_msg(errno, "LockFreeQueue push_nowait");
-            return errno;
-        }
-
+        return -1;
     }
 
     if ( m_qImpl.push(a_data)) {
@@ -265,15 +258,8 @@
 int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts)
 {
 LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n");       
-    int rv;
     if ( psem_timedwait(&slots, ts) == -1) {
-
-        if(errno == ETIMEDOUT)
-            return EBUS_TIMEOUT;
-        else {
-           LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
-           return errno;
-        }
+        return -1;
     }
 
     if (m_qImpl.push(a_data)){
@@ -297,8 +283,7 @@
 
   LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
     if (psem_wait(&items) == -1) {
-        LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop");
-        return errno;
+        return -1;
     }
 
     if (m_qImpl.pop(a_data)) {
@@ -316,12 +301,7 @@
 int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
 {
     if (psem_trywait(&items) == -1) {
-        if (errno == EAGAIN)
-            return errno;
-        else {
-            LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_nowait");
-            return errno;
-        }
+        return -1;
     }
 
     if (m_qImpl.pop(a_data)) {
@@ -339,14 +319,7 @@
 int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
 {
     if (psem_timedwait(&items, ts) == -1) {
-        if (errno == ETIMEDOUT) {
-            return EBUS_TIMEOUT;
-        }
-       
-        else {
-          LoggerFactory::getLogger()->error(errno, "3  LockFreeQueue pop_timeout %d", errno);
-          return errno;
-        }
+       return -1;
     }
 
     if (m_qImpl.pop(a_data)) {
diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h
index 7d98eaa..5d2d9b6 100644
--- a/src/queue/shm_queue.h
+++ b/src/queue/shm_queue.h
@@ -6,12 +6,13 @@
 #define __SHM_QUEUE_H__
 
 #include "hashtable.h"
-#include "lock_free_queue.h"
+ 
 #include "logger_factory.h"
 #include "sem_util.h"
 #include "shm_allocator.h"
 #include "usg_common.h"
-
+#include "array_lock_free_sem_queue.h"
+#include "bus_error.h"
 
 template <typename ELEM_T> class SHMQueue {
 
@@ -20,7 +21,7 @@
 
 public:
   /// @brief constructor of the class
-  SHMQueue(int key = 0, size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
+  SHMQueue(int key = 0, size_t qsize = 16);
 
   ~SHMQueue();
 
@@ -49,7 +50,8 @@
 protected:
   /// @brief the actual queue-> methods are forwarded into the real
   ///        implementation
-  LockFreeQueue<ELEM_T, SHM_Allocator> *queue;
+
+  ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *queue;
 
 private:
   /// @brief disable copy constructor declaring it private
@@ -62,7 +64,7 @@
   hashtable_t *hashtable = mm_get_hashtable();
   std::set<int> *keyset = hashtable_keyset(hashtable);
   std::set<int>::iterator keyItr;
-  LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
+  ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue;
   bool found;
   size_t count = 0;
   for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
@@ -75,7 +77,7 @@
     }
     if (!found) {
       // 閿�姣佸叡浜唴瀛樼殑queue
-      mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
+      mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
       delete mqueue;
       hashtable_remove(hashtable, *keyItr);
       count++;
@@ -89,11 +91,11 @@
 template <typename ELEM_T>
 size_t SHMQueue<ELEM_T>::remove_queues(int keys[], size_t length) {
   hashtable_t *hashtable = mm_get_hashtable();
-  LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
+  ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue;
   size_t count = 0;
   for(int i = 0; i< length; i++) {
     // 閿�姣佸叡浜唴瀛樼殑queue
-    mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]);
+    mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]);
     delete mqueue;
     hashtable_remove(hashtable, keys[i]);
     count++;
@@ -111,49 +113,22 @@
 SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize) : KEY(key) {
 
   hashtable_t *hashtable = mm_get_hashtable();
-  queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
+  queue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
   if (queue == NULL || (void *)queue == (void *)1) {
-    queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize);
+    queue = new ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator>(qsize);
     hashtable_put(hashtable, key, (void *)queue);
   }
-  queue->reference++;
+  // queue->reference++;
   // LoggerFactory::getLogger()->debug("SHMQueue constructor reference===%d", queue->reference.load());
 }
 
 template <typename ELEM_T> SHMQueue<ELEM_T>::~SHMQueue() {
-  if(queue == NULL) {
-     // queue宸茬粡閿�姣�
-    return;
-  }
-
-  sem_wait(&(queue->mutex));
-  queue->reference--;
-  // LoggerFactory::getLogger()->debug("SHMQueue destructor  reference===%d",
-  if (queue->reference.load() == 0) {
-      delete queue;
-      queue = NULL;
-      hashtable_t *hashtable = mm_get_hashtable();
-      hashtable_remove(hashtable, KEY);
-      // 姝ゆ椂queue宸茬粡閿�姣侊紝鏃犻渶  sem_post(&(queue->mutex))
-      // printf("SHMQueue destructor delete queue\n");
-  } else {
-      sem_post(&(queue->mutex));
-  }
-  
-}
-
-template <typename ELEM_T> void SHMQueue<ELEM_T>::force_destroy() {
-  if(queue == NULL) {
-    // queue宸茬粡閿�姣�
-    return;
-  }
-
-  SemUtil::dec(queue->mutex);
+  LoggerFactory::getLogger()->debug("SHMQueue destroy");
   delete queue;
   queue = NULL;
   hashtable_t *hashtable = mm_get_hashtable();
   hashtable_remove(hashtable, KEY);
-  // 姝ゆ椂queue宸茬粡閿�姣侊紝鏃犻渶 SemUtil::inc(queue->mutex)
+  
 }
 
 template <typename ELEM_T> inline uint32_t SHMQueue<ELEM_T>::size() {
@@ -170,36 +145,85 @@
 
 template <typename ELEM_T>
 inline int SHMQueue<ELEM_T>::push(const ELEM_T &a_data) {
-  return queue->push(a_data);
+  int rv = queue->push(a_data);
+  if(rv == -1) {
+    return errno;
+  } else {
+    return 0;
+  }
 }
 
 template <typename ELEM_T>
 inline int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) {
-  return queue->push_nowait(a_data);
+  int rv =  queue->push(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT);
+  if(rv == -1) {
+    if (errno == EAGAIN)
+      return EAGAIN;
+    else {
+        err_msg(errno, "LockFreeQueue push_nowait");
+        return errno;
+    }
+  }
+  return 0;
 }
 
 template <typename ELEM_T>
-inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data,
-                                           const struct timespec *timeout) {
+inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec *timeout) {
 
-  return queue->push_timeout(a_data, timeout);
+  int rv = queue->push(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT);
+  if(rv == -1) {
+    if(errno == ETIMEDOUT)
+        return EBUS_TIMEOUT;
+    else {
+       LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
+       return errno;
+    }
+  }
+  return 0;
 }
 
 template <typename ELEM_T> inline int SHMQueue<ELEM_T>::pop(ELEM_T &a_data) {
   // printf("SHMQueue pop before\n");
   int rv = queue->pop(a_data);
   // printf("SHMQueue after before\n");
-  return rv;
+  if(rv == -1) {
+    return errno;
+  } else {
+    return 0;
+  }
 }
 
 template <typename ELEM_T>
 inline int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) {
-  return queue->pop_nowait(a_data);
+  int rv = queue->pop(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT);
+
+  if(rv == -1) {
+    if (errno == EAGAIN)
+      return errno;
+    else {
+        LoggerFactory::getLogger()->error(errno, " SHMQueue pop_nowait");
+        return errno;
+    }
+  }
+  return 0;
+  
 }
 
 template <typename ELEM_T>
 inline int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) {
-  return queue->pop_timeout(a_data, timeout);
+
+  int rv;
+  rv = queue->pop(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT);
+  if(rv == -1) {
+    if (errno == ETIMEDOUT) {
+      return EBUS_TIMEOUT;
+    } else {
+      LoggerFactory::getLogger()->error(errno, " SHMQueue pop_timeout");
+      return errno;
+    }
+  }
+  return 0;
+  
 }
 
 template <typename ELEM_T>
@@ -207,4 +231,7 @@
   return queue->operator[](i);
 }
 
+
+
+
 #endif
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 534202d..76e906f 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -383,10 +383,8 @@
 
   if (rv == 0) {
     // printf("shm_sendto push after\n");
-    delete remoteQueue;
     return 0;
   } else {
-    delete remoteQueue;
     mm_free(dest.buf);
     if(rv > EBUS_BASE) {
       // bus_errno = EBUS_TIMEOUT;
@@ -725,10 +723,7 @@
     socket->queue = NULL;
   }
 
-  if (socket->remoteQueue != NULL) {
-    delete socket->remoteQueue;
-    socket->remoteQueue = NULL;
-  }
+  
 
   if (socket->messageQueue != NULL) {
     delete socket->messageQueue;
@@ -747,7 +742,6 @@
       client_socket = iter->second;
 
       client_socket->remoteQueue->push_timeout(close_msg, &timeout);
-      delete client_socket->remoteQueue;
       client_socket->remoteQueue = NULL;
 
       delete client_socket->messageQueue;
diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h
index 0917d00..abc8e20 100644
--- a/src/socket/shm_socket.h
+++ b/src/socket/shm_socket.h
@@ -4,6 +4,7 @@
 #include "usg_common.h"
 #include "usg_typedef.h"
 #include "shm_queue.h"
+#include "lock_free_queue.h"
 
 enum shm_socket_flag_t
 {
diff --git a/src/time_util.cpp b/src/time_util.cpp
new file mode 100644
index 0000000..f00b4f4
--- /dev/null
+++ b/src/time_util.cpp
@@ -0,0 +1,26 @@
+#include "time_util.h"
+
+#define NANO 1000000000
+
+
+struct timespec TimeUtil::calc_abs_time(const struct timespec *ts) {
+ 
+	struct timespec res;
+  struct timespec timeout;
+  if (clock_gettime(CLOCK_REALTIME, &timeout) == -1)
+      err_exit(errno, "clock_gettime");
+
+  res.tv_sec = timeout.tv_sec + ts->tv_sec;
+  res.tv_nsec = timeout.tv_nsec + ts->tv_nsec;
+  res.tv_sec = res.tv_sec + floor(res.tv_nsec / NANO);
+  res.tv_nsec = res.tv_nsec % NANO;
+  return res;
+}
+
+struct timespec TimeUtil::trim_time(const struct timespec *ts) {
+ 
+	struct timespec res;
+  res.tv_sec = ts->tv_sec + floor(ts->tv_nsec / NANO);
+  res.tv_nsec = ts->tv_nsec % NANO;
+  return res;
+}
\ No newline at end of file
diff --git a/src/time_util.h b/src/time_util.h
new file mode 100644
index 0000000..1f23d39
--- /dev/null
+++ b/src/time_util.h
@@ -0,0 +1,14 @@
+#ifndef _TIMEUTIL_H_
+#define _TIMEUTIL_H_  
+#include "usg_common.h"
+class TimeUtil {
+public:
+	// 璁$畻褰撳墠鏃堕棿+ts鐨勭粷瀵规椂闂�
+	static struct timespec calc_abs_time(const struct timespec *ts);
+
+	// 濡傛灉绾崇澶т簬10e9锛屽悜绉掕繘浣�
+	static struct timespec trim_time(const struct timespec *ts) ;
+};
+
+ 
+#endif
\ No newline at end of file
diff --git a/test/futex_demo.cpp b/test/futex_demo.cpp
index d6887e8..2899862 100644
--- a/test/futex_demo.cpp
+++ b/test/futex_demo.cpp
@@ -19,11 +19,12 @@
 #include <sys/syscall.h>
 #include <linux/futex.h>
 #include <sys/time.h>
-#include "usg_common.h"
 #include <sys/mman.h>
 #include <sys/stat.h>        /* For mode constants */
 #include <fcntl.h>           /* For O_* constants */
 
+#include "usg_common.h"
+
 
 #define errExit(msg)    do { perror(msg); exit(EXIT_FAILURE); \
 } while (0)

--
Gitblit v1.8.0