wangzhengquan
2021-01-21 2c65db46500207f8445aa4baa53bfbb6602e0e18
restructure
1个文件已删除
5个文件已添加
9个文件已修改
994 ■■■■ 已修改文件
CMakeLists.txt 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/CMakeLists.txt 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/futex_sem.cpp 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/futex_sem.h 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/psem.cpp 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/array_lock_free_queue.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/array_lock_free_queue2.h 322 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/array_lock_free_sem_queue.h 367 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/lock_free_queue.h 39 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/shm_queue.h 123 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/time_util.cpp 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/time_util.h 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/futex_demo.cpp 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
CMakeLists.txt
@@ -16,6 +16,7 @@
option(BUILD_SHARED_LIBS "Build using shared libraries" ON)
option(BUILD_DOC "Build doc" OFF)
list(APPEND EXTRA_INCLUDES "${PROJECT_SOURCE_DIR}/include/usgcommon")
list(APPEND EXTRA_LIBS ${PROJECT_SOURCE_DIR}/lib/libusgcommon.a pthread rt)
src/CMakeLists.txt
@@ -8,24 +8,27 @@
configure_file(bus_config.h.in bus_config.h)
add_library(shm_queue 
  ./logger_factory.cpp
  ./socket/bus_server_socket.cpp
  ./socket/bus_server_socket_wrapper.cpp
  ./socket/shm_stream_mod_socket.cpp
  ./socket/shm_socket.cpp
  ./socket/shm_mod_socket.cpp
  ./psem.cpp
  ./svsem_util.cpp
  ./bus_error.cpp
  ./net/net_conn_pool.cpp
  ./net/net_mod_server_socket_wrapper.cpp
  ./net/net_mod_socket_wrapper.cpp
  ./net/net_mod_socket.cpp
  ./net/net_mod_socket_io.cpp
  ./net/net_mod_server_socket.cpp
  ./shm/shm_mm_wrapper.cpp
  ./shm/mm.cpp
  ./shm/hashtable.cpp
 ./logger_factory.cpp
./socket/bus_server_socket.cpp
./socket/bus_server_socket_wrapper.cpp
./socket/shm_stream_mod_socket.cpp
./socket/shm_socket.cpp
./socket/shm_mod_socket.cpp
./time_util.cpp
./psem.cpp
./svsem_util.cpp
./bus_error.cpp
./futex_sem.cpp
./net/net_conn_pool.cpp
./net/net_mod_server_socket_wrapper.cpp
./net/net_mod_socket_wrapper.cpp
./net/net_mod_socket.cpp
./net/net_mod_socket_io.cpp
./net/net_mod_server_socket.cpp
./shm/shm_mm_wrapper.cpp
./shm/mm.cpp
./shm/hashtable.cpp
    )
@@ -41,7 +44,8 @@
                           ${CMAKE_CURRENT_SOURCE_DIR}/socket
                           ${CMAKE_CURRENT_SOURCE_DIR}/net
                           )
target_link_libraries(shm_queue PUBLIC  ${EXTRA_LIBS} )
# install rules
@@ -55,6 +59,8 @@
./socket/bus_server_socket_wrapper.h
./psem.h
./key_def.h
./time_util.h
./futex_sem.h
./bus_error.h
./svsem_util.h
./logger_factory.h
src/futex_sem.cpp
New file
@@ -0,0 +1,8 @@
#include "futex_sem.h"
int futex(int *uaddr, int futex_op, int val,
    const struct timespec *timeout, int *uaddr2, int val3)
{
  return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr, val3);
}
src/futex_sem.h
New file
@@ -0,0 +1,15 @@
#ifndef _FUTEXT_SEM_H_
#define _FUTEXT_SEM_H_
#include "usg_common.h"
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <sys/time.h>
#include <sys/mman.h>
#include <sys/stat.h>        /* For mode constants */
#include <fcntl.h>
int futex(int *uaddr, int futex_op, int val,
    const struct timespec *timeout, int *uaddr2, int val3);
