From 4c73fd7179e92bee9cccb65e46823b00f568acb3 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期五, 22 一月 2021 16:57:34 +0800
Subject: [PATCH] tmp

---
 src/queue/lock_free_queue.h             |  404 +++++++----------
 test_net_socket/test_net_mod_socket.cpp |    5 
 src/socket/shm_socket.h                 |    8 
 build.sh                                |    4 
 CMakeLists.txt                          |    6 
 src/net/net_mod_server_socket.cpp       |    2 
 src/net/net_mod_socket.cpp              |    8 
 src/socket/shm_socket.cpp               |   16 
 src/bus_def.h                           |    7 
 test_socket/CMakeLists.txt              |   11 
 /dev/null                               |  131 -----
 src/queue/array_lock_free_queue.h       |  487 ++++++++++-----------
 src/queue/shm_queue.h                   |   23 
 test_socket/bus_test.cpp                |  116 +++++
 src/net/net_mod_socket_wrapper.cpp      |    7 
 src/queue/array_lock_free_sem_queue.h   |   41 +
 test_net_socket/net_mod_socket.sh       |    4 
 src/CMakeLists.txt                      |   11 
 src/socket/shm_mod_socket.cpp           |   14 
 19 files changed, 625 insertions(+), 680 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 8310bfa..3b1e4b4 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -13,7 +13,10 @@
 set(CMAKE_LIBRARY_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}")
 # set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}")
 
-option(BUILD_SHARED_LIBS "Build using shared libraries" ON)
+if (CMAKE_BUILD_TYPE EQUAL "Release")
+#option(BUILD_SHARED_LIBS "Build using shared libraries" ON)
+endif()
+
 option(BUILD_DOC "Build doc" OFF)
 
 
@@ -29,4 +32,5 @@
 	add_subdirectory(${PROJECT_SOURCE_DIR}/src)
 	add_subdirectory(${PROJECT_SOURCE_DIR}/test)
 	add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket)
+	add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket)
 endif()
diff --git a/build.sh b/build.sh
index eab77c7..6bfd753 100755
--- a/build.sh
+++ b/build.sh
@@ -2,6 +2,7 @@
 
 BUILD_TYPE="Debug"
 BUILD_DOC="OFF"
+BUILD_SHARED_LIBS="OFF"
 
 function usage() {
 	echo "build.sh [release | debug | doc]"
@@ -10,6 +11,7 @@
 case ${1} in
   "release")
   BUILD_TYPE="Release"
+  BUILD_SHARED_LIBS="ON"
   ;;
   
   "debug")
@@ -48,7 +50,7 @@
 # -DBUILD_SHARED_LIBS=ON
 # -DCMAKE_INSTALL_PREFIX=$(pwd/../dest)
 # -DQCA_MAN_INSTALL_DIR:PATH=/usr/share/man 
-cmake -DCMAKE_INSTALL_PREFIX="$(pwd)/../dest" -DCMAKE_BUILD_TYPE=${BUILD_TYPE} -DBUILD_SHARED_LIBS=ON \
+cmake -DCMAKE_INSTALL_PREFIX="$(pwd)/../dest" -DCMAKE_BUILD_TYPE=${BUILD_TYPE} -DBUILD_SHARED_LIBS=${BUILD_SHARED_LIBS} \
   -DBUILD_DOC=${BUILD_DOC} -DSUPPORT_RDMA=OFF ..
 
 cmake --build .
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index fe597d9..f3698d9 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -1,5 +1,3 @@
-
-
 # should we use our own math functions
 option(SUPPORT_RDMA "If support rdma" OFF)
 
@@ -16,9 +14,9 @@
 ./socket/shm_mod_socket.cpp
 ./time_util.cpp
 ./psem.cpp
-./svsem.cpp
 ./bus_error.cpp
 ./futex_sem.cpp
+./svsem.cpp
 ./net/net_conn_pool.cpp
 ./net/net_mod_server_socket_wrapper.cpp
 ./net/net_mod_socket_wrapper.cpp
@@ -28,7 +26,6 @@
 ./shm/shm_mm_wrapper.cpp
 ./shm/mm.cpp
 ./shm/hashtable.cpp
-
 
 	)
 
@@ -62,13 +59,14 @@
 ./time_util.h
 ./futex_sem.h
 ./bus_error.h
-./svsem.h
+./bus_def.h
 ./logger_factory.h
 ./queue/linked_lock_free_queue.h
-./queue/array_lock_free_queue2.h
 ./queue/array_lock_free_queue.h
 ./queue/shm_queue.h
+./queue/array_lock_free_sem_queue.h
 ./queue/lock_free_queue.h
+./svsem.h
 ./net/net_conn_pool.h
 ./net/net_mod_socket.h
 ./net/net_mod_server_socket_wrapper.h
@@ -82,6 +80,7 @@
 ./shm/shm_allocator.h
 
 
