tmp
wangzhengquan
2021-01-22 4c73fd7179e92bee9cccb65e46823b00f568acb3
tmp
1个文件已删除
3个文件已添加
15个文件已修改
1305 ■■■■ 已修改文件
CMakeLists.txt 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
build.sh 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/CMakeLists.txt 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_def.h 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_server_socket.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.cpp 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.cpp 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/array_lock_free_queue.h 487 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/array_lock_free_sem_queue.h 41 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/lock_free_queue.h 404 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/shm_queue.h 23 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.cpp 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.h 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/net_mod_socket.sh 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_net_mod_socket.cpp 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/CMakeLists.txt 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/bus_test.cpp 116 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/dgram_mod_bus.cpp 131 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
CMakeLists.txt
@@ -13,7 +13,10 @@
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}")
# set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}")
option(BUILD_SHARED_LIBS "Build using shared libraries" ON)
if (CMAKE_BUILD_TYPE EQUAL "Release")
#option(BUILD_SHARED_LIBS "Build using shared libraries" ON)
endif()
option(BUILD_DOC "Build doc" OFF)
@@ -29,4 +32,5 @@
    add_subdirectory(${PROJECT_SOURCE_DIR}/src)
    add_subdirectory(${PROJECT_SOURCE_DIR}/test)
    add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket)
    add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket)
endif()
build.sh
@@ -2,6 +2,7 @@
BUILD_TYPE="Debug"
BUILD_DOC="OFF"
BUILD_SHARED_LIBS="OFF"
function usage() {
    echo "build.sh [release | debug | doc]"
@@ -10,6 +11,7 @@
case ${1} in
  "release")
  BUILD_TYPE="Release"
  BUILD_SHARED_LIBS="ON"
  ;;
  
  "debug")
@@ -48,7 +50,7 @@
# -DBUILD_SHARED_LIBS=ON
# -DCMAKE_INSTALL_PREFIX=$(pwd/../dest)
# -DQCA_MAN_INSTALL_DIR:PATH=/usr/share/man 
cmake -DCMAKE_INSTALL_PREFIX="$(pwd)/../dest" -DCMAKE_BUILD_TYPE=${BUILD_TYPE} -DBUILD_SHARED_LIBS=ON \
cmake -DCMAKE_INSTALL_PREFIX="$(pwd)/../dest" -DCMAKE_BUILD_TYPE=${BUILD_TYPE} -DBUILD_SHARED_LIBS=${BUILD_SHARED_LIBS} \
  -DBUILD_DOC=${BUILD_DOC} -DSUPPORT_RDMA=OFF ..
cmake --build .
src/CMakeLists.txt
@@ -1,5 +1,3 @@
# should we use our own math functions
option(SUPPORT_RDMA "If support rdma" OFF)
@@ -16,9 +14,9 @@
./socket/shm_mod_socket.cpp
./time_util.cpp
./psem.cpp
./svsem.cpp
./bus_error.cpp
./futex_sem.cpp
./svsem.cpp
./net/net_conn_pool.cpp
./net/net_mod_server_socket_wrapper.cpp
./net/net_mod_socket_wrapper.cpp
@@ -28,7 +26,6 @@
./shm/shm_mm_wrapper.cpp
./shm/mm.cpp
./shm/hashtable.cpp
    )
@@ -62,13 +59,14 @@
./time_util.h
./futex_sem.h
./bus_error.h
./svsem.h
./bus_def.h
./logger_factory.h
./queue/linked_lock_free_queue.h
./queue/array_lock_free_queue2.h
./queue/array_lock_free_queue.h
./queue/shm_queue.h
./queue/array_lock_free_sem_queue.h
./queue/lock_free_queue.h
./svsem.h
./net/net_conn_pool.h
./net/net_mod_socket.h
./net/net_mod_server_socket_wrapper.h
@@ -82,6 +80,7 @@
./shm/shm_allocator.h
  DESTINATION include)