#endif
src/psem.cpp
@@ -1,26 +1,11 @@
#include "psem.h"
#include <semaphore.h>
#include "time_util.h"
#define NANO 1000000000
static struct timespec psem_calc_abs_timeout(const struct timespec *ts) {
    struct timespec res;
  struct timespec timeout;
  if (clock_gettime(CLOCK_REALTIME, &timeout) == -1)
      err_exit(errno, "clock_gettime");
  res.tv_sec = timeout.tv_sec + ts->tv_sec;
  res.tv_nsec = timeout.tv_nsec + ts->tv_nsec;
  res.tv_sec = res.tv_sec + floor(res.tv_nsec / NANO);
  res.tv_nsec = res.tv_nsec % NANO;
  return res;
}
int psem_timedwait(sem_t *sem, const struct timespec *ts) {
    struct timespec abs_timeout = psem_calc_abs_timeout(ts);
    struct timespec abs_timeout = TimeUtil::calc_abs_time(ts);
  int rv ;
  while ( (rv = sem_timedwait(sem, &abs_timeout)) == -1) {
src/queue/array_lock_free_queue.h
@@ -1,5 +1,5 @@
#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
#ifndef __ARRAY_LOCK_FREE_QUEUE_H__
#define __ARRAY_LOCK_FREE_QUEUE_H__
#include "atomic_ops.h"
#include <assert.h> // assert()
#include <sched.h>  // sched_yield()
src/queue/array_lock_free_queue2.h
File was deleted
src/queue/array_lock_free_sem_queue.h
New file
@@ -0,0 +1,367 @@
#ifndef __ARRAY_LOCK_FREE_SEM_QUEUE_H__
#define __ARRAY_LOCK_FREE_SEM_QUEUE_H__
#include "atomic_ops.h"
#include <assert.h> // assert()
#include <sched.h>  // sched_yield()
#include "logger_factory.h"
#include "mem_pool.h"
#include "shm_allocator.h"
#include "futex_sem.h"
#include "time_util.h"
/// @brief implementation of an array based lock free queue with support for
///        multiple producers
/// This class is prevented from being instantiated directly (all members and
/// methods are private). To instantiate a multiple producers lock free queue
/// you must use the ArrayLockFreeSemQueue fachade:
///   ArrayLockFreeSemQueue<int, 100, ArrayLockFreeSemQueue> q;
#define LOCK_FREE_QUEUE_TIMEOUT  1
#define LOCK_FREE_QUEUE_NOWAIT  1 << 1
#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
template <typename ELEM_T, typename Allocator = SHM_Allocator>
class ArrayLockFreeSemQueue
{
public:
  /// @brief constructor of the class
  ArrayLockFreeSemQueue(size_t qsize = 16);
  virtual ~ArrayLockFreeSemQueue();
  inline uint32_t size();
  inline bool full();
  inline bool empty();
  int push(const ELEM_T &a_data,  const struct timespec *timeout = NULL, int flag = 0);
  int pop(ELEM_T &a_data,  const struct timespec *timeout = NULL, int flag = 0);
  /// @brief calculate the index in the circular array that corresponds
  /// to a particular "count" value
  inline uint32_t countToIndex(uint32_t a_count);
  ELEM_T& operator[](unsigned i);
public:
  void *operator new(size_t size);
  void operator delete(void *p);
private:
  size_t Q_SIZE;
  /// @brief array to keep the elements
  ELEM_T *m_theQueue;
  /// @brief where a new element will be inserted
  uint32_t m_writeIndex;
  /// @brief where the next element where be extracted from
  uint32_t m_readIndex;
  /// @brief maximum read index for multiple producer queues
  /// If it's not the same as m_writeIndex it means
  /// there are writes pending to be "committed" to the queue, that means,
  /// the place for the data was reserved (the index in the array) but
  /// data is still not in the queue, so the thread trying to read will have
  /// to wait for those other threads to save the data into the queue
  ///
  /// note this is only used for multiple producers
  uint32_t m_maximumReadIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
  /// @brief number of elements in the queue
  int m_count;
#endif
  private:
  /// @brief disable copy constructor declaring it private
  ArrayLockFreeSemQueue<ELEM_T, Allocator>(const ArrayLockFreeSemQueue<ELEM_T> &a_src);
};
template <typename ELEM_T, typename Allocator>
ArrayLockFreeSemQueue<ELEM_T, Allocator>::ArrayLockFreeSemQueue(size_t qsize):
  Q_SIZE(qsize),
  m_writeIndex(0),      // initialisation is not atomic
  m_readIndex(0),       //
  m_maximumReadIndex(0) //
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
  ,m_count(0)           //
#endif
{
  m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
}
  template <typename ELEM_T, typename Allocator>
ArrayLockFreeSemQueue<ELEM_T, Allocator>::~ArrayLockFreeSemQueue()
{
  // std::cout << "destroy ArrayLockFreeSemQueue\n";
  Allocator::deallocate(m_theQueue);
}
template <typename ELEM_T, typename Allocator>
  inline
uint32_t ArrayLockFreeSemQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
{
  // if Q_SIZE is a power of 2 this statement could be also written as
  // return (a_count & (Q_SIZE - 1));
  return (a_count % Q_SIZE);
}
template <typename ELEM_T, typename Allocator>
  inline
uint32_t ArrayLockFreeSemQueue<ELEM_T, Allocator>::size()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
  return m_count;
#else
  uint32_t currentWriteIndex = m_maximumReadIndex;
  uint32_t currentReadIndex  = m_readIndex;
  // let's think of a scenario where this function returns bogus data
  // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
  // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
  // 2. afterwards this thread is preemted. While this thread is inactive 2
  // elements are inserted and removed from the queue, so m_maximumReadIndex
  // is 5 and m_readIndex 4. Real size is still 1
  // 3. Now the current thread comes back from preemption and reads m_readIndex.
  // currentReadIndex is 4
  // 4. currentReadIndex is bigger than currentWriteIndex, so
  // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
  // it returns that the queue is almost full, when it is almost empty
  //
  if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
  {
    return (currentWriteIndex - currentReadIndex);
  }
  else
  {
    return (Q_SIZE + currentWriteIndex - currentReadIndex);
  }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T, typename Allocator>
  inline
bool ArrayLockFreeSemQueue<ELEM_T, Allocator>::full()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
  return (m_count == (Q_SIZE));
#else
  uint32_t currentWriteIndex = m_writeIndex;
  uint32_t currentReadIndex  = m_readIndex;
  if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
  {
    // the queue is full
    return true;
  }
  else
  {
    // not full!
    return false;
  }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T, typename Allocator>
  inline
bool ArrayLockFreeSemQueue<ELEM_T, Allocator>::empty()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
  return (m_count == 0);
#else
  if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
  {
    // the queue is full
    return true;
  }
  else
  {
    // not full!
    return false;
  }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
  template <typename ELEM_T, typename Allocator>
int ArrayLockFreeSemQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data,  const struct timespec *timeout, int flag)
{
  uint32_t currentReadIndex;
  uint32_t currentWriteIndex;
  int s;
  do
  {
    currentWriteIndex = m_writeIndex;
    currentReadIndex  = m_readIndex;
    if (m_count == Q_SIZE) {
      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
        return -1;
      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex(&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          return -1;
        }
      } else {
        s = futex(&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
          return -1;
        }
      }
    }
  } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
  // We know now that this index is reserved for us. Use it to save the data
  m_theQueue[countToIndex(currentWriteIndex)] = a_data;
  // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
  // inserting in the queue. It might fail if there are more than 1 producer threads because this
  // operation has to be done in the same order as the previous CAS
  while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
  {
    // this is a good place to yield the thread in case there are more
    // software threads than hardware processors and you have more
    // than 1 producer thread
    // have a look at sched_yield (POSIX.1b)
    sched_yield();
  }
  AtomicAdd(&m_count, 1);
  s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
  if (s  == -1)
      err_exit(errno, "futex-FUTEX_WAKE");
  return 0;
}
  template <typename ELEM_T, typename Allocator>
int ArrayLockFreeSemQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag)
{
  uint32_t currentMaximumReadIndex;
  uint32_t currentReadIndex;
  int s;
  do
  {
    // to ensure thread-safety when there is more than 1 producer thread
    // a second index is defined (m_maximumReadIndex)
    currentReadIndex        = m_readIndex;
    currentMaximumReadIndex = m_maximumReadIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    if (m_count == 0) {
      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
        return -1;
      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex(&m_count, FUTEX_WAIT, 0, &ts, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          return -1;
        }
      } else {
        s = futex(&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
          return -1;
        }
      }
    }
#else
    if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
    {
      // the queue is empty or
      // a producer thread has allocate space in the queue but is
      // waiting to commit the data into it
      return -1;
    }
#endif
    // retrieve the data from the queue
    a_data = m_theQueue[countToIndex(currentReadIndex)];
    // try to perfrom now the CAS operation on the read index. If we succeed
    // a_data already contains what m_readIndex pointed to before we
    // increased it
    if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
    {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
      // m_count.fetch_sub(1);
      AtomicSub(&m_count, 1);
#endif
      s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
      if (s  == -1)
        err_exit(errno, "futex-FUTEX_WAKE");
      return 0;
    }
    // it failed retrieving the element off the queue. Someone else must
    // have read the element stored at countToIndex(currentReadIndex)
    // before we could perform the CAS operation
  } while(1); // keep looping to try again!
  // Something went wrong. it shouldn't be possible to reach here
  assert(0);
  // Add this return statement to avoid compiler warnings
  return -1;
}
  template <typename ELEM_T, typename Allocator>