+
   DESTINATION include)
 
 install(FILES "${PROJECT_BINARY_DIR}/src/bus_config.h"
diff --git a/src/bus_def.h b/src/bus_def.h
new file mode 100644
index 0000000..78a7eb9
--- /dev/null
+++ b/src/bus_def.h
@@ -0,0 +1,7 @@
+#ifndef _BUS_DEF_H_
+#define _BUS_DEF_H_
+
+#define BUS_TIMEOUT_FLAG  1
+#define BUS_NOWAIT_FLAG  1 << 1
+
+#endif
\ No newline at end of file
diff --git a/src/net/net_mod_server_socket.cpp b/src/net/net_mod_server_socket.cpp
index 8324d6f..432defb 100644
--- a/src/net/net_mod_server_socket.cpp
+++ b/src/net/net_mod_server_socket.cpp
@@ -54,7 +54,7 @@
   sprintf(portstr, "%d", port);
   listenfd = open_listenfd(portstr);
   if(listenfd < 0) {
-    LoggerFactory::getLogger()->error(errno, "NetModServerSocket::start . errno=%d ", errno);
+    LoggerFactory::getLogger()->error(errno, "NetModServerSocket::start. port = %d ", port);
     return -1;
   }
   init_pool(listenfd);
diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp
index 73be131..3ef15b9 100644
--- a/src/net/net_mod_socket.cpp
+++ b/src/net/net_mod_socket.cpp
@@ -513,10 +513,12 @@
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
 int NetModSocket::recvfrom(void **buf, int *size, int *key) {
+
+  logger->debug(" %d NetModSocket::recvfrom before", get_key());
   int rv = shmModSocket.recvfrom(buf, size, key);
 
   if(rv == 0) {
-    logger->debug("NetModSocket::recvfrom:  %d recvfrom %d success.\n", get_key(), *key);
+    logger->debug("NetModSocket::recvfrom: <<<< %d recvfrom %d success.\n", get_key(), *key);
     return 0;
   }
 
@@ -533,7 +535,7 @@
   struct timespec timeout = {sec, nsec};
   int rv = shmModSocket.recvfrom_timeout(buf, size, key, &timeout);
   if(rv == 0) {
-    logger->debug("NetModSocket::recvfrom:  %d recvfrom %d success.\n", get_key(), *key);
+    logger->debug("NetModSocket::recvfrom_timeout:  %d recvfrom %d success.\n", get_key(), *key);
     return 0;
   }
 
@@ -549,7 +551,7 @@
 int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){
   int rv = shmModSocket.recvfrom_nowait(buf, size, key);
   if(rv == 0) {
-    logger->debug("NetModSocket::recvfrom:  %d recvfrom %d success.\n", get_key(), *key);
+    logger->debug("NetModSocket::recvfrom_nowait:  %d recvfrom %d success.\n", get_key(), *key);
     return 0;
   }
 
diff --git a/src/net/net_mod_socket_wrapper.cpp b/src/net/net_mod_socket_wrapper.cpp
index 83161bc..e7bf302 100644
--- a/src/net/net_mod_socket_wrapper.cpp
+++ b/src/net/net_mod_socket_wrapper.cpp
@@ -67,8 +67,13 @@
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
 int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key){
+	int rv;
 	NetModSocket *sockt = (NetModSocket *)_socket;
-	return sockt->recvfrom(buf, size, key);
+
+	logger->debug(" %d net_mod_socket_recvfrom before", net_mod_socket_get_key(_socket));
+	rv = sockt->recvfrom(buf, size, key);
+	logger->debug(" %d net_mod_socket_recvfrom after. rv = %d", net_mod_socket_get_key(_socket), rv);
+	return rv;
 }
 // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
 int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){
diff --git a/src/queue/array_lock_free_queue.h b/src/queue/array_lock_free_queue.h
index ae1506d..a03b33e 100644
--- a/src/queue/array_lock_free_queue.h
+++ b/src/queue/array_lock_free_queue.h
@@ -1,5 +1,6 @@
 #ifndef __ARRAY_LOCK_FREE_QUEUE_H__
 #define __ARRAY_LOCK_FREE_QUEUE_H__
+
 #include "atomic_ops.h"
 #include <assert.h> // assert()
 #include <sched.h>  // sched_yield()
@@ -17,306 +18,290 @@
 
 #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
-template <typename ELEM_T, typename Allocator = SHM_Allocator>
-class ArrayLockFreeQueue
-{
-    // ArrayLockFreeQueue will be using this' private members
-    template <
-        typename ELEM_T_, 
-        typename Allocator_,
-        template <typename T, typename AT> class Q_TYPE
-        >
-    friend class LockFreeQueue;
+template<typename ELEM_T, typename Allocator = SHM_Allocator>
+class ArrayLockFreeQueue {
+  // ArrayLockFreeQueue will be using this' private members
+  template<
+    typename ELEM_T_,
+    typename Allocator_,
+    template<typename T, typename AT> class Q_TYPE
+  >
+  friend
+  class LockFreeQueue;
 
 private:
-    /// @brief constructor of the class
-    ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
-    
-    virtual ~ArrayLockFreeQueue();
-    
-    inline uint32_t size();
-    
-    inline bool full();
+  /// @brief constructor of the class
+  ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
 
-    inline bool empty();
-    
-    bool push(const ELEM_T &a_data);   
-    
-    bool pop(ELEM_T &a_data);
-    
-    /// @brief calculate the index in the circular array that corresponds
-    /// to a particular "count" value
-    inline uint32_t countToIndex(uint32_t a_count);
+  virtual ~ArrayLockFreeQueue();
 
-    ELEM_T& operator[](unsigned i);
-    
-private:    
-    size_t Q_SIZE;
-    /// @brief array to keep the elements
-    ELEM_T *m_theQueue;
+  inline uint32_t size();
 
-    /// @brief where a new element will be inserted
-    uint32_t m_writeIndex;
+  inline bool full();
 
-    /// @brief where the next element where be extracted from
-    uint32_t m_readIndex;
-    
-    /// @brief maximum read index for multiple producer queues
-    /// If it's not the same as m_writeIndex it means
-    /// there are writes pending to be "committed" to the queue, that means,
-    /// the place for the data was reserved (the index in the array) but  
-    /// data is still not in the queue, so the thread trying to read will have 
-    /// to wait for those other threads to save the data into the queue
-    ///
-    /// note this is only used for multiple producers
-    uint32_t m_maximumReadIndex;
+  inline bool empty();
+
+  bool push(const ELEM_T &a_data);
+
+  bool pop(ELEM_T &a_data);
+
+  /// @brief calculate the index in the circular array that corresponds
+  /// to a particular "count" value
+  inline uint32_t countToIndex(uint32_t a_count);
+
+  ELEM_T &operator[](unsigned i);
+
+private:
+  size_t Q_SIZE;
+  /// @brief array to keep the elements
+  ELEM_T *m_theQueue;
+
+  /// @brief where a new element will be inserted
+  uint32_t m_writeIndex;
+
+  /// @brief where the next element where be extracted from
+  uint32_t m_readIndex;
+
+  /// @brief maximum read index for multiple producer queues
+  /// If it's not the same as m_writeIndex it means
+  /// there are writes pending to be "committed" to the queue, that means,
+  /// the place for the data was reserved (the index in the array) but
+  /// data is still not in the queue, so the thread trying to read will have
+  /// to wait for those other threads to save the data into the queue
+  ///
+  /// note this is only used for multiple producers
+  uint32_t m_maximumReadIndex;
 
 #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-    /// @brief number of elements in the queue
-    uint32_t m_count;
+  /// @brief number of elements in the queue
+  uint32_t m_count;
 #endif
-   
-    
+
+
 private:
-    /// @brief disable copy constructor declaring it private
-    ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
+  /// @brief disable copy constructor declaring it private
+  ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
 
 };
 
 
-template <typename ELEM_T, typename Allocator>
+template<typename ELEM_T, typename Allocator>
 ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize):
-    Q_SIZE(qsize),
-    m_writeIndex(0),      // initialisation is not atomic
-    m_readIndex(0),       //
-    m_maximumReadIndex(0) //
+  Q_SIZE(qsize),
+  m_writeIndex(0),      // initialisation is not atomic
+  m_readIndex(0),       //
+  m_maximumReadIndex(0) //
 #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-    ,m_count(0)           //
+  , m_count(0)           //
 #endif
 {
-    m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
+  m_theQueue = (ELEM_T *) Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
 
 }
 
-template <typename ELEM_T, typename Allocator>
-ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue()
-{
-    // std::cout << "destroy ArrayLockFreeQueue\n";
-    Allocator::deallocate(m_theQueue);
-    
+template<typename ELEM_T, typename Allocator>
+ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() {
+  // std::cout << "destroy ArrayLockFreeQueue\n";
+  Allocator::deallocate(m_theQueue);
+
 }
 
-template <typename ELEM_T, typename Allocator>
-inline 
-uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
-{
-    // if Q_SIZE is a power of 2 this statement could be also written as 
-    // return (a_count & (Q_SIZE - 1));
-    return (a_count % Q_SIZE);
+template<typename ELEM_T, typename Allocator>
+inline
+uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) {
+  // if Q_SIZE is a power of 2 this statement could be also written as
+  // return (a_count & (Q_SIZE - 1));
+  return (a_count % Q_SIZE);
 }
 
-template <typename ELEM_T, typename Allocator>
-inline 
-uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size()
-{
+template<typename ELEM_T, typename Allocator>
+inline
+uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() {
 #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
-    return m_count;
+  return m_count;
 #else
 
-    uint32_t currentWriteIndex = m_maximumReadIndex;
-    uint32_t currentReadIndex  = m_readIndex;
+  uint32_t currentWriteIndex = m_maximumReadIndex;
+  uint32_t currentReadIndex  = m_readIndex;
 
-    // let's think of a scenario where this function returns bogus data
-    // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
-    // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
-    // 2. afterwards this thread is preemted. While this thread is inactive 2 
-    // elements are inserted and removed from the queue, so m_maximumReadIndex 
-    // is 5 and m_readIndex 4. Real size is still 1
-    // 3. Now the current thread comes back from preemption and reads m_readIndex.
-    // currentReadIndex is 4
-    // 4. currentReadIndex is bigger than currentWriteIndex, so
-    // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
-    // it returns that the queue is almost full, when it is almost empty
-    //
-    if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
-    {
-        return (currentWriteIndex - currentReadIndex);
-    }
-    else
-    {
-        return (Q_SIZE + currentWriteIndex - currentReadIndex);
-    }
+  // let's think of a scenario where this function returns bogus data
+  // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
+  // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
+  // 2. afterwards this thread is preemted. While this thread is inactive 2
+  // elements are inserted and removed from the queue, so m_maximumReadIndex
+  // is 5 and m_readIndex 4. Real size is still 1
+  // 3. Now the current thread comes back from preemption and reads m_readIndex.
+  // currentReadIndex is 4
+  // 4. currentReadIndex is bigger than currentWriteIndex, so
+  // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
+  // it returns that the queue is almost full, when it is almost empty
+  //
+  if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
+  {
+      return (currentWriteIndex - currentReadIndex);
+  }
+  else
+  {
+      return (Q_SIZE + currentWriteIndex - currentReadIndex);
+  }
 #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 }
 
-template <typename ELEM_T, typename Allocator>
-inline 
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::full()
-{
+template<typename ELEM_T, typename Allocator>
+inline
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() {
 #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
-    return (m_count == (Q_SIZE));
+  return (m_count == (Q_SIZE));
 #else
 
-    uint32_t currentWriteIndex = m_writeIndex;
-    uint32_t currentReadIndex  = m_readIndex;
-    
+  uint32_t currentWriteIndex = m_writeIndex;
+  uint32_t currentReadIndex  = m_readIndex;
+
+  if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
+  {
+      // the queue is full
+      return true;
+  }
+  else
+  {
+      // not full!
+      return false;
+  }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template<typename ELEM_T, typename Allocator>
+inline
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() {
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+  return (m_count == 0);
+#else
+
+  if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
+  {
+      // the queue is full
+      return true;
+  }
+  else
+  {
+      // not full!
+      return false;
+  }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+
+template<typename ELEM_T, typename Allocator>
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) {
+  uint32_t currentReadIndex;
+  uint32_t currentWriteIndex;
+
+  do {
+
+    currentWriteIndex = m_writeIndex;
+    currentReadIndex = m_readIndex;
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+    if (m_count == Q_SIZE) {
+      return false;
+    }
+#else
     if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
     {
         // the queue is full
-        return true;
-    }
-    else
-    {
-        // not full!
         return false;
     }
-#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-}
-
-template <typename ELEM_T, typename Allocator>
-inline 
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty()
-{
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
-    return (m_count == 0);
-#else
-
-    if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
-    {
-        // the queue is full
-        return true;
-    }
-    else
-    {
-        // not full!
-        return false;
-    }
-#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-}
-
-
-
-
-
-
-template <typename ELEM_T, typename Allocator>
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data)
-{
-    uint32_t currentReadIndex;
-    uint32_t currentWriteIndex;
-
-    do
-    {
-
-        currentWriteIndex = m_writeIndex;
-        currentReadIndex  = m_readIndex;
-    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
-        if (m_count == Q_SIZE) {
-            return false;
-        }
-    #else
-        if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
-        {
-            // the queue is full
-            return false;
-        }
-    #endif
-
-    } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
-
-    // We know now that this index is reserved for us. Use it to save the data
-    m_theQueue[countToIndex(currentWriteIndex)] = a_data;
-
-    // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
-    // inserting in the queue. It might fail if there are more than 1 producer threads because this
-    // operation has to be done in the same order as the previous CAS
-
-    while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
-    {
-        // this is a good place to yield the thread in case there are more
-        // software threads than hardware processors and you have more
-        // than 1 producer thread
-        // have a look at sched_yield (POSIX.1b)
-        sched_yield();
-    }
-
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-    AtomicAdd(&m_count, 1);
 #endif
-    return true;
+
+  } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
+
+  // We know now that this index is reserved for us. Use it to save the data
+  m_theQueue[countToIndex(currentWriteIndex)] = a_data;
+
+  // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
+  // inserting in the queue. It might fail if there are more than 1 producer threads because this
+  // operation has to be done in the same order as the previous CAS
+
+  while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) {
+    // this is a good place to yield the thread in case there are more
+    // software threads than hardware processors and you have more
+    // than 1 producer thread
+    // have a look at sched_yield (POSIX.1b)
+    sched_yield();
+  }
+
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+  AtomicAdd(&m_count, 1);
+#endif
+  return true;
 }
 
 
-template <typename ELEM_T, typename Allocator>
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data)
-{
-    uint32_t currentMaximumReadIndex;
-    uint32_t currentReadIndex;
+template<typename ELEM_T, typename Allocator>
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) {
+  uint32_t currentMaximumReadIndex;
+  uint32_t currentReadIndex;
 
-    do
-    {
-        // to ensure thread-safety when there is more than 1 producer thread
-        // a second index is defined (m_maximumReadIndex)
-        currentReadIndex        = m_readIndex;
-        currentMaximumReadIndex = m_maximumReadIndex;
-    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+  do {
+    // to ensure thread-safety when there is more than 1 producer thread
+    // a second index is defined (m_maximumReadIndex)
+    currentReadIndex = m_readIndex;
+    currentMaximumReadIndex = m_maximumReadIndex;
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
-        if (m_count == 0) {
-            return false;
-        }
-    #else
-        if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
-        {
-            // the queue is empty or
-            // a producer thread has allocate space in the queue but is
-            // waiting to commit the data into it
-            return false;
-        }
-    #endif
-
-        // retrieve the data from the queue
-        a_data = m_theQueue[countToIndex(currentReadIndex)];
-
-        // try to perfrom now the CAS operation on the read index. If we succeed
-        // a_data already contains what m_readIndex pointed to before we
-        // increased it
-        if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
-        {
-        #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-            // m_count.fetch_sub(1);
-            AtomicSub(&m_count, 1);
-        #endif
-            return true;
-        }
-
-        // it failed retrieving the element off the queue. Someone else must
-        // have read the element stored at countToIndex(currentReadIndex)
-        // before we could perform the CAS operation
-
-    } while(1); // keep looping to try again!
-
-    // Something went wrong. it shouldn't be possible to reach here
-    assert(0);
-
-    // Add this return statement to avoid compiler warnings
-    return false;
-}
-
-template <typename ELEM_T, typename Allocator>
-ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i)
-{
-    int currentCount = m_count;
-    uint32_t currentReadIndex = m_readIndex;
-    if (i >= currentCount)
-    {
-        std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
-        std::exit(EXIT_FAILURE);
+    if (m_count == 0) {
+      return false;
     }
-    return m_theQueue[countToIndex(currentReadIndex+i)];
+#else
+    if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
+    {
+        // the queue is empty or
+        // a producer thread has allocate space in the queue but is
+        // waiting to commit the data into it
+        return false;
+    }
+#endif
+
+    // retrieve the data from the queue
+    a_data = m_theQueue[countToIndex(currentReadIndex)];
+
+    // try to perfrom now the CAS operation on the read index. If we succeed
+    // a_data already contains what m_readIndex pointed to before we
+    // increased it
+    if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) {
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+      // m_count.fetch_sub(1);
+      AtomicSub(&m_count, 1);
+#endif
+      return true;
+    }
+
+    // it failed retrieving the element off the queue. Someone else must
+    // have read the element stored at countToIndex(currentReadIndex)
+    // before we could perform the CAS operation
+
+  } while (1); // keep looping to try again!
+
+  // Something went wrong. it shouldn't be possible to reach here
+  assert(0);
+
+  // Add this return statement to avoid compiler warnings
+  return false;
+}
+
+template<typename ELEM_T, typename Allocator>
+ELEM_T &ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) {
+  int currentCount = m_count;
+  uint32_t currentReadIndex = m_readIndex;
+  if (i >= currentCount) {
+    std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i
+              << " is out of range\n";
+    std::exit(EXIT_FAILURE);
+  }
+  return m_theQueue[countToIndex(currentReadIndex + i)];
 }
 
 #endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
diff --git a/src/queue/array_lock_free_sem_queue.h b/src/queue/array_lock_free_sem_queue.h
index 69630d9..a3b677c 100644
--- a/src/queue/array_lock_free_sem_queue.h
+++ b/src/queue/array_lock_free_sem_queue.h
@@ -8,7 +8,7 @@
 #include "shm_allocator.h"
 #include "futex_sem.h"
 #include "time_util.h"
-
+#include "bus_def.h"
 
 /// @brief implementation of an array based lock free queue with support for
 ///        multiple producers
@@ -17,8 +17,7 @@
 /// you must use the ArrayLockFreeSemQueue fachade:
 ///   ArrayLockFreeSemQueue<int, 100, ArrayLockFreeSemQueue> q;
 
-#define LOCK_FREE_QUEUE_TIMEOUT  1
-#define LOCK_FREE_QUEUE_NOWAIT  1 << 1
+ 
 
 #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
@@ -207,15 +206,17 @@
   uint32_t currentWriteIndex;
   int s;
 
+  // sigset_t mask_all, pre;
+  // sigfillset(&mask_all);
   do
   {
     currentWriteIndex = m_writeIndex;
     currentReadIndex  = m_readIndex;
   #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
     if (m_count == Q_SIZE) {
-      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
+      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG)
         return -1;
-      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
+      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
         const struct timespec ts = TimeUtil::trim_time(timeout);
         s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
         if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
@@ -235,9 +236,9 @@
     if (currentReadIndex == currentWriteIndex - Q_SIZE  + 1   )
     {
         // the queue is full
-      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
+      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG)
         return -1;
-      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
+      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
         const struct timespec ts = TimeUtil::trim_time(timeout);
         s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, &ts, NULL, 0);
         if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
@@ -260,10 +261,11 @@
   // We know now that this index is reserved for us. Use it to save the data
   m_theQueue[countToIndex(currentWriteIndex)] = a_data;
 
+  // sigprocmask(SIG_BLOCK, &mask_all, &pre);
+
   // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
   // inserting in the queue. It might fail if there are more than 1 producer threads because this
   // operation has to be done in the same order as the previous CAS
-
   while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
   {
     // this is a good place to yield the thread in case there are more
@@ -283,6 +285,7 @@
       err_exit(errno, "futex-FUTEX_WAKE");  
 #endif
 
+  // sigprocmask(SIG_SETMASK, &pre, NULL);
   return 0;
 }
 
@@ -293,6 +296,11 @@
   uint32_t currentMaximumReadIndex;
   uint32_t currentReadIndex;
   int s;
+  
+  // sigset_t mask_all, pre;
+  // sigfillset(&mask_all);
+
+  // sigprocmask(SIG_BLOCK, &mask_all, &pre);
 
   do
   {
@@ -305,19 +313,23 @@
 
     if (m_count == 0) {
 
-      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
+      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
+        // sigprocmask(SIG_SETMASK, &pre, NULL);
         return -1;
-      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
+      }
+      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
         const struct timespec ts = TimeUtil::trim_time(timeout);
         s = futex((int *)&m_count, FUTEX_WAIT, 0, &ts, NULL, 0);
         if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
           // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
+          // sigprocmask(SIG_SETMASK, &pre, NULL);
           return -1;
         }
             
       } else {
         s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
         if (s == -1 && errno != EAGAIN && errno != EINTR) {
+          // sigprocmask(SIG_SETMASK, &pre, NULL);
           return -1;
         }
       }
@@ -330,19 +342,23 @@
       // the queue is empty or
       // a producer thread has allocate space in the queue but is
       // waiting to commit the data into it
-      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
+      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
+        // sigprocmask(SIG_SETMASK, &pre, NULL);
         return -1;
-      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
+      }
+      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
         const struct timespec ts = TimeUtil::trim_time(timeout);
         s = futex((int *)&currentMaximumReadIndex, FUTEX_WAIT, currentReadIndex, &ts, NULL, 0);
         if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
           // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
+          // sigprocmask(SIG_SETMASK, &pre, NULL);
           return -1;
         }
             
       } else {
         s = futex((int *)&currentMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0);
         if (s == -1 && errno != EAGAIN && errno != EINTR) {
+         // sigprocmask(SIG_SETMASK, &pre, NULL);
           return -1;
         }
       }