install(FILES "${PROJECT_BINARY_DIR}/src/bus_config.h"
src/bus_def.h
New file
@@ -0,0 +1,7 @@
#ifndef _BUS_DEF_H_
#define _BUS_DEF_H_
#define BUS_TIMEOUT_FLAG  1
#define BUS_NOWAIT_FLAG  1 << 1
#endif
src/net/net_mod_server_socket.cpp
@@ -54,7 +54,7 @@
  sprintf(portstr, "%d", port);
  listenfd = open_listenfd(portstr);
  if(listenfd < 0) {
    LoggerFactory::getLogger()->error(errno, "NetModServerSocket::start . errno=%d ", errno);
    LoggerFactory::getLogger()->error(errno, "NetModServerSocket::start. port = %d ", port);
    return -1;
  }
  init_pool(listenfd);
src/net/net_mod_socket.cpp
@@ -513,10 +513,12 @@
 * @return 0 成功, 其他值 失败的错误码
*/
int NetModSocket::recvfrom(void **buf, int *size, int *key) {
  logger->debug(" %d NetModSocket::recvfrom before", get_key());
  int rv = shmModSocket.recvfrom(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::recvfrom:  %d recvfrom %d success.\n", get_key(), *key);
    logger->debug("NetModSocket::recvfrom: <<<< %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
@@ -533,7 +535,7 @@
  struct timespec timeout = {sec, nsec};
  int rv = shmModSocket.recvfrom_timeout(buf, size, key, &timeout);
  if(rv == 0) {
    logger->debug("NetModSocket::recvfrom:  %d recvfrom %d success.\n", get_key(), *key);
    logger->debug("NetModSocket::recvfrom_timeout:  %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
@@ -549,7 +551,7 @@
int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){
  int rv = shmModSocket.recvfrom_nowait(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::recvfrom:  %d recvfrom %d success.\n", get_key(), *key);
    logger->debug("NetModSocket::recvfrom_nowait:  %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
src/net/net_mod_socket_wrapper.cpp
@@ -67,8 +67,13 @@
 * @return 0 成功, 其他值 失败的错误码
*/
int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key){
    int rv;
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->recvfrom(buf, size, key);
    logger->debug(" %d net_mod_socket_recvfrom before", net_mod_socket_get_key(_socket));
    rv = sockt->recvfrom(buf, size, key);
    logger->debug(" %d net_mod_socket_recvfrom after. rv = %d", net_mod_socket_get_key(_socket), rv);
    return rv;
}
// 接受信息超时返回。 @sec 秒 , @nsec 纳秒
int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){
src/queue/array_lock_free_queue.h
@@ -1,5 +1,6 @@
#ifndef __ARRAY_LOCK_FREE_QUEUE_H__
#define __ARRAY_LOCK_FREE_QUEUE_H__
#include "atomic_ops.h"
#include <assert.h> // assert()
#include <sched.h>  // sched_yield()
@@ -17,306 +18,290 @@
#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
template <typename ELEM_T, typename Allocator = SHM_Allocator>
class ArrayLockFreeQueue
{
    // ArrayLockFreeQueue will be using this' private members
    template <
        typename ELEM_T_,
        typename Allocator_,
        template <typename T, typename AT> class Q_TYPE
        >
    friend class LockFreeQueue;
template<typename ELEM_T, typename Allocator = SHM_Allocator>
class ArrayLockFreeQueue {
  // ArrayLockFreeQueue will be using this' private members
  template<
    typename ELEM_T_,
    typename Allocator_,
    template<typename T, typename AT> class Q_TYPE
  >
  friend
  class LockFreeQueue;
private:
    /// @brief constructor of the class
    ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    virtual ~ArrayLockFreeQueue();
    inline uint32_t size();
    inline bool full();
  /// @brief constructor of the class
  ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    inline bool empty();
    bool push(const ELEM_T &a_data);
    bool pop(ELEM_T &a_data);
    /// @brief calculate the index in the circular array that corresponds
    /// to a particular "count" value
    inline uint32_t countToIndex(uint32_t a_count);
  virtual ~ArrayLockFreeQueue();
    ELEM_T& operator[](unsigned i);
private:
    size_t Q_SIZE;
    /// @brief array to keep the elements
    ELEM_T *m_theQueue;
  inline uint32_t size();
    /// @brief where a new element will be inserted
    uint32_t m_writeIndex;
  inline bool full();
    /// @brief where the next element where be extracted from
    uint32_t m_readIndex;
    /// @brief maximum read index for multiple producer queues
    /// If it's not the same as m_writeIndex it means
    /// there are writes pending to be "committed" to the queue, that means,
    /// the place for the data was reserved (the index in the array) but
    /// data is still not in the queue, so the thread trying to read will have
    /// to wait for those other threads to save the data into the queue
    ///
    /// note this is only used for multiple producers
    uint32_t m_maximumReadIndex;
  inline bool empty();
  bool push(const ELEM_T &a_data);
  bool pop(ELEM_T &a_data);
  /// @brief calculate the index in the circular array that corresponds
  /// to a particular "count" value
  inline uint32_t countToIndex(uint32_t a_count);
  ELEM_T &operator[](unsigned i);
private:
  size_t Q_SIZE;
  /// @brief array to keep the elements
  ELEM_T *m_theQueue;
  /// @brief where a new element will be inserted
  uint32_t m_writeIndex;
  /// @brief where the next element where be extracted from
  uint32_t m_readIndex;
  /// @brief maximum read index for multiple producer queues
  /// If it's not the same as m_writeIndex it means
  /// there are writes pending to be "committed" to the queue, that means,
  /// the place for the data was reserved (the index in the array) but
  /// data is still not in the queue, so the thread trying to read will have
  /// to wait for those other threads to save the data into the queue
  ///
  /// note this is only used for multiple producers
  uint32_t m_maximumReadIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    /// @brief number of elements in the queue
    uint32_t m_count;
  /// @brief number of elements in the queue
  uint32_t m_count;
#endif
private:
    /// @brief disable copy constructor declaring it private
    ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
  /// @brief disable copy constructor declaring it private
  ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
};
template <typename ELEM_T, typename Allocator>
template<typename ELEM_T, typename Allocator>
ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize):
    Q_SIZE(qsize),
    m_writeIndex(0),      // initialisation is not atomic
    m_readIndex(0),       //
    m_maximumReadIndex(0) //
  Q_SIZE(qsize),
  m_writeIndex(0),      // initialisation is not atomic
  m_readIndex(0),       //
  m_maximumReadIndex(0) //
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    ,m_count(0)           //
  , m_count(0)           //
#endif
{
    m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
  m_theQueue = (ELEM_T *) Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
}
template <typename ELEM_T, typename Allocator>
ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue()
{
    // std::cout << "destroy ArrayLockFreeQueue\n";
    Allocator::deallocate(m_theQueue);
template<typename ELEM_T, typename Allocator>
ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() {
  // std::cout << "destroy ArrayLockFreeQueue\n";
  Allocator::deallocate(m_theQueue);
}
template <typename ELEM_T, typename Allocator>
inline
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
{
    // if Q_SIZE is a power of 2 this statement could be also written as
    // return (a_count & (Q_SIZE - 1));
    return (a_count % Q_SIZE);
template<typename ELEM_T, typename Allocator>
inline
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) {
  // if Q_SIZE is a power of 2 this statement could be also written as
  // return (a_count & (Q_SIZE - 1));
  return (a_count % Q_SIZE);
}
template <typename ELEM_T, typename Allocator>
inline
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size()
{
template<typename ELEM_T, typename Allocator>
inline
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return m_count;
  return m_count;
#else
    uint32_t currentWriteIndex = m_maximumReadIndex;
    uint32_t currentReadIndex  = m_readIndex;
  uint32_t currentWriteIndex = m_maximumReadIndex;
  uint32_t currentReadIndex  = m_readIndex;
    // let's think of a scenario where this function returns bogus data
    // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
    // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
    // 2. afterwards this thread is preemted. While this thread is inactive 2
    // elements are inserted and removed from the queue, so m_maximumReadIndex
    // is 5 and m_readIndex 4. Real size is still 1
    // 3. Now the current thread comes back from preemption and reads m_readIndex.
    // currentReadIndex is 4
    // 4. currentReadIndex is bigger than currentWriteIndex, so
    // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
    // it returns that the queue is almost full, when it is almost empty
    //
    if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
    {
        return (currentWriteIndex - currentReadIndex);
    }
    else
    {
        return (Q_SIZE + currentWriteIndex - currentReadIndex);
    }
  // let's think of a scenario where this function returns bogus data
  // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
  // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
  // 2. afterwards this thread is preemted. While this thread is inactive 2
  // elements are inserted and removed from the queue, so m_maximumReadIndex
  // is 5 and m_readIndex 4. Real size is still 1
  // 3. Now the current thread comes back from preemption and reads m_readIndex.
  // currentReadIndex is 4
  // 4. currentReadIndex is bigger than currentWriteIndex, so
  // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
  // it returns that the queue is almost full, when it is almost empty
  //
  if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
  {
      return (currentWriteIndex - currentReadIndex);
  }
  else
  {
      return (Q_SIZE + currentWriteIndex - currentReadIndex);
  }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T, typename Allocator>
inline
bool ArrayLockFreeQueue<ELEM_T, Allocator>::full()
{
template<typename ELEM_T, typename Allocator>
inline
bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count == (Q_SIZE));
  return (m_count == (Q_SIZE));
#else
    uint32_t currentWriteIndex = m_writeIndex;
    uint32_t currentReadIndex  = m_readIndex;
  uint32_t currentWriteIndex = m_writeIndex;
  uint32_t currentReadIndex  = m_readIndex;
  if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
  {
      // the queue is full
      return true;
  }
  else
  {
      // not full!
      return false;
  }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template<typename ELEM_T, typename Allocator>
inline
bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
  return (m_count == 0);
#else
  if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
  {
      // the queue is full
      return true;
  }
  else
  {
      // not full!
      return false;
  }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template<typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) {
  uint32_t currentReadIndex;
  uint32_t currentWriteIndex;
  do {
    currentWriteIndex = m_writeIndex;
    currentReadIndex = m_readIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    if (m_count == Q_SIZE) {
      return false;
    }
#else
    if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
    {
        // the queue is full
        return true;
    }
    else
    {
        // not full!
        return false;
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T, typename Allocator>
inline
bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count == 0);
#else
    if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
    {
        // the queue is full
        return true;
    }
    else
    {
        // not full!
        return false;
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data)
{
    uint32_t currentReadIndex;
    uint32_t currentWriteIndex;
    do
    {
        currentWriteIndex = m_writeIndex;
        currentReadIndex  = m_readIndex;
    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        if (m_count == Q_SIZE) {
            return false;
        }
    #else
        if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
        {
            // the queue is full
            return false;
        }
    #endif
    } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
    // We know now that this index is reserved for us. Use it to save the data
    m_theQueue[countToIndex(currentWriteIndex)] = a_data;
    // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
    // inserting in the queue. It might fail if there are more than 1 producer threads because this
    // operation has to be done in the same order as the previous CAS
    while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
    {
        // this is a good place to yield the thread in case there are more
        // software threads than hardware processors and you have more
        // than 1 producer thread
        // have a look at sched_yield (POSIX.1b)
        sched_yield();
    }
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    AtomicAdd(&m_count, 1);
#endif
    return true;
  } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
  // We know now that this index is reserved for us. Use it to save the data
  m_theQueue[countToIndex(currentWriteIndex)] = a_data;
  // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
  // inserting in the queue. It might fail if there are more than 1 producer threads because this
  // operation has to be done in the same order as the previous CAS
  while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) {
    // this is a good place to yield the thread in case there are more
    // software threads than hardware processors and you have more
    // than 1 producer thread
    // have a look at sched_yield (POSIX.1b)
    sched_yield();
  }
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
  AtomicAdd(&m_count, 1);
#endif
  return true;
}
template <typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data)
{
    uint32_t currentMaximumReadIndex;
    uint32_t currentReadIndex;
template<typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) {
  uint32_t currentMaximumReadIndex;
  uint32_t currentReadIndex;
    do
    {
        // to ensure thread-safety when there is more than 1 producer thread
        // a second index is defined (m_maximumReadIndex)
        currentReadIndex        = m_readIndex;
        currentMaximumReadIndex = m_maximumReadIndex;
    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
  do {
    // to ensure thread-safety when there is more than 1 producer thread
    // a second index is defined (m_maximumReadIndex)
    currentReadIndex = m_readIndex;
    currentMaximumReadIndex = m_maximumReadIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        if (m_count == 0) {
            return false;
        }
    #else
        if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
        {
            // the queue is empty or
            // a producer thread has allocate space in the queue but is
            // waiting to commit the data into it
            return false;
        }
    #endif
        // retrieve the data from the queue
        a_data = m_theQueue[countToIndex(currentReadIndex)];
        // try to perfrom now the CAS operation on the read index. If we succeed
        // a_data already contains what m_readIndex pointed to before we
        // increased it
        if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
        {
        #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
            // m_count.fetch_sub(1);
            AtomicSub(&m_count, 1);
        #endif
            return true;
        }
        // it failed retrieving the element off the queue. Someone else must
        // have read the element stored at countToIndex(currentReadIndex)
        // before we could perform the CAS operation
    } while(1); // keep looping to try again!
    // Something went wrong. it shouldn't be possible to reach here
    assert(0);
    // Add this return statement to avoid compiler warnings
    return false;
}
template <typename ELEM_T, typename Allocator>
ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i)
{
    int currentCount = m_count;
    uint32_t currentReadIndex = m_readIndex;
    if (i >= currentCount)
    {
        std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
        std::exit(EXIT_FAILURE);
    if (m_count == 0) {
      return false;
    }
    return m_theQueue[countToIndex(currentReadIndex+i)];
#else
    if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
    {
        // the queue is empty or
        // a producer thread has allocate space in the queue but is
        // waiting to commit the data into it
        return false;
    }
#endif
    // retrieve the data from the queue
    a_data = m_theQueue[countToIndex(currentReadIndex)];
    // try to perfrom now the CAS operation on the read index. If we succeed
    // a_data already contains what m_readIndex pointed to before we
    // increased it
    if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
      // m_count.fetch_sub(1);
      AtomicSub(&m_count, 1);
#endif
      return true;
    }
    // it failed retrieving the element off the queue. Someone else must
    // have read the element stored at countToIndex(currentReadIndex)
    // before we could perform the CAS operation
  } while (1); // keep looping to try again!
  // Something went wrong. it shouldn't be possible to reach here
  assert(0);
  // Add this return statement to avoid compiler warnings
  return false;
}
template<typename ELEM_T, typename Allocator>
ELEM_T &ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) {
  int currentCount = m_count;
  uint32_t currentReadIndex = m_readIndex;
  if (i >= currentCount) {
    std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i
              << " is out of range\n";
    std::exit(EXIT_FAILURE);
  }
  return m_theQueue[countToIndex(currentReadIndex + i)];
}
#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
src/queue/array_lock_free_sem_queue.h
@@ -8,7 +8,7 @@
#include "shm_allocator.h"
#include "futex_sem.h"
#include "time_util.h"
#include "bus_def.h"
/// @brief implementation of an array based lock free queue with support for
///        multiple producers
@@ -17,8 +17,7 @@
/// you must use the ArrayLockFreeSemQueue fachade:
///   ArrayLockFreeSemQueue<int, 100, ArrayLockFreeSemQueue> q;
#define LOCK_FREE_QUEUE_TIMEOUT  1
#define LOCK_FREE_QUEUE_NOWAIT  1 << 1
#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
@@ -207,15 +206,17 @@
  uint32_t currentWriteIndex;
  int s;
  // sigset_t mask_all, pre;
  // sigfillset(&mask_all);
  do
  {
    currentWriteIndex = m_writeIndex;
    currentReadIndex  = m_readIndex;
  #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    if (m_count == Q_SIZE) {
      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG)
        return -1;
      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
@@ -235,9 +236,9 @@
    if (currentReadIndex == currentWriteIndex - Q_SIZE  + 1   )
    {
        // the queue is full
      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG)
        return -1;
      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, &ts, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
@@ -260,10 +261,11 @@
  // We know now that this index is reserved for us. Use it to save the data
  m_theQueue[countToIndex(currentWriteIndex)] = a_data;
  // sigprocmask(SIG_BLOCK, &mask_all, &pre);
  // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
  // inserting in the queue. It might fail if there are more than 1 producer threads because this
  // operation has to be done in the same order as the previous CAS
  while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
  {
    // this is a good place to yield the thread in case there are more
@@ -283,6 +285,7 @@
      err_exit(errno, "futex-FUTEX_WAKE");  
#endif
  // sigprocmask(SIG_SETMASK, &pre, NULL);
  return 0;
}
@@ -293,6 +296,11 @@
  uint32_t currentMaximumReadIndex;
  uint32_t currentReadIndex;
  int s;
  // sigset_t mask_all, pre;
  // sigfillset(&mask_all);
  // sigprocmask(SIG_BLOCK, &mask_all, &pre);
  do
  {
@@ -305,19 +313,23 @@
    if (m_count == 0) {
      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
        // sigprocmask(SIG_SETMASK, &pre, NULL);
        return -1;
      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
      }
      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex((int *)&m_count, FUTEX_WAIT, 0, &ts, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          // sigprocmask(SIG_SETMASK, &pre, NULL);
          return -1;
        }
            
      } else {
        s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
          // sigprocmask(SIG_SETMASK, &pre, NULL);
          return -1;
        }
      }
@@ -330,19 +342,23 @@
      // the queue is empty or
      // a producer thread has allocate space in the queue but is
      // waiting to commit the data into it
      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
        // sigprocmask(SIG_SETMASK, &pre, NULL);
        return -1;
      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
      }
      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex((int *)&currentMaximumReadIndex, FUTEX_WAIT, currentReadIndex, &ts, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          // sigprocmask(SIG_SETMASK, &pre, NULL);
          return -1;
        }
            
      } else {
        s = futex((int *)&currentMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
         // sigprocmask(SIG_SETMASK, &pre, NULL);
          return -1;
        }
      }
@@ -367,6 +383,7 @@
        err_exit(errno, "futex-FUTEX_WAKE");
    #endif
     
      // sigprocmask(SIG_SETMASK, &pre, NULL);
      return 0;
    }
src/queue/lock_free_queue.h
@@ -12,20 +12,20 @@
#include "shm_allocator.h"
#include "psem.h"
#include "bus_error.h"
#include "bus_def.h"
// default Queue size
#define LOCK_FREE_Q_DEFAULT_SIZE 16
// static Logger *logger = LoggerFactory::getLogger();
// define this macro if calls to "size" must return the real size of the
// queue. If it is undefined  that function will try to take a snapshot of
// define this macro if calls to "size" must return the real size of the
// queue. If it is undefined  that function will try to take a snapshot of
// the queue, but returned value might be bogus
// forward declarations for default template values
//
template <typename ELEM_T, typename Allocator>
template<typename ELEM_T, typename Allocator>
class ArrayLockFreeQueue;
// template <typename ELEM_T>
@@ -33,9 +33,9 @@
/// @brief Lock-free queue based on a circular array
/// No allocation of extra memory for the nodes handling is needed, but it has
/// to add extra overhead (extra CAS operation) when inserting to ensure the
/// thread-safety of the queue when the queue type is not
/// No allocation of extra memory for the nodes handling is needed, but it has
/// to add extra overhead (extra CAS operation) when inserting to ensure the
/// thread-safety of the queue when the queue type is not
/// ArrayLockFreeQueueSingleProducer.
///
/// examples of instantiation:
@@ -50,113 +50,109 @@
///
/// ELEM_T represents the type of elementes pushed and popped from the queue
/// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1)
///        This number should be a power of 2 to ensure
///        indexes in the circular queue keep stable when the uint32_t
///        This number should be a power of 2 to ensure
///        indexes in the circular queue keep stable when the uint32_t
///        variable that holds the current position rolls over from FFFFFFFF
///        to 0. For instance
///        2    -> 0x02
///        2    -> 0x02
///        4    -> 0x04
///        8    -> 0x08
///        16   -> 0x10
///        (...)
///        (...)
///        1024 -> 0x400
///        2048 -> 0x800
///
///        if queue size is not defined as requested, let's say, for
///        instance 100, when current position is FFFFFFFF (4,294,967,295)
///        index in the circular array is 4,294,967,295 % 100 = 95.
///        When that value is incremented it will be set to 0, that is the
///        index in the circular array is 4,294,967,295 % 100 = 95.
///        When that value is incremented it will be set to 0, that is the
///        last 4 elements of the queue are not used when the counter rolls
///        over to 0
/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and
/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and
///        ArrayLockFreeQueue are supported (single producer
///        by default)
template <
    typename ELEM_T,
    typename Allocator = SHM_Allocator,
    template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
     >
class LockFreeQueue
{
template<
        typename ELEM_T,
        typename Allocator = SHM_Allocator,
        template<typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
>
class LockFreeQueue {
private:
    sem_t slots;
    sem_t items;
  sem_t slots;
  sem_t items;
public:
    sem_t mutex;
    LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    /// @brief destructor of the class.
    /// Note it is not virtual since it is not expected to inherit from this
    /// template
    ~LockFreeQueue();
    std::atomic_uint reference;
    /// @brief constructor of the class
  sem_t mutex;
    /// @brief returns the current number of items in the queue
    /// It tries to take a snapshot of the size of the queue, but in busy environments
    /// this function might return bogus values.
    ///
    /// If a reliable queue size must be kept you might want to have a look at
    /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
    /// it enables a reliable size though it hits overall performance of the queue
    /// (when the reliable size variable is on it's got an impact of about 20% in time)
    inline uint32_t size();
    /// @brief return true if the queue is full. False otherwise
    /// It tries to take a snapshot of the size of the queue, but in busy
    /// environments this function might return bogus values. See help in method
    /// LockFreeQueue::size
    inline bool full();
  LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    inline bool empty();
  /// @brief destructor of the class.
  /// Note it is not virtual since it is not expected to inherit from this
  /// template
  ~LockFreeQueue();
    inline ELEM_T& operator[](unsigned i);
    /// @brief push an element at the tail of the queue
    /// @param the element to insert in the queue
    /// Note that the element is not a pointer or a reference, so if you are using large data
    /// structures to be inserted in the queue you should think of instantiate the template
    /// of the queue as a pointer to that large structure
    /// @return true if the element was inserted in the queue. False if the queue was full
    int push(const ELEM_T &a_data);
    int push_nowait(const ELEM_T &a_data);
    int push_timeout(const ELEM_T &a_data, const struct timespec * timeout);
    /// @brief pop the element at the head of the queue
    /// @param a reference where the element in the head of the queue will be saved to
    /// Note that the a_data parameter might contain rubbish if the function returns false
    /// @return true if the element was successfully extracted from the queue. False if the queue was empty
    int pop(ELEM_T &a_data);
    int pop_nowait(ELEM_T &a_data);
    int pop_timeout(ELEM_T &a_data, struct timespec * timeout);
  std::atomic_uint reference;
  /// @brief constructor of the class
    void *operator new(size_t size);
    void operator delete(void *p);
  /// @brief returns the current number of items in the queue
  /// It tries to take a snapshot of the size of the queue, but in busy environments
  /// this function might return bogus values.
  ///
  /// If a reliable queue size must be kept you might want to have a look at
  /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
  /// it enables a reliable size though it hits overall performance of the queue
  /// (when the reliable size variable is on it's got an impact of about 20% in time)
  inline uint32_t size();
  /// @brief return true if the queue is full. False otherwise
  /// It tries to take a snapshot of the size of the queue, but in busy
  /// environments this function might return bogus values. See help in method
  /// LockFreeQueue::size
  inline bool full();
  inline bool empty();
  inline ELEM_T &operator[](unsigned i);
  /// @brief push an element at the tail of the queue
  /// @param the element to insert in the queue
  /// Note that the element is not a pointer or a reference, so if you are using large data
  /// structures to be inserted in the queue you should think of instantiate the template
  /// of the queue as a pointer to that large structure
  /// @return true if the element was inserted in the queue. False if the queue was full
  int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
  /// @brief pop the element at the head of the queue
  /// @param a reference where the element in the head of the queue will be saved to
  /// Note that the a_data parameter might contain rubbish if the function returns false
  /// @return true if the element was successfully extracted from the queue. False if the queue was empty
  int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
  void *operator new(size_t size);
  void operator delete(void *p);
protected:
    /// @brief the actual queue. methods are forwarded into the real
    ///        implementation
    Q_TYPE<ELEM_T, Allocator> m_qImpl;
  /// @brief the actual queue. methods are forwarded into the real
  ///        implementation
  Q_TYPE<ELEM_T, Allocator> m_qImpl;
private:
    /// @brief disable copy constructor declaring it private
    LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src);
  /// @brief disable copy constructor declaring it private
  LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src);
};
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize)
{
// std::cout << "LockFreeQueue init reference=" << reference << std::endl;
template<
        typename ELEM_T,
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) {
    // std::cout << "LockFreeQueue init reference=" << reference << std::endl;
    if (sem_init(&slots, 1, qsize) == -1)
        err_exit(errno, "LockFreeQueue sem_init");
    if (sem_init(&items, 1, 0) == -1)
@@ -164,194 +160,130 @@
    if (sem_init(&mutex, 1, 1) == -1)
        err_exit(errno, "LockFreeQueue sem_init");
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue()
{
template<
        typename ELEM_T,
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() {
    // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy");
    if(sem_destroy(&slots) == -1) {
    if (sem_destroy(&slots) == -1) {
        err_exit(errno, "LockFreeQueue sem_destroy");
    }
    if(sem_destroy(&items) == -1) {
    if (sem_destroy(&items) == -1) {
        err_exit(errno, "LockFreeQueue sem_destroy");
    }
    if(sem_destroy(&mutex) == -1) {
    if (sem_destroy(&mutex) == -1) {
        err_exit(errno, "LockFreeQueue sem_destroy");
    }
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size()
{
template<
        typename ELEM_T,
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() {
    return m_qImpl.size();
}
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full()
{
template<
        typename ELEM_T,
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() {
    return m_qImpl.full();
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty()
{
template<
        typename ELEM_T,
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() {
    return m_qImpl.empty();
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
{
 LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
    if (psem_wait(&slots) == -1) {
        return -1;
    }
    if ( m_qImpl.push(a_data) ) {
        psem_post(&items);
LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
        return 0;
    }
    return -1;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
{
    if (psem_trywait(&slots) == -1) {
        return -1;
    }
    if ( m_qImpl.push(a_data)) {
        psem_post(&items);
        return 0;
    }
    return -1;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts)
{
LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n");
    if ( psem_timedwait(&slots, ts) == -1) {
        return -1;
    }
    if (m_qImpl.push(a_data)){
        psem_post(&items);
LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n");
        return 0;
    }
    return -1;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
{
  LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
    if (psem_wait(&items) == -1) {
        return -1;
template<typename ELEM_T,
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) {
    LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
    if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
        if (psem_trywait(&slots) == -1) {
            return -1;
        }
    } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        if (psem_timedwait(&slots, timeout) == -1) {
            return -1;
        }
    } else {
        if (psem_wait(&slots) == -1) {
            return -1;
        }
    }
    if (m_qImpl.push(a_data)) {
        psem_post(&items);
        LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
        return 0;
    }
    return -1;
}
template<typename ELEM_T,
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) {
    LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
    if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
        if (psem_trywait(&items) == -1) {
            return -1;
        }
    } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        if (psem_timedwait(&items, timeout) == -1) {
            return -1;
        }
    } else {
        if (psem_wait(&items) == -1) {
            return -1;
        }
    }
    if (m_qImpl.pop(a_data)) {
        psem_post(&slots);
 LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
        LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
        return 0;
    }
    return -1;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
{
    if (psem_trywait(&items) == -1) {
        return -1;
    }
    if (m_qImpl.pop(a_data)) {
        psem_post(&slots);
        return 0;
    }
    return -1;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
{
    if (psem_timedwait(&items, ts) == -1) {
       return -1;
    }
    if (m_qImpl.pop(a_data)) {
        psem_post(&slots);
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n");
        return 0;
    }
    return -1;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
template<typename ELEM_T,
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
ELEM_T &LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
    return m_qImpl.operator[](i);
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){
        return Allocator::allocate(size);
template<typename ELEM_T,
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
void *LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size) {
    return Allocator::allocate(size);
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
template<typename ELEM_T,
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) {
    return Allocator::deallocate(p);
}
src/queue/shm_queue.h
@@ -12,6 +12,7 @@
#include "shm_allocator.h"
#include "usg_common.h"
#include "array_lock_free_sem_queue.h"
#include "lock_free_queue.h"
#include "bus_error.h"
template <typename ELEM_T> class SHMQueue {
@@ -51,7 +52,7 @@
  /// @brief the actual queue-> methods are forwarded into the real
  ///        implementation
  ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *queue;
  LockFreeQueue<ELEM_T, SHM_Allocator> *queue;
private:
  /// @brief disable copy constructor declaring it private
@@ -64,7 +65,7 @@
  hashtable_t *hashtable = mm_get_hashtable();
  std::set<int> *keyset = hashtable_keyset(hashtable);
  std::set<int>::iterator keyItr;
  ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue;
  LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
  bool found;
  size_t count = 0;
  for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
@@ -77,7 +78,7 @@
    }
    if (!found) {
      // 销毁共享内存的queue
      mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
      mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
      delete mqueue;
      hashtable_remove(hashtable, *keyItr);
      count++;
@@ -91,11 +92,11 @@
template <typename ELEM_T>
size_t SHMQueue<ELEM_T>::remove_queues(int keys[], size_t length) {
  hashtable_t *hashtable = mm_get_hashtable();
  ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue;
  LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
  size_t count = 0;
  for(int i = 0; i< length; i++) {
    // 销毁共享内存的queue
    mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]);
    mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]);
    delete mqueue;
    hashtable_remove(hashtable, keys[i]);
    count++;
@@ -113,9 +114,9 @@
SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize) : KEY(key) {
  hashtable_t *hashtable = mm_get_hashtable();
  queue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
  queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
  if (queue == NULL || (void *)queue == (void *)1) {
    queue = new ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator>(qsize);
    queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize);
    hashtable_put(hashtable, key, (void *)queue);
  }
  // queue->reference++;
@@ -155,7 +156,7 @@
template <typename ELEM_T>
inline int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) {
  int rv =  queue->push(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT);
  int rv =  queue->push(a_data, NULL, BUS_NOWAIT_FLAG);
  if(rv == -1) {
    if (errno == EAGAIN)
      return EAGAIN;
@@ -170,7 +171,7 @@
template <typename ELEM_T>
inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec *timeout) {
  int rv = queue->push(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT);
  int rv = queue->push(a_data, timeout, BUS_TIMEOUT_FLAG);
  if(rv == -1) {
    if(errno == ETIMEDOUT)
        return EBUS_TIMEOUT;
@@ -195,7 +196,7 @@
template <typename ELEM_T>
inline int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) {
  int rv = queue->pop(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT);
  int rv = queue->pop(a_data, NULL, BUS_NOWAIT_FLAG);
  if(rv == -1) {
    if (errno == EAGAIN)
@@ -213,7 +214,7 @@
inline int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) {
  int rv;
  rv = queue->pop(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT);
  rv = queue->pop(a_data, timeout, BUS_TIMEOUT_FLAG);
  if(rv == -1) {
    if (errno == ETIMEDOUT) {
      return EBUS_TIMEOUT;
src/socket/shm_mod_socket.cpp
@@ -51,7 +51,7 @@
}
// 发送信息立刻返回。
int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){
    return shm_sendto(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
    return shm_sendto(shm_socket, buf, size, key, NULL, (int)BUS_NOWAIT_FLAG);
}
 
@@ -74,7 +74,7 @@
}
int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){
    int rv =  shm_recvfrom(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
    int rv =  shm_recvfrom(shm_socket, buf, size, key, NULL, (int)BUS_NOWAIT_FLAG);
    // logger->error(rv, "ShmModSocket::recvfrom_nowait failed!");
  return rv;
}
@@ -92,7 +92,7 @@
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0);
}
int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)BUS_NOWAIT_FLAG);
}
int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
@@ -103,7 +103,7 @@
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0);
}
int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)BUS_NOWAIT_FLAG);
}
@@ -123,7 +123,7 @@
    return _sub_(topic, size, key, timeout, 0);
}
int  ShmModSocket::sub_nowait(char *topic, int size, int key) {
    return _sub_(topic, size, key, NULL,  (int)SHM_MSG_NOWAIT);
    return _sub_(topic, size, key, NULL,  (int)BUS_NOWAIT_FLAG);
}
@@ -142,7 +142,7 @@
    return _desub_(topic, size, key, timeout, 0);
}
int  ShmModSocket::desub_nowait(char *topic, int size, int key) {
    return _desub_(topic, size, key, NULL,  (int)SHM_MSG_NOWAIT);
    return _desub_(topic, size, key, NULL,  (int)BUS_NOWAIT_FLAG);
}
@@ -161,7 +161,7 @@
    return _pub_( topic, topic_size, content, content_size, key, timeout, 0);
}
int  ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){
    return _pub_(topic, topic_size, content, content_size, key, NULL, (int)SHM_MSG_NOWAIT);
    return _pub_(topic, topic_size, content, content_size, key, NULL, (int)BUS_NOWAIT_FLAG);
}
src/socket/shm_socket.cpp
@@ -373,9 +373,9 @@
  memcpy(dest.buf, buf, size);
 
  if(flags & SHM_MSG_NOWAIT != 0) {
  if( (flags & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
    rv = remoteQueue->push_nowait(dest);
  } else if(timeout != NULL) {
  } else if((flags & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
      rv = remoteQueue->push_timeout(dest, timeout);
  } else {
      rv = remoteQueue->push(dest);
@@ -393,8 +393,6 @@
      logger->error(rv, "sendto key %d failed", key);
    }
    return rv;
  }
}
@@ -433,9 +431,9 @@
  shm_msg_t src;
 
   if(flags & SHM_MSG_NOWAIT != 0) {
   if((flags & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
    rv = socket->queue->pop_nowait(src);
  } else if(timeout != NULL) {
  } else if((flags & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
    rv = socket->queue->pop_timeout(src, timeout);
// printf("0 shm_recvfrom====%d\n", rv);
  } else {
@@ -649,7 +647,7 @@
    switch (src.type) {
    case SHM_SOCKET_OPEN:
      socket->acceptQueue->push_timeout(src, &timeout);
      socket->acceptQueue->push(src, &timeout, BUS_TIMEOUT_FLAG);
      break;
    case SHM_SOCKET_CLOSE:
      _server_close_conn_to_client(socket, src.key);
@@ -660,7 +658,7 @@
      if (iter != socket->clientSocketMap->end()) {
        client_socket = iter->second;
        // print_msg("_server_run_msg_rev push before", src);
        client_socket->messageQueue->push_timeout(src, &timeout);
        client_socket->messageQueue->push(src, &timeout, BUS_TIMEOUT_FLAG);
        // print_msg("_server_run_msg_rev push after", src);
      }
@@ -695,7 +693,7 @@
      _client_close_conn_to_server(socket);
      break;
    case SHM_COMMON_MSG:
      socket->messageQueue->push_timeout(src, &timeout);
      socket->messageQueue->push(src, &timeout, BUS_TIMEOUT_FLAG);
      break;
    default:
       logger->error( "shm_socket._client_run_msg_rev: undefined message type.");
src/socket/shm_socket.h
@@ -6,11 +6,7 @@
#include "shm_queue.h"
#include "lock_free_queue.h"
enum shm_socket_flag_t
{
  SHM_MSG_TIMEOUT = 1,
  SHM_MSG_NOWAIT = 2
};
enum shm_connection_status_t {
    SHM_CONN_CLOSED=1,
@@ -88,7 +84,7 @@
int shm_recv(shm_socket_t * socket, void **buf, int *size) ;
/**
 * @flags : SHM_MSG_NOWAIT
 * @flags : BUS_NOWAIT_FLAG
 */
int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0);
test_net_socket/net_mod_socket.sh
@@ -36,7 +36,7 @@
}
function close() {
    ps -ef | grep -e "test_net_mod_socket" -e "net_mod_socket"| awk  '{print $2}' | xargs -i kill -9 {}
    ps -ef | grep -e "test_net_mod_socket" -e "heart_beat"| awk  '{print $2}' | xargs -i kill -9 {}
    ipcrm -a
}
@@ -48,8 +48,6 @@
case ${1} in
  "server")
    close
    sleep 2
  server
  ;;
  "client")
test_net_socket/test_net_mod_socket.cpp
@@ -76,10 +76,13 @@
  void *recvbuf;
  int size;
  int key;
  while (net_mod_socket_recvfrom( sockt, &recvbuf, &size, &key) == 0) {
  int rv;
  while ((rv = net_mod_socket_recvfrom( sockt, &recvbuf, &size, &key) ) == 0) {
    printf("收到订阅消息:%s\n", recvbuf);
    free(recvbuf);
  }
  printf("print_sub_msg return . rv = %d\n", rv);
  
}
test_socket/CMakeLists.txt
New file
@@ -0,0 +1,11 @@
# add the executable
add_executable(bus_test bus_test.cpp)
target_link_libraries(bus_test PRIVATE shm_queue  ${EXTRA_LIBS} )
target_include_directories(bus_test PRIVATE
                            "${PROJECT_BINARY_DIR}"
                             ${EXTRA_INCLUDES}
                            )
test_socket/bus_test.cpp
New file
@@ -0,0 +1,116 @@
#include "bus_server_socket.h"
#include "shm_mod_socket.h"
#include "shm_mm_wrapper.h"
#include "usg_common.h"
#include "mm.h"
BusServerSocket * server_socket;
void sigint_handler(int sig) {
   exit(0);
}
void server(int key) {
  server_socket = new BusServerSocket();
  server_socket->bind( key);
  server_socket->start();
}
void *run_recv(void *skptr) {
  pthread_detach(pthread_self());
  void *recvbuf;
  int size;
  int key;
  ShmModSocket *sk = (ShmModSocket *)skptr;
  while (sk->recvfrom( &recvbuf, &size, &key) == 0) {
    printf("收到订阅消息:%s\n", recvbuf);
    free(recvbuf);
  }
}
void client(int key) {
  ShmModSocket *sk = new ShmModSocket();
  pthread_t tid;
  pthread_create(&tid, NULL, run_recv, (void *)socket);
  int size;
  char action[512];
  char topic[512];
  char content[512];
  long i = 0;
  while (true) {
    //printf("Usage: pub <topic> [content] or sub <topic>\n");
    printf("Can I help you? sub, pub, desub or quit\n");
    scanf("%s",action);
    if(strcmp(action, "sub") == 0) {
      printf("Please input topic!\n");
      scanf("%s", topic);
      if (sk->sub(topic, strlen(topic),  key) == 0) {
         printf("%d Sub success!\n", sk->get_key());
      } else {
        printf("Sub failture!\n");
        exit(0);
      }
    } else if(strcmp(action, "desub") == 0) {
      printf("Please input topic!\n");
      scanf("%s", topic);
      if (sk->desub(topic, strlen(topic),  key) == 0) {
         printf("%d Desub success!\n", sk->get_key());
      } else {
        printf("Desub failture!\n");
        exit(0);
      }
    } else if(strcmp(action, "pub") == 0) {
      // printf("%s %s %s\n", action, topic, content);
      printf("Please input topic and content\n");
      scanf("%s %s", topic, content);
      if(sk->pub(topic, strlen(topic)+1, content, strlen(content)+1,  key) == 0){
        printf("%d Pub success!\n", sk->get_key());
      } else {
        printf("Pub failture!\n");
      }
    } else if(strcmp(action, "quit") == 0) {
      printf("(%d) quit\n", sk->get_key());
      delete sk;
      break;
    } else {
      printf("error input argument\n");
      continue;
    }
  }
}
int main(int argc, char *argv[]) {
  shm_mm_wrapper_init(512);
  int key;
  if (argc < 3) {
    fprintf(stderr, "Usage: %s %s|%s  <key> ...\n", argv[0], "server", "client");
    return 1;
  }
  key = atoi(argv[2]);
  if (strcmp("server", argv[1]) == 0) {
   server(key);
  } else if (strcmp("client", argv[1]) == 0) {
    client(key);
  }
  return 0;
}
test_socket/dgram_mod_bus.cpp
File was deleted