ELEM_T& ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[](unsigned int i)
{
  int currentCount = m_count;
  uint32_t currentReadIndex = m_readIndex;
  if (i >= currentCount)
  {
    std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
    std::exit(EXIT_FAILURE);
  }
  return m_theQueue[countToIndex(currentReadIndex+i)];
}
template <typename ELEM_T, typename Allocator>
void * ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator new(size_t size){
        return Allocator::allocate(size);
}
template <typename ELEM_T, typename Allocator>
void ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator delete(void *p) {
    return Allocator::deallocate(p);
}
#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
src/queue/lock_free_queue.h
@@ -221,8 +221,7 @@
{
 LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");   
    if (psem_wait(&slots) == -1) {
        err_msg(errno, "LockFreeQueue push");
        return errno;
        return -1;
    }
    
    if ( m_qImpl.push(a_data) ) {
@@ -241,13 +240,7 @@
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
{
    if (psem_trywait(&slots) == -1) {
        if (errno == EAGAIN)
            return EAGAIN;
        else {
            err_msg(errno, "LockFreeQueue push_nowait");
            return errno;
        }
        return -1;
    }
    if ( m_qImpl.push(a_data)) {
@@ -265,15 +258,8 @@
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts)
{
LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n");       
    int rv;
    if ( psem_timedwait(&slots, ts) == -1) {
        if(errno == ETIMEDOUT)
            return EBUS_TIMEOUT;
        else {
           LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
           return errno;
        }
        return -1;
    }
    if (m_qImpl.push(a_data)){
@@ -297,8 +283,7 @@
  LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
    if (psem_wait(&items) == -1) {
        LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop");
        return errno;
        return -1;
    }
    if (m_qImpl.pop(a_data)) {
@@ -316,12 +301,7 @@
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
{
    if (psem_trywait(&items) == -1) {
        if (errno == EAGAIN)
            return errno;
        else {
            LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_nowait");
            return errno;
        }
        return -1;
    }
    if (m_qImpl.pop(a_data)) {
@@ -339,14 +319,7 @@
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
{
    if (psem_timedwait(&items, ts) == -1) {
        if (errno == ETIMEDOUT) {
            return EBUS_TIMEOUT;
        }
        else {
          LoggerFactory::getLogger()->error(errno, "3  LockFreeQueue pop_timeout %d", errno);
          return errno;
        }
       return -1;
    }
    if (m_qImpl.pop(a_data)) {
src/queue/shm_queue.h
@@ -6,12 +6,13 @@
#define __SHM_QUEUE_H__
#include "hashtable.h"
#include "lock_free_queue.h"
#include "logger_factory.h"
#include "sem_util.h"
#include "shm_allocator.h"
#include "usg_common.h"
#include "array_lock_free_sem_queue.h"
#include "bus_error.h"
template <typename ELEM_T> class SHMQueue {
@@ -20,7 +21,7 @@
public:
  /// @brief constructor of the class
  SHMQueue(int key = 0, size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
  SHMQueue(int key = 0, size_t qsize = 16);
  ~SHMQueue();
@@ -49,7 +50,8 @@
protected:
  /// @brief the actual queue-> methods are forwarded into the real
  ///        implementation
  LockFreeQueue<ELEM_T, SHM_Allocator> *queue;
  ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *queue;
private:
  /// @brief disable copy constructor declaring it private
@@ -62,7 +64,7 @@
  hashtable_t *hashtable = mm_get_hashtable();
  std::set<int> *keyset = hashtable_keyset(hashtable);
  std::set<int>::iterator keyItr;
  LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
  ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue;
  bool found;
  size_t count = 0;
  for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
@@ -75,7 +77,7 @@
    }
    if (!found) {
      // 销毁共享内存的queue
      mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
      mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
      delete mqueue;
      hashtable_remove(hashtable, *keyItr);
      count++;
@@ -89,11 +91,11 @@
template <typename ELEM_T>
size_t SHMQueue<ELEM_T>::remove_queues(int keys[], size_t length) {
  hashtable_t *hashtable = mm_get_hashtable();
  LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
  ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue;
  size_t count = 0;
  for(int i = 0; i< length; i++) {
    // 销毁共享内存的queue
    mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]);
    mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]);
    delete mqueue;
    hashtable_remove(hashtable, keys[i]);
    count++;
@@ -111,49 +113,22 @@
SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize) : KEY(key) {
  hashtable_t *hashtable = mm_get_hashtable();
  queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
  queue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
  if (queue == NULL || (void *)queue == (void *)1) {
    queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize);
    queue = new ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator>(qsize);
    hashtable_put(hashtable, key, (void *)queue);
  }
  queue->reference++;
  // queue->reference++;
  // LoggerFactory::getLogger()->debug("SHMQueue constructor reference===%d", queue->reference.load());
}
template <typename ELEM_T> SHMQueue<ELEM_T>::~SHMQueue() {
  if(queue == NULL) {
     // queue已经销毁
    return;
  }
  sem_wait(&(queue->mutex));
  queue->reference--;
  // LoggerFactory::getLogger()->debug("SHMQueue destructor  reference===%d",
  if (queue->reference.load() == 0) {
      delete queue;
      queue = NULL;
      hashtable_t *hashtable = mm_get_hashtable();
      hashtable_remove(hashtable, KEY);
      // 此时queue已经销毁,无需  sem_post(&(queue->mutex))
      // printf("SHMQueue destructor delete queue\n");
  } else {
      sem_post(&(queue->mutex));
  }
}
template <typename ELEM_T> void SHMQueue<ELEM_T>::force_destroy() {
  if(queue == NULL) {
    // queue已经销毁
    return;
  }
  SemUtil::dec(queue->mutex);
  LoggerFactory::getLogger()->debug("SHMQueue destroy");
  delete queue;
  queue = NULL;
  hashtable_t *hashtable = mm_get_hashtable();
  hashtable_remove(hashtable, KEY);
  // 此时queue已经销毁,无需 SemUtil::inc(queue->mutex)
}
template <typename ELEM_T> inline uint32_t SHMQueue<ELEM_T>::size() {
@@ -170,36 +145,85 @@
template <typename ELEM_T>
inline int SHMQueue<ELEM_T>::push(const ELEM_T &a_data) {
  return queue->push(a_data);
  int rv = queue->push(a_data);
  if(rv == -1) {
    return errno;
  } else {
    return 0;
  }
}
template <typename ELEM_T>
inline int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) {
  return queue->push_nowait(a_data);
  int rv =  queue->push(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT);
  if(rv == -1) {
    if (errno == EAGAIN)
      return EAGAIN;
    else {
        err_msg(errno, "LockFreeQueue push_nowait");
        return errno;
    }
  }
  return 0;
}
template <typename ELEM_T>
inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data,
                                           const struct timespec *timeout) {
inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec *timeout) {
  return queue->push_timeout(a_data, timeout);
  int rv = queue->push(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT);
  if(rv == -1) {
    if(errno == ETIMEDOUT)
        return EBUS_TIMEOUT;
    else {
       LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
       return errno;
    }
  }
  return 0;
}
template <typename ELEM_T> inline int SHMQueue<ELEM_T>::pop(ELEM_T &a_data) {
  // printf("SHMQueue pop before\n");
  int rv = queue->pop(a_data);
  // printf("SHMQueue after before\n");
  return rv;
  if(rv == -1) {
    return errno;
  } else {
    return 0;
  }
}
template <typename ELEM_T>
inline int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) {
  return queue->pop_nowait(a_data);
  int rv = queue->pop(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT);
  if(rv == -1) {
    if (errno == EAGAIN)
      return errno;
    else {
        LoggerFactory::getLogger()->error(errno, " SHMQueue pop_nowait");
        return errno;
    }
  }
  return 0;
}
template <typename ELEM_T>
inline int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) {
  return queue->pop_timeout(a_data, timeout);
  int rv;
  rv = queue->pop(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT);
  if(rv == -1) {
    if (errno == ETIMEDOUT) {
      return EBUS_TIMEOUT;
    } else {
      LoggerFactory::getLogger()->error(errno, " SHMQueue pop_timeout");
      return errno;
    }
  }
  return 0;
}
template <typename ELEM_T>
@@ -207,4 +231,7 @@
  return queue->operator[](i);
}
#endif
src/socket/shm_socket.cpp
@@ -383,10 +383,8 @@
  if (rv == 0) {
    // printf("shm_sendto push after\n");
    delete remoteQueue;
    return 0;
  } else {
    delete remoteQueue;
    mm_free(dest.buf);
    if(rv > EBUS_BASE) {
      // bus_errno = EBUS_TIMEOUT;
@@ -725,10 +723,7 @@
    socket->queue = NULL;
  }
  if (socket->remoteQueue != NULL) {
    delete socket->remoteQueue;
    socket->remoteQueue = NULL;
  }
  if (socket->messageQueue != NULL) {
    delete socket->messageQueue;
@@ -747,7 +742,6 @@
      client_socket = iter->second;
      client_socket->remoteQueue->push_timeout(close_msg, &timeout);
      delete client_socket->remoteQueue;
      client_socket->remoteQueue = NULL;
      delete client_socket->messageQueue;
src/socket/shm_socket.h
@@ -4,6 +4,7 @@
#include "usg_common.h"
#include "usg_typedef.h"
#include "shm_queue.h"
#include "lock_free_queue.h"
enum shm_socket_flag_t
{
src/time_util.cpp
New file
@@ -0,0 +1,26 @@
#include "time_util.h"
#define NANO 1000000000
struct timespec TimeUtil::calc_abs_time(const struct timespec *ts) {
    struct timespec res;
  struct timespec timeout;
  if (clock_gettime(CLOCK_REALTIME, &timeout) == -1)
      err_exit(errno, "clock_gettime");
  res.tv_sec = timeout.tv_sec + ts->tv_sec;
  res.tv_nsec = timeout.tv_nsec + ts->tv_nsec;
  res.tv_sec = res.tv_sec + floor(res.tv_nsec / NANO);
  res.tv_nsec = res.tv_nsec % NANO;
  return res;
}
struct timespec TimeUtil::trim_time(const struct timespec *ts) {
    struct timespec res;
  res.tv_sec = ts->tv_sec + floor(ts->tv_nsec / NANO);
  res.tv_nsec = ts->tv_nsec % NANO;
  return res;
}
src/time_util.h
New file
@@ -0,0 +1,14 @@
#ifndef _TIMEUTIL_H_
#define _TIMEUTIL_H_
#include "usg_common.h"
class TimeUtil {
public:
    // 计算当前时间+ts的绝对时间
    static struct timespec calc_abs_time(const struct timespec *ts);
    // 如果纳秒大于10e9,向秒进位
    static struct timespec trim_time(const struct timespec *ts) ;
};
#endif
test/futex_demo.cpp
@@ -19,11 +19,12 @@
#include <sys/syscall.h>
#include <linux/futex.h>
#include <sys/time.h>
#include "usg_common.h"
#include <sys/mman.h>
#include <sys/stat.h>        /* For mode constants */
#include <fcntl.h>           /* For O_* constants */
#include "usg_common.h"
#define errExit(msg)    do { perror(msg); exit(EXIT_FAILURE); \
} while (0)