@@ -367,6 +383,7 @@
         err_exit(errno, "futex-FUTEX_WAKE");
     #endif
      
+      // sigprocmask(SIG_SETMASK, &pre, NULL);
       return 0;
     }
 
diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index 01e597c..9245d3e 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -12,20 +12,20 @@
 #include "shm_allocator.h"
 #include "psem.h"
 #include "bus_error.h"
-
+#include "bus_def.h"
 // default Queue size
 #define LOCK_FREE_Q_DEFAULT_SIZE 16
 
 // static Logger *logger = LoggerFactory::getLogger();
-// define this macro if calls to "size" must return the real size of the 
-// queue. If it is undefined  that function will try to take a snapshot of 
+// define this macro if calls to "size" must return the real size of the
+// queue. If it is undefined  that function will try to take a snapshot of
 // the queue, but returned value might be bogus
 
 
 // forward declarations for default template values
 //
 
-template <typename ELEM_T, typename Allocator>
+template<typename ELEM_T, typename Allocator>
 class ArrayLockFreeQueue;
 
 // template <typename ELEM_T>
@@ -33,9 +33,9 @@
 
 
 /// @brief Lock-free queue based on a circular array
-/// No allocation of extra memory for the nodes handling is needed, but it has 
-/// to add extra overhead (extra CAS operation) when inserting to ensure the 
-/// thread-safety of the queue when the queue type is not 
+/// No allocation of extra memory for the nodes handling is needed, but it has
+/// to add extra overhead (extra CAS operation) when inserting to ensure the
+/// thread-safety of the queue when the queue type is not
 /// ArrayLockFreeQueueSingleProducer.
 ///
 /// examples of instantiation:
@@ -50,113 +50,109 @@
 ///
 /// ELEM_T represents the type of elementes pushed and popped from the queue
 /// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1)
-///        This number should be a power of 2 to ensure 
-///        indexes in the circular queue keep stable when the uint32_t 
+///        This number should be a power of 2 to ensure
+///        indexes in the circular queue keep stable when the uint32_t
 ///        variable that holds the current position rolls over from FFFFFFFF
 ///        to 0. For instance
-///        2    -> 0x02 
+///        2    -> 0x02
 ///        4    -> 0x04
 ///        8    -> 0x08
 ///        16   -> 0x10
-///        (...) 
+///        (...)
 ///        1024 -> 0x400
 ///        2048 -> 0x800
 ///
 ///        if queue size is not defined as requested, let's say, for
 ///        instance 100, when current position is FFFFFFFF (4,294,967,295)
-///        index in the circular array is 4,294,967,295 % 100 = 95. 
-///        When that value is incremented it will be set to 0, that is the 
+///        index in the circular array is 4,294,967,295 % 100 = 95.
+///        When that value is incremented it will be set to 0, that is the
 ///        last 4 elements of the queue are not used when the counter rolls
 ///        over to 0
-/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and 
+/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and
 ///        ArrayLockFreeQueue are supported (single producer
 ///        by default)
-template <
-    typename ELEM_T, 
-    typename Allocator = SHM_Allocator,
-    template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
-     >
-class LockFreeQueue
-{
+template<
+        typename ELEM_T,
+        typename Allocator = SHM_Allocator,
+        template<typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
+>
+class LockFreeQueue {
 
 private:
-    sem_t slots;  
-    sem_t items;
+  sem_t slots;
+  sem_t items;
 
 
-   
 public:
-    sem_t mutex;
-    LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
-    
-    /// @brief destructor of the class. 
-    /// Note it is not virtual since it is not expected to inherit from this
-    /// template
-    ~LockFreeQueue();
-    std::atomic_uint reference;    
-    /// @brief constructor of the class
-   
+  sem_t mutex;
 
-    /// @brief returns the current number of items in the queue
-    /// It tries to take a snapshot of the size of the queue, but in busy environments
-    /// this function might return bogus values. 
-    ///
-    /// If a reliable queue size must be kept you might want to have a look at 
-    /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
-    /// it enables a reliable size though it hits overall performance of the queue 
-    /// (when the reliable size variable is on it's got an impact of about 20% in time)
-    inline uint32_t size();
-    
-    /// @brief return true if the queue is full. False otherwise
-    /// It tries to take a snapshot of the size of the queue, but in busy 
-    /// environments this function might return bogus values. See help in method
-    /// LockFreeQueue::size
-    inline bool full();
+  LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
 
-    inline bool empty();
+  /// @brief destructor of the class.
+  /// Note it is not virtual since it is not expected to inherit from this
+  /// template
+  ~LockFreeQueue();
 
-    inline ELEM_T& operator[](unsigned i);
-
-    /// @brief push an element at the tail of the queue
-    /// @param the element to insert in the queue
-    /// Note that the element is not a pointer or a reference, so if you are using large data
-    /// structures to be inserted in the queue you should think of instantiate the template
-    /// of the queue as a pointer to that large structure
-    /// @return true if the element was inserted in the queue. False if the queue was full
-    int push(const ELEM_T &a_data);
-    int push_nowait(const ELEM_T &a_data);
-    int push_timeout(const ELEM_T &a_data, const struct timespec * timeout);
-
-    /// @brief pop the element at the head of the queue
-    /// @param a reference where the element in the head of the queue will be saved to
-    /// Note that the a_data parameter might contain rubbish if the function returns false
-    /// @return true if the element was successfully extracted from the queue. False if the queue was empty
-    int pop(ELEM_T &a_data);
-    int pop_nowait(ELEM_T &a_data);
-    int pop_timeout(ELEM_T &a_data, struct timespec * timeout);
+  std::atomic_uint reference;
+  /// @brief constructor of the class
 
 
-    void *operator new(size_t size);
-    void operator delete(void *p);
+  /// @brief returns the current number of items in the queue
+  /// It tries to take a snapshot of the size of the queue, but in busy environments
+  /// this function might return bogus values.
+  ///
+  /// If a reliable queue size must be kept you might want to have a look at
+  /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
+  /// it enables a reliable size though it hits overall performance of the queue
+  /// (when the reliable size variable is on it's got an impact of about 20% in time)
+  inline uint32_t size();
+
+  /// @brief return true if the queue is full. False otherwise
+  /// It tries to take a snapshot of the size of the queue, but in busy
+  /// environments this function might return bogus values. See help in method
+  /// LockFreeQueue::size
+  inline bool full();
+
+  inline bool empty();
+
+  inline ELEM_T &operator[](unsigned i);
+
+  /// @brief push an element at the tail of the queue
+  /// @param the element to insert in the queue
+  /// Note that the element is not a pointer or a reference, so if you are using large data
+  /// structures to be inserted in the queue you should think of instantiate the template
+  /// of the queue as a pointer to that large structure
+  /// @return true if the element was inserted in the queue. False if the queue was full
+  int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
+
+  /// @brief pop the element at the head of the queue
+  /// @param a reference where the element in the head of the queue will be saved to
+  /// Note that the a_data parameter might contain rubbish if the function returns false
+  /// @return true if the element was successfully extracted from the queue. False if the queue was empty
+  int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
+
+
+  void *operator new(size_t size);
+
+  void operator delete(void *p);
 
 protected:
-    /// @brief the actual queue. methods are forwarded into the real 
-    ///        implementation
-    Q_TYPE<ELEM_T, Allocator> m_qImpl;
+  /// @brief the actual queue. methods are forwarded into the real
+  ///        implementation
+  Q_TYPE<ELEM_T, Allocator> m_qImpl;
 
 private:
-    /// @brief disable copy constructor declaring it private
-    LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src);
+  /// @brief disable copy constructor declaring it private
+  LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src);
 };
 
 
-template <
-    typename ELEM_T, 
-    typename Allocator,
-    template <typename T, typename AT> class Q_TYPE>
-LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize)
-{
-// std::cout << "LockFreeQueue init reference=" << reference << std::endl;
+template<
+        typename ELEM_T,
+        typename Allocator,
+        template<typename T, typename AT> class Q_TYPE>
+LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) {
+    // std::cout << "LockFreeQueue init reference=" << reference << std::endl;
     if (sem_init(&slots, 1, qsize) == -1)
         err_exit(errno, "LockFreeQueue sem_init");
     if (sem_init(&items, 1, 0) == -1)
@@ -164,194 +160,130 @@
     if (sem_init(&mutex, 1, 1) == -1)
         err_exit(errno, "LockFreeQueue sem_init");
 
-   
+
 }
 
-template <
-    typename ELEM_T, 
-    typename Allocator,
-    template <typename T, typename AT> class Q_TYPE>
-LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue()
-{
+
+template<
+        typename ELEM_T,
+        typename Allocator,
+        template<typename T, typename AT> class Q_TYPE>
+LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() {
     // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy");
-    if(sem_destroy(&slots) == -1) {
+    if (sem_destroy(&slots) == -1) {
         err_exit(errno, "LockFreeQueue sem_destroy");
     }
-    if(sem_destroy(&items) == -1) {
+    if (sem_destroy(&items) == -1) {
         err_exit(errno, "LockFreeQueue sem_destroy");
     }
-    if(sem_destroy(&mutex) == -1) {
+    if (sem_destroy(&mutex) == -1) {
         err_exit(errno, "LockFreeQueue sem_destroy");
     }
 }
 
-template <
-    typename ELEM_T, 
-    typename Allocator,
-    template <typename T, typename AT> class Q_TYPE>
-inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size()
-{
+template<
+        typename ELEM_T,
+        typename Allocator,
+        template<typename T, typename AT> class Q_TYPE>
+inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() {
     return m_qImpl.size();
-}  
+}
 
-template <
-    typename ELEM_T, 
-    typename Allocator,
-    template <typename T, typename AT> class Q_TYPE>
-inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full()
-{
+template<
+        typename ELEM_T,
+        typename Allocator,
+        template<typename T, typename AT> class Q_TYPE>
+inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() {
     return m_qImpl.full();
 }
 
-template <
-    typename ELEM_T, 
-    typename Allocator,
-    template <typename T, typename AT> class Q_TYPE>
-inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty()
-{
+template<
+        typename ELEM_T,
+        typename Allocator,
+        template<typename T, typename AT> class Q_TYPE>
+inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() {
     return m_qImpl.empty();
-}  
-
-
-template <
-    typename ELEM_T, 
-    typename Allocator,
-    template <typename T, typename AT> class Q_TYPE>
-int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
-{
- LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");   
-    if (psem_wait(&slots) == -1) {
-        return -1;
-    }
-    
-    if ( m_qImpl.push(a_data) ) {
-        psem_post(&items);   
-LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");   
-        return 0;
-    }
-    return -1;
-    
-}
-
-template <
-    typename ELEM_T, 
-    typename Allocator,
-    template <typename T, typename AT> class Q_TYPE>
-int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
-{
-    if (psem_trywait(&slots) == -1) {
-        return -1;
-    }
-
-    if ( m_qImpl.push(a_data)) {
-        psem_post(&items);     
-        return 0;
-    }
-    return -1;
-    
-}
-
-template <
-    typename ELEM_T, 
-    typename Allocator,
-    template <typename T, typename AT> class Q_TYPE>
-int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts)
-{
-LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n");       
-    if ( psem_timedwait(&slots, ts) == -1) {
-        return -1;
-    }
-
-    if (m_qImpl.push(a_data)){
-        psem_post(&items);   
-LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n");    
-        return 0;
-    }
-    return -1;
-    
 }
 
 
-
-
-template <
-    typename ELEM_T, 
-    typename Allocator,
-    template <typename T, typename AT> class Q_TYPE>
-int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
-{
-
-  LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
-    if (psem_wait(&items) == -1) {
-        return -1;
+template<typename ELEM_T,
+        typename Allocator,
+        template<typename T, typename AT> class Q_TYPE>
+int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) {
+    LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
+    if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
+        if (psem_trywait(&slots) == -1) {
+            return -1;
+        }
+    } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
+        if (psem_timedwait(&slots, timeout) == -1) {
+            return -1;
+        }
+    } else {
+        if (psem_wait(&slots) == -1) {
+            return -1;
+        }
     }
+
+
+    if (m_qImpl.push(a_data)) {
+        psem_post(&items);
+        LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
+        return 0;
+    }
+    return -1;
+
+}
+
+template<typename ELEM_T,
+        typename Allocator,
+        template<typename T, typename AT> class Q_TYPE>
+int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) {
+
+    LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
+
+
+    if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
+        if (psem_trywait(&items) == -1) {
+            return -1;
+        }
+    } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
+        if (psem_timedwait(&items, timeout) == -1) {
+            return -1;
+        }
+    } else {
+        if (psem_wait(&items) == -1) {
+            return -1;
+        }
+    }
+
 
     if (m_qImpl.pop(a_data)) {
         psem_post(&slots);
- LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");      
+        LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
         return 0;
     }
     return -1;
 }
 
-template <
-    typename ELEM_T, 
-    typename Allocator,
-    template <typename T, typename AT> class Q_TYPE>
-int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
-{
-    if (psem_trywait(&items) == -1) {
-        return -1;
-    }
-
-    if (m_qImpl.pop(a_data)) {
-        psem_post(&slots);     
-        return 0;
-    }
-    return -1;
-}
-
- 
-template <
-    typename ELEM_T, 
-    typename Allocator,
-    template <typename T, typename AT> class Q_TYPE>
-int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
-{
-    if (psem_timedwait(&items, ts) == -1) {
-       return -1;
-    }
-
-    if (m_qImpl.pop(a_data)) {
-        psem_post(&slots);  
-// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n");     
-        return 0;
-    }
-    return -1;
-    
-}
-
-template <
-    typename ELEM_T, 
-    typename Allocator,
-    template <typename T, typename AT> class Q_TYPE>
-ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
+template<typename ELEM_T,
+        typename Allocator,
+        template<typename T, typename AT> class Q_TYPE>
+ELEM_T &LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
     return m_qImpl.operator[](i);
 }
 
 
-template <
-    typename ELEM_T, 
-    typename Allocator,
-    template <typename T, typename AT> class Q_TYPE>
-void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){
-        return Allocator::allocate(size);
+template<typename ELEM_T,
+        typename Allocator,
+        template<typename T, typename AT> class Q_TYPE>
+void *LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size) {
+    return Allocator::allocate(size);
 }
 
-template <
-    typename ELEM_T, 
-    typename Allocator,
-    template <typename T, typename AT> class Q_TYPE>
+template<typename ELEM_T,
+        typename Allocator,
+        template<typename T, typename AT> class Q_TYPE>
 void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) {
     return Allocator::deallocate(p);
 }
diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h
index 5d2d9b6..0be124b 100644
--- a/src/queue/shm_queue.h
+++ b/src/queue/shm_queue.h
@@ -12,6 +12,7 @@
 #include "shm_allocator.h"
 #include "usg_common.h"
 #include "array_lock_free_sem_queue.h"
+#include "lock_free_queue.h"
 #include "bus_error.h"
 
 template <typename ELEM_T> class SHMQueue {
@@ -51,7 +52,7 @@
   /// @brief the actual queue-> methods are forwarded into the real
   ///        implementation
 
-  ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *queue;
+  LockFreeQueue<ELEM_T, SHM_Allocator> *queue;
 
 private:
   /// @brief disable copy constructor declaring it private
@@ -64,7 +65,7 @@
   hashtable_t *hashtable = mm_get_hashtable();
   std::set<int> *keyset = hashtable_keyset(hashtable);
   std::set<int>::iterator keyItr;
-  ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue;
+  LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
   bool found;
   size_t count = 0;
   for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
@@ -77,7 +78,7 @@
     }
     if (!found) {
       // 閿�姣佸叡浜唴瀛樼殑queue
-      mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
+      mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
       delete mqueue;
       hashtable_remove(hashtable, *keyItr);
       count++;
@@ -91,11 +92,11 @@
 template <typename ELEM_T>
 size_t SHMQueue<ELEM_T>::remove_queues(int keys[], size_t length) {
   hashtable_t *hashtable = mm_get_hashtable();
-  ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue;
+  LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
   size_t count = 0;
   for(int i = 0; i< length; i++) {
     // 閿�姣佸叡浜唴瀛樼殑queue
-    mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]);
+    mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]);
     delete mqueue;
     hashtable_remove(hashtable, keys[i]);
     count++;
@@ -113,9 +114,9 @@
 SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize) : KEY(key) {
 
   hashtable_t *hashtable = mm_get_hashtable();
-  queue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
+  queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
   if (queue == NULL || (void *)queue == (void *)1) {
-    queue = new ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator>(qsize);
+    queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize);
     hashtable_put(hashtable, key, (void *)queue);
   }
   // queue->reference++;
@@ -155,7 +156,7 @@
 
 template <typename ELEM_T>
 inline int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) {
-  int rv =  queue->push(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT);
+  int rv =  queue->push(a_data, NULL, BUS_NOWAIT_FLAG);
   if(rv == -1) {
     if (errno == EAGAIN)
       return EAGAIN;
@@ -170,7 +171,7 @@
 template <typename ELEM_T>
 inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec *timeout) {
 
-  int rv = queue->push(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT);
+  int rv = queue->push(a_data, timeout, BUS_TIMEOUT_FLAG);
   if(rv == -1) {
     if(errno == ETIMEDOUT)
         return EBUS_TIMEOUT;
@@ -195,7 +196,7 @@
 
 template <typename ELEM_T>
 inline int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) {
-  int rv = queue->pop(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT);
+  int rv = queue->pop(a_data, NULL, BUS_NOWAIT_FLAG);
 
   if(rv == -1) {
     if (errno == EAGAIN)
@@ -213,7 +214,7 @@
 inline int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) {
 
   int rv;
-  rv = queue->pop(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT);
+  rv = queue->pop(a_data, timeout, BUS_TIMEOUT_FLAG);
   if(rv == -1) {
     if (errno == ETIMEDOUT) {
       return EBUS_TIMEOUT;
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index 4340bc4..54effbe 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -51,7 +51,7 @@
 }
 // 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
 int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){
-	return shm_sendto(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
+	return shm_sendto(shm_socket, buf, size, key, NULL, (int)BUS_NOWAIT_FLAG);
 }
  
 
@@ -74,7 +74,7 @@
 }
 
 int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){
-	int rv =  shm_recvfrom(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
+	int rv =  shm_recvfrom(shm_socket, buf, size, key, NULL, (int)BUS_NOWAIT_FLAG);
 	// logger->error(rv, "ShmModSocket::recvfrom_nowait failed!");
   return rv;
 }
@@ -92,7 +92,7 @@
 	return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0);
 }
 int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
-	return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
+	return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)BUS_NOWAIT_FLAG);
 }
 
 int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
@@ -103,7 +103,7 @@
 	return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0);
 }
 int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
-	return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
+	return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)BUS_NOWAIT_FLAG);
 }
 
 
@@ -123,7 +123,7 @@
 	return _sub_(topic, size, key, timeout, 0);
 }
 int  ShmModSocket::sub_nowait(char *topic, int size, int key) {
-	return _sub_(topic, size, key, NULL,  (int)SHM_MSG_NOWAIT);
+	return _sub_(topic, size, key, NULL,  (int)BUS_NOWAIT_FLAG);
 }
 
 
@@ -142,7 +142,7 @@
 	return _desub_(topic, size, key, timeout, 0);
 }
 int  ShmModSocket::desub_nowait(char *topic, int size, int key) {
-	return _desub_(topic, size, key, NULL,  (int)SHM_MSG_NOWAIT);
+	return _desub_(topic, size, key, NULL,  (int)BUS_NOWAIT_FLAG);
 }
 
 
@@ -161,7 +161,7 @@
 	return _pub_( topic, topic_size, content, content_size, key, timeout, 0);
 }
 int  ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){
-	return _pub_(topic, topic_size, content, content_size, key, NULL, (int)SHM_MSG_NOWAIT);
+	return _pub_(topic, topic_size, content, content_size, key, NULL, (int)BUS_NOWAIT_FLAG);
 }
 
 
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 76e906f..e370c72 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -373,9 +373,9 @@
   memcpy(dest.buf, buf, size);
 
  
-  if(flags & SHM_MSG_NOWAIT != 0) {
+  if( (flags & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
     rv = remoteQueue->push_nowait(dest);
-  } else if(timeout != NULL) {
+  } else if((flags & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
   	rv = remoteQueue->push_timeout(dest, timeout);
   } else {
   	rv = remoteQueue->push(dest);
@@ -393,8 +393,6 @@
       logger->error(rv, "sendto key %d failed", key);
     }
     return rv;
-   
-   
   }
 }
 
@@ -433,9 +431,9 @@
 
   shm_msg_t src;
  
-   if(flags & SHM_MSG_NOWAIT != 0) {
+   if((flags & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
     rv = socket->queue->pop_nowait(src);
-  } else if(timeout != NULL) {
+  } else if((flags & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
     rv = socket->queue->pop_timeout(src, timeout);
 // printf("0 shm_recvfrom====%d\n", rv);
   } else {
@@ -649,7 +647,7 @@
 
     switch (src.type) {
     case SHM_SOCKET_OPEN:
-      socket->acceptQueue->push_timeout(src, &timeout);
+      socket->acceptQueue->push(src, &timeout, BUS_TIMEOUT_FLAG);
       break;
     case SHM_SOCKET_CLOSE:
       _server_close_conn_to_client(socket, src.key);
@@ -660,7 +658,7 @@
       if (iter != socket->clientSocketMap->end()) {
         client_socket = iter->second;
         // print_msg("_server_run_msg_rev push before", src);
-        client_socket->messageQueue->push_timeout(src, &timeout);
+        client_socket->messageQueue->push(src, &timeout, BUS_TIMEOUT_FLAG);
         // print_msg("_server_run_msg_rev push after", src);
       }
 
@@ -695,7 +693,7 @@
       _client_close_conn_to_server(socket);
       break;
     case SHM_COMMON_MSG:
-      socket->messageQueue->push_timeout(src, &timeout);
+      socket->messageQueue->push(src, &timeout, BUS_TIMEOUT_FLAG);
       break;
     default:
        logger->error( "shm_socket._client_run_msg_rev: undefined message type.");
diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h
index abc8e20..b1665a0 100644
--- a/src/socket/shm_socket.h
+++ b/src/socket/shm_socket.h
@@ -6,11 +6,7 @@
 #include "shm_queue.h"
 #include "lock_free_queue.h"
 
-enum shm_socket_flag_t
-{
-  SHM_MSG_TIMEOUT = 1,
-  SHM_MSG_NOWAIT = 2
-};
+ 
 
 enum shm_connection_status_t {
 	SHM_CONN_CLOSED=1,
@@ -88,7 +84,7 @@
 int shm_recv(shm_socket_t * socket, void **buf, int *size) ;
 
 /**
- * @flags : SHM_MSG_NOWAIT
+ * @flags : BUS_NOWAIT_FLAG
  */
 int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0);
 
diff --git a/test_net_socket/net_mod_socket.sh b/test_net_socket/net_mod_socket.sh
index cd4c809..fc639ca 100755
--- a/test_net_socket/net_mod_socket.sh
+++ b/test_net_socket/net_mod_socket.sh
@@ -36,7 +36,7 @@
 }
 
 function close() {
-	ps -ef | grep -e "test_net_mod_socket" -e "net_mod_socket"| awk  '{print $2}' | xargs -i kill -9 {}
+	ps -ef | grep -e "test_net_mod_socket" -e "heart_beat"| awk  '{print $2}' | xargs -i kill -9 {}
 	ipcrm -a
 }
 
@@ -48,8 +48,6 @@
 
 case ${1} in
   "server")
-	close
-	sleep 2
   server
   ;;
   "client")
diff --git a/test_net_socket/test_net_mod_socket.cpp b/test_net_socket/test_net_mod_socket.cpp
index 0b2afd6..2fcd604 100644
--- a/test_net_socket/test_net_mod_socket.cpp
+++ b/test_net_socket/test_net_mod_socket.cpp
@@ -76,10 +76,13 @@
   void *recvbuf;
   int size;
   int key;
-  while (net_mod_socket_recvfrom( sockt, &recvbuf, &size, &key) == 0) {
+  int rv;
+  while ((rv = net_mod_socket_recvfrom( sockt, &recvbuf, &size, &key) ) == 0) {
     printf("鏀跺埌璁㈤槄娑堟伅:%s\n", recvbuf);
     free(recvbuf);
   }
+
+  printf("print_sub_msg return . rv = %d\n", rv);
   
 }
 
diff --git a/test_socket/CMakeLists.txt b/test_socket/CMakeLists.txt
new file mode 100644
index 0000000..a3f3fa5
--- /dev/null
+++ b/test_socket/CMakeLists.txt
@@ -0,0 +1,11 @@
+# add the executable
+add_executable(bus_test bus_test.cpp)
+target_link_libraries(bus_test PRIVATE shm_queue  ${EXTRA_LIBS} )
+target_include_directories(bus_test PRIVATE
+                            "${PROJECT_BINARY_DIR}"
+                             ${EXTRA_INCLUDES}
+                            )
+
+ 
+
+ 
diff --git a/test_socket/bus_test.cpp b/test_socket/bus_test.cpp
new file mode 100644
index 0000000..cdd6142
--- /dev/null
+++ b/test_socket/bus_test.cpp
@@ -0,0 +1,116 @@
+#include "bus_server_socket.h"
+#include "shm_mod_socket.h"
+#include "shm_mm_wrapper.h"
+#include "usg_common.h"
+#include "mm.h"
+
+BusServerSocket * server_socket;
+void sigint_handler(int sig) {
+  
+   exit(0);
+}
+
+void server(int key) {
+  server_socket = new BusServerSocket();
+
+  server_socket->bind( key);
+   
+  server_socket->start();
+}
+
+
+void *run_recv(void *skptr) {
+  pthread_detach(pthread_self());
+  void *recvbuf;
+  int size;
+  int key;
+  ShmModSocket *sk = (ShmModSocket *)skptr;
+  while (sk->recvfrom( &recvbuf, &size, &key) == 0) {
+    printf("鏀跺埌璁㈤槄娑堟伅:%s\n", recvbuf);
+    free(recvbuf);
+  }
+  
+}
+
+void client(int key) {
+  ShmModSocket *sk = new ShmModSocket();
+  
+  pthread_t tid;
+  pthread_create(&tid, NULL, run_recv, (void *)socket);
+  int size;
+  
+  char action[512];
+  char topic[512];
+  char content[512];
+  long i = 0;
+  while (true) {
+    //printf("Usage: pub <topic> [content] or sub <topic>\n");
+    printf("Can I help you? sub, pub, desub or quit\n");
+    scanf("%s",action);
+    
+    if(strcmp(action, "sub") == 0) {
+      printf("Please input topic!\n");
+      scanf("%s", topic);
+      if (sk->sub(topic, strlen(topic),  key) == 0) {
+         printf("%d Sub success!\n", sk->get_key());
+      } else {
+        printf("Sub failture!\n");
+        exit(0);
+      }
+     
+    } else if(strcmp(action, "desub") == 0) {
+      printf("Please input topic!\n");
+      scanf("%s", topic);
+      if (sk->desub(topic, strlen(topic),  key) == 0) {
+         printf("%d Desub success!\n", sk->get_key());
+      } else {
+        printf("Desub failture!\n");
+        exit(0);
+      }
+     
+    } else if(strcmp(action, "pub") == 0) {
+      // printf("%s %s %s\n", action, topic, content);
+      printf("Please input topic and content\n");
+      scanf("%s %s", topic, content);
+      if(sk->pub(topic, strlen(topic)+1, content, strlen(content)+1,  key) == 0){
+        printf("%d Pub success!\n", sk->get_key());
+      } else {
+        printf("Pub failture!\n");
+      }
+      
+    } else if(strcmp(action, "quit") == 0) {
+      printf("(%d) quit\n", sk->get_key());
+      delete sk;
+      break;
+    } else {
+      printf("error input argument\n");
+      continue;
+    }
+   
+  }
+ 
+}
+
+ 
+
+int main(int argc, char *argv[]) {
+  shm_mm_wrapper_init(512);
+  int key;
+  if (argc < 3) {
+    fprintf(stderr, "Usage: %s %s|%s  <key> ...\n", argv[0], "server", "client");
+    return 1;
+  }
+
+  key = atoi(argv[2]);
+
+  if (strcmp("server", argv[1]) == 0) {
+   server(key);
+    
+  } else if (strcmp("client", argv[1]) == 0) {
+    client(key);
+  }  
+
+
+  
+  return 0;
+}
\ No newline at end of file
diff --git a/test_socket/dgram_mod_bus.cpp b/test_socket/dgram_mod_bus.cpp
deleted file mode 100644
index 042b2fd..0000000
--- a/test_socket/dgram_mod_bus.cpp
+++ /dev/null
@@ -1,131 +0,0 @@
-#include "dgram_mod_socket.h"
-#include "shm_mm_wraper.h"
-#include "usg_common.h"
-#include "mm.h"
-
-void * server_socket;
-void sigint_handler(int sig) {
-   dgram_mod_close_socket(server_socket);
-   exit(0);
-}
-
-void server(int key, bool restart) {
-  server_socket = dgram_mod_open_socket();
-
-
-  if(restart) {
-    dgram_mod_force_bind(server_socket, key);
-  } else {
-     dgram_mod_bind(server_socket, key);
-  }
- 
-   
-  dgram_mod_start_bus(server_socket);
-}
-
-
-void *run_recv(void *socket) {
-  pthread_detach(pthread_self());
-  void *recvbuf;
-  int size;
-  int key;
-  while (dgram_mod_recvfrom( socket, &recvbuf, &size, &key) == 0) {
-    printf("鏀跺埌璁㈤槄娑堟伅:%s\n", recvbuf);
-    free(recvbuf);
-  }
-  
-}
-
-void client(int key) {
-  void *socket = dgram_mod_open_socket();
-  
-  pthread_t tid;
-  pthread_create(&tid, NULL, run_recv, socket);
-  int size;
-  
-  char action[512];
-  char topic[512];
-  char content[512];
-  long i = 0;
-  while (true) {
-    //printf("Usage: pub <topic> [content] or sub <topic>\n");
-    printf("Can I help you? sub, pub, desub or quit\n");
-    scanf("%s",action);
-    
-    if(strcmp(action, "sub") == 0) {
-      printf("Please input topic!\n");
-      scanf("%s", topic);
-      if (dgram_mod_sub(socket, topic, strlen(topic),  key) == 0) {
-         printf("%d Sub success!\n", dgram_mod_get_port(socket));
-      } else {
-        printf("Sub failture!\n");
-        exit(0);
-      }
-     
-    } else if(strcmp(action, "desub") == 0) {
-      printf("Please input topic!\n");
-      scanf("%s", topic);
-      if (dgram_mod_desub(socket, topic, strlen(topic),  key) == 0) {
-         printf("%d Desub success!\n", dgram_mod_get_port(socket));
-      } else {
-        printf("Desub failture!\n");
-        exit(0);
-      }
-     
-    } else if(strcmp(action, "pub") == 0) {
-      // printf("%s %s %s\n", action, topic, content);
-      printf("Please input topic and content\n");
-      scanf("%s %s", topic, content);
-      if(dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1,  key) == 0){
-        printf("%d Pub success!\n", dgram_mod_get_port(socket));
-      } else {
-        printf("Pub failture!\n");
-      }
-      
-    } else if(strcmp(action, "quit") == 0) {
-      printf("(%d) quit\n", dgram_mod_get_port(socket));
-      dgram_mod_close_socket(socket);
-      break;
-    } else {
-      printf("error input argument\n");
-      continue;
-    }
-   
-  }
- 
-}
-
- 
-
-int main(int argc, char *argv[]) {
-  shm_mm_wrapper_init(512);
-  int key;
-  if (argc < 3) {
-    fprintf(stderr, "Usage: %s %s|%s|rmkey <key> ...\n", argv[0], "server", "client");
-    return 1;
-  }
-
-  key = atoi(argv[2]);
-
-  if (strcmp("server", argv[1]) == 0) {
-    if(argc >= 4 && strcmp("restart", argv[3]) == 0) {
-      server(key, true);
-    }
-    else{
-      server(key, false);
-    }
-    
-  } else if (strcmp("client", argv[1]) == 0) {
-    client(key);
-  } else if(strcmp("rmkey", argv[1]) == 0) {
-    for(int i = 2; i < argc; i++) {
-      key = atoi(argv[i]);
-      dgram_mod_remove_key(key);
-      // printf("%d\n", key);
-    }
-  }
-
-
-  
-  return 0;
-}
\ No newline at end of file

--
Gitblit v1.8.0