From 91ec036cace39fd5b5f04644f6bced1f477005e0 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期二, 21 七月 2020 19:33:28 +0800
Subject: [PATCH] update

---
 src/util/include/sem_util.h            |    0 
 build/include/shm_socket.h             |   87 +
 Makefile                               |    3 
 build/include/array_lock_free_queue.h  |  322 ++++++
 build/include/lock_free_queue.h        |  360 +++++++
 demo/pub_sub                           |    0 
 src/queue/shm_queue_wrapper.c          |    2 
 build/include/shm_queue_wrapper.h      |  100 +
 Make.defines.linux                     |    2 
 build/include/mm.h                     |   20 
 src/socket/mod_socket.c                |    9 
 build/include/shm_mm.h                 |   26 
 build/include/mem_pool.h               |   55 +
 build/include/array_lock_free_queue2.h |  332 ++++++
 test/dgram_socket_test                 |    0 
 src/socket/include/shm_socket.h        |   18 
 src/libshm_queue.a                     |    0 
 src/socket/shm_socket.c                |  678 +++++++-----
 demo/queue                             |    0 
 src/queue/include/shm_queue.h          |    1 
 build/include/hashtable.h              |   34 
 build/include/linked_lock_free_queue.h |  245 ++++
 src/util/sem_util.c                    |  208 +--
 test/dgram_socket_test.c               |   74 +
 build/include/logger_factory.h         |   17 
 demo/Makefile                          |   13 
 demo/queue.c                           |    4 
 build/include/sem_util.h               |    0 
 src/queue/include/shm_allocator.h      |    6 
 /dev/null                              |    0 
 test/Makefile                          |   21 
 build/include/mod_socket.h             |   76 +
 build/include/shm_allocator.h          |  102 ++
 src/queue/include/shm_queue_wrapper.h  |    3 
 demo/req_rep                           |    0 
 src/Makefile                           |    8 
 build/lib/libshm_queue.a               |    0 
 build/include/shm_queue.h              |  184 +++
 src/logger_factory.h                   |    4 
 39 files changed, 2,566 insertions(+), 448 deletions(-)

diff --git a/Make.defines.linux b/Make.defines.linux
index ef20eaa..9e30018 100755
--- a/Make.defines.linux
+++ b/Make.defines.linux
@@ -25,7 +25,7 @@
 
 
 # Common temp files to delete from each directory.
-TEMPFILES=core core.* *.o temp.* *.out *.a *.so
+TEMPFILES=core core.*  **/*.o temp.* *.out *.a *.so
 
 %:	%.c 
 	$(CC) $(CFLAGS) $^ -o $@ $(LDFLAGS) $(LDLIBS)
diff --git a/Makefile b/Makefile
index adac55f..44bd26e 100755
--- a/Makefile
+++ b/Makefile
@@ -1,4 +1,4 @@
-DIRS = src test test2 demo
+DIRS = src test demo
 
 all:
 	for i in $(DIRS); do \
@@ -9,6 +9,7 @@
 	for i in $(DIRS); do \
 		(cd $$i && echo "cleaning $$i" && $(MAKE) clean) || exit 1; \
 	done
+	rm -rf build
 
 ipcrm:
 	-ipcrm -a
diff --git a/build/include/array_lock_free_queue.h b/build/include/array_lock_free_queue.h
new file mode 100644
index 0000000..24a4ec6
--- /dev/null
+++ b/build/include/array_lock_free_queue.h
@@ -0,0 +1,322 @@
+#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
+#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
+#include "atomic_ops.h"
+#include <assert.h> // assert()
+#include <sched.h>  // sched_yield()
+#include "logger_factory.h"
+#include "mem_pool.h"
+#include "shm_allocator.h"
+
+/// @brief implementation of an array based lock free queue with support for 
+///        multiple producers
+/// This class is prevented from being instantiated directly (all members and
+/// methods are private). To instantiate a multiple producers lock free queue 
+/// you must use the ArrayLockFreeQueue fachade:
+///   ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q;
+
+
+#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+template <typename ELEM_T, typename Allocator = SHM_Allocator>
+class ArrayLockFreeQueue
+{
+    // ArrayLockFreeQueue will be using this' private members
+    template <
+        typename ELEM_T_, 
+        typename Allocator_,
+        template <typename T, typename AT> class Q_TYPE
+        >
+    friend class LockFreeQueue;
+
+private:
+    /// @brief constructor of the class
+    ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
+    
+    virtual ~ArrayLockFreeQueue();
+    
+    inline uint32_t size();
+    
+    inline bool full();
+
+    inline bool empty();
+    
+    bool push(const ELEM_T &a_data);   
+    
+    bool pop(ELEM_T &a_data);
+    
+    /// @brief calculate the index in the circular array that corresponds
+    /// to a particular "count" value
+    inline uint32_t countToIndex(uint32_t a_count);
+
+    ELEM_T& operator[](unsigned i);
+    
+private:    
+    size_t Q_SIZE;
+    /// @brief array to keep the elements
+    ELEM_T *m_theQueue;
+
+    /// @brief where a new element will be inserted
+    uint32_t m_writeIndex;
+
+    /// @brief where the next element where be extracted from
+    uint32_t m_readIndex;
+    
+    /// @brief maximum read index for multiple producer queues
+    /// If it's not the same as m_writeIndex it means
+    /// there are writes pending to be "committed" to the queue, that means,
+    /// the place for the data was reserved (the index in the array) but  
+    /// data is still not in the queue, so the thread trying to read will have 
+    /// to wait for those other threads to save the data into the queue
+    ///
+    /// note this is only used for multiple producers
+    uint32_t m_maximumReadIndex;
+
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+    /// @brief number of elements in the queue
+    uint32_t m_count;
+#endif
+   
+    
+private:
+    /// @brief disable copy constructor declaring it private
+    ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
+
+};
+
+
+template <typename ELEM_T, typename Allocator>
+ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize):
+    Q_SIZE(qsize),
+    m_writeIndex(0),      // initialisation is not atomic
+    m_readIndex(0),       //
+    m_maximumReadIndex(0) //
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+    ,m_count(0)           //
+#endif
+{
+    m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
+
+}
+
+template <typename ELEM_T, typename Allocator>
+ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue()
+{
+    // std::cout << "destroy ArrayLockFreeQueue\n";
+    Allocator::deallocate(m_theQueue);
+    
+}
+
+template <typename ELEM_T, typename Allocator>
+inline 
+uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
+{
+    // if Q_SIZE is a power of 2 this statement could be also written as 
+    // return (a_count & (Q_SIZE - 1));
+    return (a_count % Q_SIZE);
+}
+
+template <typename ELEM_T, typename Allocator>
+inline 
+uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size()
+{
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+    return m_count;
+#else
+
+    uint32_t currentWriteIndex = m_maximumReadIndex;
+    uint32_t currentReadIndex  = m_readIndex;
+
+    // let's think of a scenario where this function returns bogus data
+    // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
+    // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
+    // 2. afterwards this thread is preemted. While this thread is inactive 2 
+    // elements are inserted and removed from the queue, so m_maximumReadIndex 
+    // is 5 and m_readIndex 4. Real size is still 1
+    // 3. Now the current thread comes back from preemption and reads m_readIndex.
+    // currentReadIndex is 4
+    // 4. currentReadIndex is bigger than currentWriteIndex, so
+    // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
+    // it returns that the queue is almost full, when it is almost empty
+    //
+    if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
+    {
+        return (currentWriteIndex - currentReadIndex);
+    }
+    else
+    {
+        return (Q_SIZE + currentWriteIndex - currentReadIndex);
+    }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template <typename ELEM_T, typename Allocator>
+inline 
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::full()
+{
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+    return (m_count == (Q_SIZE));
+#else
+
+    uint32_t currentWriteIndex = m_writeIndex;
+    uint32_t currentReadIndex  = m_readIndex;
+    
+    if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
+    {
+        // the queue is full
+        return true;
+    }
+    else
+    {
+        // not full!
+        return false;
+    }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template <typename ELEM_T, typename Allocator>
+inline 
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty()
+{
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+    return (m_count == 0);
+#else
+
+    if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
+    {
+        // the queue is full
+        return true;
+    }
+    else
+    {
+        // not full!
+        return false;
+    }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+
+
+
+
+
+template <typename ELEM_T, typename Allocator>
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data)
+{
+    uint32_t currentReadIndex;
+    uint32_t currentWriteIndex;
+
+    do
+    {
+
+        currentWriteIndex = m_writeIndex;
+        currentReadIndex  = m_readIndex;
+    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+        if (m_count == Q_SIZE) {
+            return false;
+        }
+    #else
+        if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
+        {
+            // the queue is full
+            return false;
+        }
+    #endif
+
+    } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
+
+    // We know now that this index is reserved for us. Use it to save the data
+    m_theQueue[countToIndex(currentWriteIndex)] = a_data;
+
+    // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
+    // inserting in the queue. It might fail if there are more than 1 producer threads because this
+    // operation has to be done in the same order as the previous CAS
+
+    while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
+    {
+        // this is a good place to yield the thread in case there are more
+        // software threads than hardware processors and you have more
+        // than 1 producer thread
+        // have a look at sched_yield (POSIX.1b)
+        sched_yield();
+    }
+
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+    AtomicAdd(&m_count, 1);
+#endif
+    return true;
+}
+
+
+template <typename ELEM_T, typename Allocator>
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data)
+{
+    uint32_t currentMaximumReadIndex;
+    uint32_t currentReadIndex;
+
+    do
+    {
+        // to ensure thread-safety when there is more than 1 producer thread
+        // a second index is defined (m_maximumReadIndex)
+        currentReadIndex        = m_readIndex;
+        currentMaximumReadIndex = m_maximumReadIndex;
+    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+        if (m_count == 0) {
+            return false;
+        }
+    #else
+        if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
+        {
+            // the queue is empty or
+            // a producer thread has allocate space in the queue but is
+            // waiting to commit the data into it
+            return false;
+        }
+    #endif
+
+        // retrieve the data from the queue
+        a_data = m_theQueue[countToIndex(currentReadIndex)];
+
+        // try to perfrom now the CAS operation on the read index. If we succeed
+        // a_data already contains what m_readIndex pointed to before we
+        // increased it
+        if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
+        {
+        #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+            // m_count.fetch_sub(1);
+            AtomicSub(&m_count, 1);
+        #endif
+            return true;
+        }
+
+        // it failed retrieving the element off the queue. Someone else must
+        // have read the element stored at countToIndex(currentReadIndex)
+        // before we could perform the CAS operation
+
+    } while(1); // keep looping to try again!
+
+    // Something went wrong. it shouldn't be possible to reach here
+    assert(0);
+
+    // Add this return statement to avoid compiler warnings
+    return false;
+}
+
+template <typename ELEM_T, typename Allocator>
+ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i)
+{
+    int currentCount = m_count;
+    uint32_t currentReadIndex = m_readIndex;
+    if (i < 0 || i >= currentCount)
+    {
+        std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
+        std::exit(EXIT_FAILURE);
+    }
+    return m_theQueue[countToIndex(currentReadIndex+i)];
+}
+
+#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
diff --git a/build/include/array_lock_free_queue2.h b/build/include/array_lock_free_queue2.h
new file mode 100644
index 0000000..3b79b7f
--- /dev/null
+++ b/build/include/array_lock_free_queue2.h
@@ -0,0 +1,332 @@
+#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
+#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
+
+#include <assert.h> // assert()
+#include <sched.h>  // sched_yield()
+#include "logger_factory.h"
+
+/// @brief implementation of an array based lock free queue with support for 
+///        multiple producers
+/// This class is prevented from being instantiated directly (all members and
+/// methods are private). To instantiate a multiple producers lock free queue 
+/// you must use the ArrayLockFreeQueue fachade:
+///   ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q;
+
+
+#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+template <typename ELEM_T>
+class ArrayLockFreeQueue
+{
+    // ArrayLockFreeQueue will be using this' private members
+    template <
+        typename ELEM_T_, 
+        template <typename T> class Q_TYPE >
+    friend class LockFreeQueue;
+
+private:
+    /// @brief constructor of the class
+    ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
+    
+    virtual ~ArrayLockFreeQueue();
+    
+    inline uint32_t size();
+    
+    inline bool full();
+
+    inline bool empty();
+    
+    bool push(const ELEM_T &a_data);   
+    
+    bool pop(ELEM_T &a_data);
+    
+    /// @brief calculate the index in the circular array that corresponds
+    /// to a particular "count" value
+    inline uint32_t countToIndex(uint32_t a_count);
+
+    ELEM_T& operator[](unsigned i);
+    
+private:    
+    size_t Q_SIZE;
+    /// @brief array to keep the elements
+    ELEM_T *m_theQueue;
+
+    /// @brief where a new element will be inserted
+    std::atomic<uint32_t> m_writeIndex;
+
+    /// @brief where the next element where be extracted from
+    std::atomic<uint32_t> m_readIndex;
+    
+    /// @brief maximum read index for multiple producer queues
+    /// If it's not the same as m_writeIndex it means
+    /// there are writes pending to be "committed" to the queue, that means,
+    /// the place for the data was reserved (the index in the array) but  
+    /// data is still not in the queue, so the thread trying to read will have 
+    /// to wait for those other threads to save the data into the queue
+    ///
+    /// note this is only used for multiple producers
+    std::atomic<uint32_t> m_maximumReadIndex;
+
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+    /// @brief number of elements in the queue
+    std::atomic<uint32_t> m_count;
+#endif
+   
+    
+private:
+    /// @brief disable copy constructor declaring it private
+    ArrayLockFreeQueue<ELEM_T>(const ArrayLockFreeQueue<ELEM_T> &a_src);
+
+};
+
+
+template <typename ELEM_T>
+ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize):
+    Q_SIZE(qsize),
+    m_writeIndex(0),      // initialisation is not atomic
+    m_readIndex(0),       //
+    m_maximumReadIndex(0) //
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+    ,m_count(0)           //
+#endif
+{
+    m_theQueue = (ELEM_T*)mm_malloc(Q_SIZE * sizeof(ELEM_T));
+
+}
+
+template <typename ELEM_T>
+ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue()
+{
+    // std::cout << "destroy ArrayLockFreeQueue\n";
+    mm_free(m_theQueue);
+    
+}
+
+template <typename ELEM_T>
+inline 
+uint32_t ArrayLockFreeQueue<ELEM_T>::countToIndex(uint32_t a_count)
+{
+    // if Q_SIZE is a power of 2 this statement could be also written as 
+    // return (a_count & (Q_SIZE - 1));
+    return (a_count % Q_SIZE);
+}
+
+template <typename ELEM_T>
+inline 
+uint32_t ArrayLockFreeQueue<ELEM_T>::size()
+{
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+    return m_count.load();
+#else
+
+    uint32_t currentWriteIndex = m_maximumReadIndex.load();
+    uint32_t currentReadIndex  = m_readIndex.load();
+
+    // let's think of a scenario where this function returns bogus data
+    // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
+    // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
+    // 2. afterwards this thread is preemted. While this thread is inactive 2 
+    // elements are inserted and removed from the queue, so m_maximumReadIndex 
+    // is 5 and m_readIndex 4. Real size is still 1
+    // 3. Now the current thread comes back from preemption and reads m_readIndex.
+    // currentReadIndex is 4
+    // 4. currentReadIndex is bigger than currentWriteIndex, so
+    // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
+    // it returns that the queue is almost full, when it is almost empty
+    //
+    if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
+    {
+        return (currentWriteIndex - currentReadIndex);
+    }
+    else
+    {
+        return (Q_SIZE + currentWriteIndex - currentReadIndex);
+    }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template <typename ELEM_T>
+inline 
+bool ArrayLockFreeQueue<ELEM_T>::full()
+{
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+    return (m_count.load() == (Q_SIZE));
+#else
+
+    uint32_t currentWriteIndex = m_writeIndex;
+    uint32_t currentReadIndex  = m_readIndex;
+    
+    if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
+    {
+        // the queue is full
+        return true;
+    }
+    else
+    {
+        // not full!
+        return false;
+    }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template <typename ELEM_T>
+inline 
+bool ArrayLockFreeQueue<ELEM_T>::empty()
+{
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+    return (m_count.load() == 0);
+#else
+
+    if (countToIndex( m_readIndex.load()) == countToIndex(m_maximumReadIndex.load()))
+    {
+        // the queue is full
+        return true;
+    }
+    else
+    {
+        // not full!
+        return false;
+    }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+
+template <typename ELEM_T>
+bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data)
+{
+    uint32_t currentReadIndex;
+    uint32_t currentWriteIndex;
+    
+    do
+    {
+        currentWriteIndex = m_writeIndex.load();
+        currentReadIndex = m_readIndex.load();
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+        if (m_count.load() == Q_SIZE) {
+            return false;
+        }
+#else
+        if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
+        {
+            // the queue is full
+            return false;
+        }
+#endif
+        
+    // There is more than one producer. Keep looping till this thread is able 
+    // to allocate space for current piece of data
+    //
+    // using compare_exchange_strong because it isn't allowed to fail spuriously
+    // When the compare_exchange operation is in a loop the weak version
+    // will yield better performance on some platforms, but here we'd have to
+    // load m_writeIndex all over again
+    } while (!m_writeIndex.compare_exchange_strong(
+                currentWriteIndex, (currentWriteIndex + 1)));
+    
+    // Just made sure this index is reserved for this thread.
+    m_theQueue[countToIndex(currentWriteIndex)] = a_data;
+    //memcpy((void *)(&m_theQueue[countToIndex(currentWriteIndex)]), (void *)(&a_data), sizeof(ELEM_T) );
+    
+    // update the maximum read index after saving the piece of data. It can't
+    // fail if there is only one thread inserting in the queue. It might fail 
+    // if there is more than 1 producer thread because this operation has to
+    // be done in the same order as the previous CAS
+    //
+    // using compare_exchange_weak because they are allowed to fail spuriously
+    // (act as if *this != expected, even if they are equal), but when the
+    // compare_exchange operation is in a loop the weak version will yield
+    // better performance on some platforms.
+    while (!m_maximumReadIndex.compare_exchange_weak(
+                currentWriteIndex, (currentWriteIndex + 1)))
+    {
+        // this is a good place to yield the thread in case there are more
+        // software threads than hardware processors and you have more
+        // than 1 producer thread
+        // have a look at sched_yield (POSIX.1b)
+        sched_yield();
+    }
+
+    // The value was successfully inserted into the queue
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+    m_count.fetch_add(1);
+#endif
+
+    return true;
+}
+
+template <typename ELEM_T>
+bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data)
+{
+    uint32_t currentMaximumReadIndex;
+    uint32_t currentReadIndex;
+
+    do
+    {
+        currentReadIndex = m_readIndex.load();
+        currentMaximumReadIndex = m_maximumReadIndex.load();
+
+     #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+        if (m_count.load() == 0) {
+            return false;
+        }
+    #else
+        // to ensure thread-safety when there is more than 1 producer 
+        // thread a second index is defined (m_maximumReadIndex)
+        if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
+        {
+            // the queue is empty or
+            // a producer thread has allocate space in the queue but is 
+            // waiting to commit the data into it
+            return false;
+        }
+    #endif
+
+        // retrieve the data from the queue
+        a_data = m_theQueue[countToIndex(currentReadIndex)];
+        //memcpy((void*) (&a_data), (void *)(&m_theQueue[countToIndex(currentReadIndex)]),sizeof(ELEM_T) );
+        // try to perfrom now the CAS operation on the read index. If we succeed
+        // a_data already contains what m_readIndex pointed to before we 
+        // increased it
+        if (m_readIndex.compare_exchange_strong(currentReadIndex, (currentReadIndex + 1)))
+        {
+            // got here. The value was retrieved from the queue. Note that the
+            // data inside the m_queue array is not deleted nor reseted
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+            m_count.fetch_sub(1);
+#endif
+            return true;
+        }
+        
+        // it failed retrieving the element off the queue. Someone else must
+        // have read the element stored at countToIndex(currentReadIndex)
+        // before we could perform the CAS operation        
+
+    } while(1); // keep looping to try again!
+
+    // Something went wrong. it shouldn't be possible to reach here
+    assert(0);
+
+    // Add this return statement to avoid compiler warnings
+    return false;    
+}
+
+
+template <typename ELEM_T>
+ELEM_T& ArrayLockFreeQueue<ELEM_T>::operator[](unsigned int i)
+{
+    int currentCount = m_count.load();
+    uint32_t currentReadIndex = m_readIndex.load();
+    if (i < 0 || i >= currentCount)
+    {
+        std::cerr << "Error in array limits: " << i << " is out of range\n";
+        std::exit(EXIT_FAILURE);
+    }
+    return m_theQueue[countToIndex(currentReadIndex+i)];
+}
+
+#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
diff --git a/build/include/hashtable.h b/build/include/hashtable.h
new file mode 100644
index 0000000..726a5bc
--- /dev/null
+++ b/build/include/hashtable.h
@@ -0,0 +1,34 @@
+#ifndef __HASHTABLE_H__
+#define __HASHTABLE_H__
+
+#include <sys/queue.h>
+#include <set>
+
+#define MAPSIZE 100
+
+typedef struct hashtable_t
+{
+ struct tailq_header_t* array[MAPSIZE];
+ int mutex;
+ int wlock;
+ int cond;
+ size_t readcnt;
+
+} hashtable_t;
+typedef void (*hashtable_foreach_cb)(int key, void *value);
+
+void hashtable_init(hashtable_t *hashtable);
+void *hashtable_get(hashtable_t *hashtable, int key);
+void hashtable_put(hashtable_t *hashtable, int key, void *value);
+void *hashtable_remove(hashtable_t *hashtable, int key);
+void hashtable_removeall(hashtable_t *hashtable);
+
+
+void hashtable_foreach(hashtable_t *hashtable, hashtable_foreach_cb cb);
+
+void hashtable_printall(hashtable_t *hashtable);
+
+int hashtable_alloc_key(hashtable_t *hashtable);
+
+std::set<int> * hashtable_keyset(hashtable_t *hashtable) ;
+#endif
diff --git a/build/include/linked_lock_free_queue.h b/build/include/linked_lock_free_queue.h
new file mode 100644
index 0000000..3906a42
--- /dev/null
+++ b/build/include/linked_lock_free_queue.h
@@ -0,0 +1,245 @@
+// queue.h -- interface for a queue
+#ifndef __LINKED_LOCK_FREE_QUEUE_H_
+#define __LINKED_LOCK_FREE_QUEUE_H_
+#include "mm.h" 
+#include "sem_util.h"
+
+template <typename T> class Node;
+
+template <typename T>
+class Pointer {
+public:
+
+    Node<T> *ptr;
+    unsigned long count;
+    Pointer( Node<T> *node = NULL, int c=0) noexcept :  ptr(node), count(c) {}
+     
+    bool operator == (const Pointer<T> o) const {
+      return (o.ptr == ptr) && (o.count == count);
+    }
+    bool operator != (const Pointer<T> o) const {
+      return !((o.ptr == ptr) && (o.count == count));
+    }
+
+    
+   
+};
+
+template <typename T>
+class Node {
+public:
+    alignas(16) std::atomic<Pointer<T> > next;
+    T value; 
+    
+    Node() {
+    }
+
+    void *operator new(size_t size){
+        return mm_malloc(size);
+    }
+
+    void operator delete(void *p) {
+        return mm_free(p);
+    }
+};
+
+
+
+
+
+template <typename ELEM_T>
+class LinkedLockFreeQueue
+{
+
+    template <
+        typename ELEM_T_, 
+        template <typename T> class Q_TYPE >
+    friend class LockFreeQueue;
+private:
+// class scope definitions
+    enum {Q_SIZE = 10};
+  
+// private class members
+    std::atomic<Pointer<ELEM_T> > Head;       // pointer to front of Queue
+    std::atomic<Pointer<ELEM_T> > Tail;        // pointer to rear of Queue
+    //std::atomic_uint count;          // current number of size in Queue
+    std::atomic_uint count;
+    const size_t qsize;    // maximum number of size in Queue
+    // preemptive definitions to prevent public copying
+    LinkedLockFreeQueue(const LinkedLockFreeQueue & q) : qsize(0) { }
+    LinkedLockFreeQueue & operator=(const LinkedLockFreeQueue & q) { return *this;}
+protected:
+    LinkedLockFreeQueue(size_t qs = Q_SIZE); // create queue with a qs limit
+    ~LinkedLockFreeQueue();
+    bool empty() const;
+    bool full() const;
+    unsigned int size() const;
+    bool push(const ELEM_T &item); // add item to end
+    bool pop(ELEM_T &item);
+    
+
+    ELEM_T& operator[](unsigned i);
+
+};
+
+
+// Queue methods
+template <typename T>
+LinkedLockFreeQueue<T>::LinkedLockFreeQueue(size_t qs) : count(0), qsize(qs)
+{
+    Node<T> *node = new Node<T>;
+    Pointer<T> pointer(node, 0);
+    
+    Head.store(pointer, std::memory_order_relaxed);
+    Tail.store(pointer, std::memory_order_relaxed);
+
+}
+
+template <typename T>
+LinkedLockFreeQueue<T>::~LinkedLockFreeQueue()
+{
+    LoggerFactory::getLogger().debug("LinkedLockFreeQueue destory");
+    Node<T> * nodeptr;
+    Pointer<T> tmp = Head.load(std::memory_order_relaxed);
+    while((nodeptr = tmp.ptr) != NULL) {
+        tmp = (tmp.ptr->next).load(std::memory_order_relaxed);
+        //std::cerr << "delete " << nodeptr << std::endl;
+        delete nodeptr;
+
+    }
+}
+
+template <typename T>
+bool LinkedLockFreeQueue<T>::empty() const
+{
+    return count == 0;
+}
+
+template <typename T>
+bool LinkedLockFreeQueue<T>::full() const
+{
+    return count == qsize;
+}
+
+template <typename T>
+unsigned int LinkedLockFreeQueue<T>::size() const
+{
+    return count;
+}
+
+// Add item to queue
+template <typename T>
+bool LinkedLockFreeQueue<T>::push(const T & item)
+{
+    if (full())
+        return false;
+
+    Node<T> * node = new Node<T>;
+    node->value = item;
+   
+
+    Pointer<T> tail ;
+    Pointer<T> next ;
+   
+
+    while(true) {
+        tail = Tail.load(std::memory_order_relaxed);
+        next = (tail.ptr->next).load(std::memory_order_relaxed);
+        if (tail == Tail.load(std::memory_order_relaxed)) {
+            if (next.ptr == NULL) {
+                if ((tail.ptr->next).compare_exchange_weak(next, 
+                    Pointer<T>(node, next.count+1), 
+                    std::memory_order_release, 
+                    std::memory_order_relaxed) )
+                    break;
+                else
+                    Tail.compare_exchange_weak(tail, 
+                        Pointer<T>(next.ptr, tail.count+1),
+                        std::memory_order_release, 
+                        std::memory_order_relaxed);
+            }
+
+        }
+    }
+
+    Tail.compare_exchange_weak(tail, Pointer<T>(node, tail.count+1), 
+        std::memory_order_release, 
+        std::memory_order_relaxed);  
+    count++;
+    return true;
+}
+
+
+
+
+// Place front item into item variable and remove from queue
+template <typename T>
+bool LinkedLockFreeQueue<T>::pop(T & item)
+{
+    if (empty())
+        return false;
+        
+    Pointer<T> head;
+    Pointer<T> tail;
+    Pointer<T> next;
+
+    while(true) {
+        head = Head.load(std::memory_order_relaxed);
+        tail = Tail.load(std::memory_order_relaxed);
+        next = (head.ptr->next).load();
+        if (head == Head.load(std::memory_order_relaxed)) {
+            if(head.ptr == tail.ptr) {
+                if (next.ptr == NULL)
+                    return false;
+                // Tail is falling behind. Try to advance it
+                Tail.compare_exchange_weak(tail, 
+                        Pointer<T>(next.ptr, tail.count+1),
+                        std::memory_order_release, 
+                        std::memory_order_relaxed);
+            } else {
+                item = next.ptr->value;
+                if (Head.compare_exchange_weak(head, 
+                        Pointer<T>(next.ptr, head.count+1), 
+                        std::memory_order_release, 
+                        std::memory_order_relaxed)) {
+                  delete head.ptr;
+                  break;  
+                }
+
+            }
+        }        
+
+    }
+
+    count--; 
+    return true;
+            
+}
+
+ 
+template <class T>
+T& LinkedLockFreeQueue<T>::operator[](unsigned int i)
+{
+    if (i < 0 || i >= count)
+    {
+        std::cerr << "Error in array limits: " << i << " is out of range\n";
+        std::exit(EXIT_FAILURE);
+    }
+
+
+    Pointer<T> tmp = Head.load(std::memory_order_relaxed);
+    //Pointer<T> tail = Tail.load(std::memory_order_relaxed);
+
+    while(i > 0) {
+        //std::cout << i << ":"  << std::endl;
+        tmp = (tmp.ptr->next).load(std::memory_order_relaxed);
+        i--;    
+    }
+
+    tmp = (tmp.ptr->next).load(std::memory_order_relaxed);
+    return tmp.ptr->value;
+}
+
+
+
+#endif
diff --git a/build/include/lock_free_queue.h b/build/include/lock_free_queue.h
new file mode 100644
index 0000000..f34079f
--- /dev/null
+++ b/build/include/lock_free_queue.h
@@ -0,0 +1,360 @@
+#ifndef __LOCK_FREE_QUEUE_H__
+#define __LOCK_FREE_QUEUE_H__
+
+#include <usg_common.h>
+#include <assert.h> // assert()
+#include "mem_pool.h"
+#include "sem_util.h"
+#include "logger_factory.h"
+#include "shm_allocator.h"
+
+// default Queue size
+#define LOCK_FREE_Q_DEFAULT_SIZE 16
+
+// define this macro if calls to "size" must return the real size of the 
+// queue. If it is undefined  that function will try to take a snapshot of 
+// the queue, but returned value might be bogus
+
+
+// forward declarations for default template values
+//
+
+template <typename ELEM_T, typename Allocator>
+class ArrayLockFreeQueue;
+
+// template <typename ELEM_T>
+// class LinkedLockFreeQueue;
+
+
+/// @brief Lock-free queue based on a circular array
+/// No allocation of extra memory for the nodes handling is needed, but it has 
+/// to add extra overhead (extra CAS operation) when inserting to ensure the 
+/// thread-safety of the queue when the queue type is not 
+/// ArrayLockFreeQueueSingleProducer.
+///
+/// examples of instantiation:
+///   ArrayLockFreeQueue<int> q; // queue of ints of default size (65535 - 1)
+///                              // and defaulted to single producer
+///   ArrayLockFreeQueue<int, 16> q;
+///                              // queue of ints of size (16 - 1) and
+///                              // defaulted to single producer
+///   ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q;
+///                              // queue of ints of size (100 - 1) with support
+///                              // for multiple producers
+///
+/// ELEM_T represents the type of elementes pushed and popped from the queue
+/// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1)
+///        This number should be a power of 2 to ensure 
+///        indexes in the circular queue keep stable when the uint32_t 
+///        variable that holds the current position rolls over from FFFFFFFF
+///        to 0. For instance
+///        2    -> 0x02 
+///        4    -> 0x04
+///        8    -> 0x08
+///        16   -> 0x10
+///        (...) 
+///        1024 -> 0x400
+///        2048 -> 0x800
+///
+///        if queue size is not defined as requested, let's say, for
+///        instance 100, when current position is FFFFFFFF (4,294,967,295)
+///        index in the circular array is 4,294,967,295 % 100 = 95. 
+///        When that value is incremented it will be set to 0, that is the 
+///        last 4 elements of the queue are not used when the counter rolls
+///        over to 0
+/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and 
+///        ArrayLockFreeQueue are supported (single producer
+///        by default)
+template <
+    typename ELEM_T, 
+    typename Allocator = SHM_Allocator,
+    template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
+     >
+class LockFreeQueue
+{
+
+private:
+    int slots;
+    int items;
+   
+public:
+    // int mutex;
+    LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
+    
+    /// @brief destructor of the class. 
+    /// Note it is not virtual since it is not expected to inherit from this
+    /// template
+    ~LockFreeQueue();
+    std::atomic_uint reference;    
+    /// @brief constructor of the class
+   
+
+    /// @brief returns the current number of items in the queue
+    /// It tries to take a snapshot of the size of the queue, but in busy environments
+    /// this function might return bogus values. 
+    ///
+    /// If a reliable queue size must be kept you might want to have a look at 
+    /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
+    /// it enables a reliable size though it hits overall performance of the queue 
+    /// (when the reliable size variable is on it's got an impact of about 20% in time)
+    inline uint32_t size();
+    
+    /// @brief return true if the queue is full. False otherwise
+    /// It tries to take a snapshot of the size of the queue, but in busy 
+    /// environments this function might return bogus values. See help in method
+    /// LockFreeQueue::size
+    inline bool full();
+
+    inline bool empty();
+
+    inline ELEM_T& operator[](unsigned i);
+
+    /// @brief push an element at the tail of the queue
+    /// @param the element to insert in the queue
+    /// Note that the element is not a pointer or a reference, so if you are using large data
+    /// structures to be inserted in the queue you should think of instantiate the template
+    /// of the queue as a pointer to that large structure
+    /// @return true if the element was inserted in the queue. False if the queue was full
+    bool push(const ELEM_T &a_data);
+    bool push_nowait(const ELEM_T &a_data);
+    bool push_timeout(const ELEM_T &a_data, struct timespec * timeout);
+
+    /// @brief pop the element at the head of the queue
+    /// @param a reference where the element in the head of the queue will be saved to
+    /// Note that the a_data parameter might contain rubbish if the function returns false
+    /// @return true if the element was successfully extracted from the queue. False if the queue was empty
+    bool pop(ELEM_T &a_data);
+    bool pop_nowait(ELEM_T &a_data);
+    bool pop_timeout(ELEM_T &a_data, struct timespec * timeout);
+
+
+    void *operator new(size_t size);
+    void operator delete(void *p);
+
+protected:
+    /// @brief the actual queue. methods are forwarded into the real 
+    ///        implementation
+    Q_TYPE<ELEM_T, Allocator> m_qImpl;
+
+private:
+    /// @brief disable copy constructor declaring it private
+    LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src);
+};
+
+
+template <
+    typename ELEM_T, 
+    typename Allocator,
+    template <typename T, typename AT> class Q_TYPE>
+LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize)
+{
+// std::cout << "LockFreeQueue init reference=" << reference << std::endl;
+    slots = SemUtil::get(IPC_PRIVATE, qsize);
+    items = SemUtil::get(IPC_PRIVATE, 0);
+    // mutex = SemUtil::get(IPC_PRIVATE, 1);
+}
+
+template <
+    typename ELEM_T, 
+    typename Allocator,
+    template <typename T, typename AT> class Q_TYPE>
+LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue()
+{
+    LoggerFactory::getLogger().debug("LockFreeQueue desctroy");
+    SemUtil::remove(slots);
+    SemUtil::remove(items);
+}
+
+template <
+    typename ELEM_T, 
+    typename Allocator,
+    template <typename T, typename AT> class Q_TYPE>
+inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size()
+{
+    return m_qImpl.size();
+}  
+
+template <
+    typename ELEM_T, 
+    typename Allocator,
+    template <typename T, typename AT> class Q_TYPE>
+inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full()
+{
+    return m_qImpl.full();
+}
+
+template <
+    typename ELEM_T, 
+    typename Allocator,
+    template <typename T, typename AT> class Q_TYPE>
+inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty()
+{
+    return m_qImpl.empty();
+}  
+
+
+template <
+    typename ELEM_T, 
+    typename Allocator,
+    template <typename T, typename AT> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
+{
+    if (SemUtil::dec(slots) == -1) {
+        err_msg(errno, "LockFreeQueue push");
+        return false;
+    }
+
+    if ( m_qImpl.push(a_data) ) {
+        SemUtil::inc(items);      
+        return true;
+    }
+    return false;
+    
+}
+
+template <
+    typename ELEM_T, 
+    typename Allocator,
+    template <typename T, typename AT> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
+{
+    if (SemUtil::dec_nowait(slots) == -1) {
+        if (errno == EAGAIN)
+            return false;
+        else {
+            err_msg(errno, "LockFreeQueue push_nowait");
+            return false;
+        }
+
+    }
+
+    if ( m_qImpl.push(a_data)) {
+        SemUtil::inc(items);     
+        return true;
+    }
+    return false;
+    
+}
+
+template <
+    typename ELEM_T, 
+    typename Allocator,
+    template <typename T, typename AT> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, struct timespec * timeout)
+{
+
+    if (SemUtil::dec_timeout(slots, timeout) == -1) {
+        if (errno == EAGAIN)
+            return false;
+        else {
+            err_msg(errno, "LockFreeQueue push_timeout");
+            return false;
+        }
+    }
+
+    if (m_qImpl.push(a_data)){
+        SemUtil::inc(items);       
+        return true;
+    }
+    return false;
+    
+}
+
+
+
+
+template <
+    typename ELEM_T, 
+    typename Allocator,
+    template <typename T, typename AT> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
+{
+    if (SemUtil::dec(items) == -1) {
+        err_msg(errno, "LockFreeQueue pop");
+        return false;
+    }
+
+    if (m_qImpl.pop(a_data)) {
+        SemUtil::inc(slots);      
+        return true;
+    }
+    return false;
+    
+}
+
+template <
+    typename ELEM_T, 
+    typename Allocator,
+    template <typename T, typename AT> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
+{
+    if (SemUtil::dec_nowait(items) == -1) {
+        if (errno == EAGAIN)
+            return false;
+        else {
+            err_msg(errno, "LockFreeQueue pop_nowait");
+            return false;
+        }
+    }
+
+    if (m_qImpl.pop(a_data)) {
+        SemUtil::inc(slots);     
+        return true;
+    }
+    return false;
+    
+}
+
+ 
+template <
+    typename ELEM_T, 
+    typename Allocator,
+    template <typename T, typename AT> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout)
+{
+    if (SemUtil::dec_timeout(items, timeout) == -1) {
+        if (errno == EAGAIN)
+            return false;
+        else {
+            err_msg(errno, "LockFreeQueue pop_timeout");
+            return false;
+        }
+    }
+
+    if (m_qImpl.pop(a_data)) {
+        SemUtil::inc(slots);       
+        return true;
+    }
+    return false;
+    
+}
+
+template <
+    typename ELEM_T, 
+    typename Allocator,
+    template <typename T, typename AT> class Q_TYPE>
+ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
+    return m_qImpl.operator[](i);
+}
+
+template <
+    typename ELEM_T, 
+    typename Allocator,
+    template <typename T, typename AT> class Q_TYPE>
+void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){
+        return Allocator::allocate(size);
+}
+
+template <
+    typename ELEM_T, 
+    typename Allocator,
+    template <typename T, typename AT> class Q_TYPE>
+void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) {
+    return Allocator::deallocate(p);
+}
+
+// include implementation files
+//#include "linked_lock_free_queue.h"
+#include "array_lock_free_queue.h"
+
+#endif // _LOCK_FREE_QUEUE_H__
diff --git a/build/include/logger_factory.h b/build/include/logger_factory.h
new file mode 100644
index 0000000..384e3e0
--- /dev/null
+++ b/build/include/logger_factory.h
@@ -0,0 +1,17 @@
+#ifndef __LOGGER_FACTORY_H__
+#define __LOGGER_FACTORY_H__
+#include "logger.h"
+
+class LoggerFactory {
+public:
+
+	static Logger getLogger() {
+//ERROR ALL DEBUG
+		static Logger logger(Logger::DEBUG);
+		return logger;
+	}
+};
+
+#endif
+
+
diff --git a/build/include/mem_pool.h b/build/include/mem_pool.h
new file mode 100644
index 0000000..17a7c5c
--- /dev/null
+++ b/build/include/mem_pool.h
@@ -0,0 +1,55 @@
+#ifndef _MEM_POOL_H_
+#define _MEM_POOL_H_  
+#include "mm.h"
+#include "sem_util.h"
+#define MEM_POOL_COND_KEY 0x8801
+
+static int mem_pool_cond  = SemUtil::get(MEM_POOL_COND_KEY, 0);
+
+// static int mem_pool_mutex  = SemUtil::get(MEM_POOL_COND_KEY, 1);
+
+static inline void mem_pool_init(size_t heap_size) {
+	if(mm_init(heap_size)) {
+		 
+	}
+}
+
+static inline void mem_pool_destroy(void) {
+	if(mm_destroy()) {
+		SemUtil::remove(mem_pool_cond);
+	}
+	
+}
+
+static inline void *mem_pool_malloc (size_t size) {
+	void *ptr;
+	while( (ptr = mm_malloc(size)) == NULL ) {
+		err_msg(0, "There is not enough memery to allocate, waiting someone else to free.");
+		SemUtil::set(mem_pool_cond, 0);
+		// wait for someone else to free space
+		SemUtil::dec(mem_pool_cond);
+
+	}
+	
+	return ptr;
+}
+
+static inline void mem_pool_free (void *ptr) {
+	mm_free(ptr);
+	// notify malloc
+	SemUtil::set(mem_pool_cond, 1);
+
+}
+
+static inline void *mem_pool_realloc (void *ptr, size_t size) {
+	return mm_realloc(ptr, size);
+}
+
+static inline hashtable_t * mem_pool_get_hashtable() {
+	return mm_get_hashtable();
+
+}
+// extern int mm_checkheap(int verbose);
+
+
+#endif
\ No newline at end of file
diff --git a/build/include/mm.h b/build/include/mm.h
new file mode 100644
index 0000000..f0ab764
--- /dev/null
+++ b/build/include/mm.h
@@ -0,0 +1,20 @@
+#ifndef MM_HDR_H
+#define MM_HDR_H      /* Prevent accidental double inclusion */
+
+#include <usg_common.h>
+#include "usg_typedef.h"
+#include "hashtable.h"
+
+extern bool mm_init(size_t heap_size);
+extern bool mm_destroy(void);
+
+extern void *mm_malloc (size_t size);
+extern void mm_free (void *ptr);
+extern void *mm_realloc(void *ptr, size_t size);
+extern hashtable_t * mm_get_hashtable();
+
+// extern int mm_checkheap(int verbose);
+
+// extern void *get_mm_start_brk();
+// extern size_t get_mm_max_size();
+#endif
diff --git a/build/include/mod_socket.h b/build/include/mod_socket.h
new file mode 100644
index 0000000..21498ee
--- /dev/null
+++ b/build/include/mod_socket.h
@@ -0,0 +1,76 @@
+#ifndef __MOD_SOCKET_H__
+#define __MOD_SOCKET_H__
+
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+enum socket_mod_t
+{
+	PULL_PUSH = 1,
+	REQ_REP = 2,
+	PAIR = 3,
+	PUB_SUB = 4,
+	SURVEY = 5,
+	BUS = 6
+	
+};
+
+/**
+ * 鍒涘缓socket
+ * @return socket鍦板潃
+*/
+void *mod_open_socket(int mod);
+
+/**
+ * 鍏抽棴socket
+*/
+int mod_close_socket(void * _socket);
+
+/**
+ * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓�
+ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
+*/
+int mod_socket_bind(void * _socket, int port);
+ 
+
+/**
+ * 鏈嶅姟绔紑鍚繛鎺ョ洃鍚�
+ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
+ */
+int mod_listen(void * _socket);
+
+/**
+ * 瀹㈡埛绔彂璧疯繛鎺ヨ姹�
+ */
+int mod_connect(void * _socket, int port);
+
+/**
+ * 鍙戦�佷俊鎭�
+ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
+ */
+int mod_send(void * _socket, const void *buf, const int size);
+
+/**
+ * 鎺ユ敹淇℃伅
+ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
+*/
+int mod_recv(void * _socket, void **buf, int *size) ;
+
+/**
+ * 閲婃斁鎺ユ敹淇℃伅鐨刡uf
+ */
+void mod_free(void *buf);
+
+
+/**
+ * 鑾峰彇soket绔彛鍙�
+ */
+int mod_get_socket_port(void * _socket);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
\ No newline at end of file
diff --git a/src/util/sem_util.h b/build/include/sem_util.h
similarity index 100%
copy from src/util/sem_util.h
copy to build/include/sem_util.h
diff --git a/build/include/shm_allocator.h b/build/include/shm_allocator.h
new file mode 100644
index 0000000..023bc9d
--- /dev/null
+++ b/build/include/shm_allocator.h
@@ -0,0 +1,102 @@
+#ifndef __SHM_ALLOCATOR_H__
+#define __SHM_ALLOCATOR_H__
+#include "usg_common.h"
+#include "mem_pool.h"
+#include <new>
+#include <cstdlib> // for exit()
+#include <climits> // for UNIX_MAX
+#include <cstddef>
+
+
+
+template<class T> class SHM_STL_Allocator
+{
+public:
+  typedef T               value_type;
+  typedef T*              pointer;
+  typedef const T*        const_pointer;
+  typedef T&              reference;
+  typedef const T&        const_reference;
+  typedef size_t          size_type;
+  typedef ptrdiff_t       difference_type;
+
+
+  SHM_STL_Allocator() {};
+  ~SHM_STL_Allocator() {};
+  template<class U> SHM_STL_Allocator(const SHM_STL_Allocator<U>& t) { };
+  template<class U> struct rebind { typedef SHM_STL_Allocator<U> other; };
+
+  pointer allocate(size_type n, const void* hint=0) {
+//        fprintf(stderr, "allocate n=%u, hint= %p\n",n, hint);
+    return((T*) (mm_malloc(n * sizeof(T))));
+  }
+
+  void deallocate(pointer p, size_type n) {
+//        fprintf(stderr, "dealocate n=%u" ,n);
+    mm_free((void*)p);
+  }
+
+  void construct(pointer p, const T& value) {
+    ::new(p) T(value);
+  }
+
+  void construct(pointer p)
+  {
+    ::new(p) T();
+  }
+
+  void destroy(pointer p) {
+    p->~T();
+  }
+
+  pointer address(reference x) {
+    return (pointer)&x;
+  }
+
+  const_pointer address(const_reference x) {
+    return (const_pointer)&x;
+  }
+
+  size_type max_size() const {
+    return size_type(UINT_MAX/sizeof(T));
+  }
+};
+
+
+class SHM_Allocator {
+  public:
+    static void *allocate (size_t size) {
+       printf("shm_allocator malloc\n");
+      return mem_pool_malloc(size);
+    }
+
+    static void deallocate (void *ptr) {
+      printf("shm_allocator free\n");
+      return mem_pool_free(ptr);
+    }
+};
+
+
+class DM_Allocator {
+  public:
+    static void *allocate (size_t size) {
+      printf("dm_allocator malloc\n");
+      return malloc(size);
+    }
+
+    static void deallocate (void *ptr) {
+      printf("dm_allocator free\n");
+      return free(ptr);
+    }
+};
+
+
+// template<class charT, class traits = char _traits<charT>,
+// class Allocator = allocator<charT> >  
+
+
+
+
+typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > shmstring;
+
+#endif
\ No newline at end of file
diff --git a/build/include/shm_mm.h b/build/include/shm_mm.h
new file mode 100644
index 0000000..b32568e
--- /dev/null
+++ b/build/include/shm_mm.h
@@ -0,0 +1,26 @@
+#ifndef __SHM_MM_H__
+#define __SHM_MM_H__
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+	
+/**
+ * 鍒濆鍖栧叡浜唴瀛�
+ * @size 鍏变韩鍐呭瓨澶у皬, 鍗曚綅M
+ * 
+ */
+void shm_init(int size);
+
+/**
+ * 閿�姣佸叡浜唴瀛�
+ * 鏁翠釜杩涚▼閫�鍑烘椂闇�瑕佹墽琛岃繖涓柟娉曪紝璇ユ柟娉曢鍏堜細妫�鏌ユ槸鍚﹁繕鏈夊叾浠栬繘绋嬪湪浣跨敤璇ュ叡浜唴瀛橈紝濡傛灉杩樻湁鍏朵粬杩涚▼鍦ㄤ娇鐢ㄥ氨鍙槸detach,濡傛灉娌℃湁鍏朵粬杩涚▼鍦ㄤ娇鐢ㄥ垯閿�姣佹暣鍧楀唴瀛樸��
+ */
+void shm_destroy();
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
+
diff --git a/build/include/shm_queue.h b/build/include/shm_queue.h
new file mode 100644
index 0000000..d853774
--- /dev/null
+++ b/build/include/shm_queue.h
@@ -0,0 +1,184 @@
+#ifndef __SHM_QUEUE_H__
+#define __SHM_QUEUE_H__
+
+#include "usg_common.h"
+#include "hashtable.h"
+#include "lock_free_queue.h"
+#include "logger_factory.h"
+#include "shm_allocator.h"
+
+// default Queue size
+// #define LOCK_FREE_Q_DEFAULT_SIZE 16
+ 
+template < typename ELEM_T>
+class SHMQueue
+{
+
+private:
+    const int KEY;
+ 
+public:
+    /// @brief constructor of the class
+    SHMQueue(int key=0, size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
+    
+    
+    ~SHMQueue();
+
+   
+    inline uint32_t size();
+ 
+    inline bool full();
+    inline bool empty();
+   
+    inline bool push(const ELEM_T &a_data);
+    inline bool push_nowait(const ELEM_T &a_data);
+    inline bool push_timeout(const ELEM_T &a_data, struct timespec * timeout);
+    inline bool pop(ELEM_T &a_data);
+    inline bool pop_nowait(ELEM_T &a_data);
+    inline bool pop_timeout(ELEM_T &a_data, struct timespec * timeout);
+
+    inline ELEM_T& operator[](unsigned i);
+
+    static void remove_queues_exclude(int *keys, size_t length);
+private:
+
+
+protected:
+    /// @brief the actual queue-> methods are forwarded into the real 
+    ///        implementation
+    LockFreeQueue<ELEM_T, SHM_Allocator>* queue;
+
+private:
+    /// @brief disable copy constructor declaring it private
+    SHMQueue<ELEM_T>(const SHMQueue<ELEM_T> &a_src);
+};
+
+
+template < typename ELEM_T >
+void SHMQueue<ELEM_T>::remove_queues_exclude(int *keys, size_t length)
+{
+    hashtable_t *hashtable = mm_get_hashtable();
+    std::set<int>* keyset = hashtable_keyset(hashtable);
+    std::set<int>::iterator keyItr;
+     LockFreeQueue<ELEM_T, SHM_Allocator>* mqueue;
+    bool found;
+    for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
+        found = false;
+        for(size_t i = 0; i < length; i++) {
+            if(*keyItr == keys[i]) {
+                found = true;
+                break;
+            }
+        }
+        if(!found) {
+           mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
+           delete mqueue;
+        }
+    }
+    delete keyset;
+    
+}  
+
+template < typename ELEM_T >
+SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize): KEY(key)
+{
+
+    hashtable_t *hashtable = mm_get_hashtable();
+    queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
+    //LockFreeQueue<int, 10000> q;
+    if (queue == NULL || (void *)queue == (void *)1) {
+        queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize);
+        hashtable_put(hashtable,  key, (void *)queue);
+    }
+    queue->reference++;
+    LoggerFactory::getLogger().debug("SHMQueue constructor reference===%d", queue->reference.load());
+}
+
+template < typename ELEM_T >
+SHMQueue<ELEM_T>::~SHMQueue()
+{
+    queue->reference--;
+    LoggerFactory::getLogger().debug("SHMQueue destructor  reference===%d", queue->reference.load());
+    if(queue->reference.load() == 0) {
+        delete queue;
+        hashtable_t *hashtable = mm_get_hashtable();
+        hashtable_remove(hashtable, KEY);
+        LoggerFactory::getLogger().debug("SHMQueue destructor delete queue");
+    }
+}
+
+template < typename ELEM_T >
+inline uint32_t SHMQueue<ELEM_T>::size()
+{
+    return queue->size();
+}  
+
+template < typename ELEM_T >
+inline bool SHMQueue<ELEM_T>::full()
+{
+    return queue->full();
+}
+
+template < typename ELEM_T >
+inline bool SHMQueue<ELEM_T>::empty()
+{
+    return queue->empty();
+}  
+
+
+template < typename ELEM_T >
+inline bool SHMQueue<ELEM_T>::push(const ELEM_T &a_data)
+{
+   return queue->push(a_data);
+    
+}
+
+template <
+    typename ELEM_T >
+inline bool SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data)
+{
+   return queue->push_nowait(a_data);
+    
+}
+
+template < typename ELEM_T >
+inline bool SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, struct timespec * timeout)
+{
+
+    return queue->push_timeout(a_data, timeout);
+    
+}
+
+
+
+
+template < typename ELEM_T >
+inline bool SHMQueue<ELEM_T>::pop(ELEM_T &a_data)
+{
+   return queue->pop(a_data);
+    
+}
+
+template < typename ELEM_T >
+inline bool SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data)
+{
+    return queue->pop_nowait(a_data);
+    
+}
+
+ 
+template < typename ELEM_T >
+inline bool SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec * timeout)
+{
+   return queue->pop_timeout(a_data, timeout);
+    
+}
+
+template < typename ELEM_T >
+inline ELEM_T& SHMQueue<ELEM_T>::operator[](unsigned i) {
+     return queue->operator[](i);
+}
+
+
+
+#endif
diff --git a/build/include/shm_queue_wrapper.h b/build/include/shm_queue_wrapper.h
new file mode 100644
index 0000000..984bd5a
--- /dev/null
+++ b/build/include/shm_queue_wrapper.h
@@ -0,0 +1,100 @@
+#ifndef __SHM_QUEUE_WRAPPER_H__
+#define __SHM_QUEUE_WRAPPER_H__
+
+#include "usg_common.h"
+#include "usg_typedef.h"
+
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+ 
+
+//绉婚櫎涓嶅寘鍚湪keys涓殑闃熷垪
+void shm_remove_queues_exclude(void *keys, int length);
+/**
+ * 鍒涘缓闃熷垪
+ * @ shmqueue 
+ * @ key 鏍囪瘑鍏变韩闃熷垪鐨勫敮涓�鏍囪瘑, key鏄竴涓寚閽堥噷闈㈠瓨鍌ㄤ簡key鐨勫�硷紝 濡傛灉key鐨勫�间负-1绯荤粺浼氳嚜鍔ㄥ垎閰嶄竴涓猭ey鍊煎苟鎶婅key鐨勫�艰祴缁檏ey鎸囬拡銆傚鏋渒ey鐨勫�间笉浼氱┖浼氭鏌ユ槸鍚︽湁閲嶅缁戝畾鐨勬儏鍐�, 鏈夐噸澶嶅氨鎶ラ敊娌℃湁灏卞垱寤洪槦鍒楀苟缁戝畾key.
+ * @ queue_size 闃熷垪澶у皬
+ */
+void* shmqueue_create( int * key, int queue_size);
+
+/**
+ * 缁戝畾key鍒伴槦鍒楋紝浣嗘槸骞朵笉浼氬垱寤洪槦鍒椼�傚鏋滄病鏈夊搴旀寚瀹歬ey鐨勯槦鍒楁彁绀洪敊璇苟閫�鍑�
+ */
+void* shmqueue_attach(int key) ;
+
+/**
+ * 閿�姣�
+*/
+void shmqueue_drop(void * _shmqueue);
+
+/**
+ * 闃熷垪鍏冪礌鐨勪釜鏁�
+ */
+int shmqueue_size(void * _shmqueue) ;
+
+/**
+ * 鏄惁宸叉弧
+ * @return 1鏄紝 0鍚�
+ */
+int shmqueue_full(void * _shmqueue);
+
+/**
+ * 鏄惁涓虹┖
+ * @return 1鏄紝 0鍚�
+ */
+int shmqueue_empty(void * _shmqueue) ;
+
+/**
+ * 鍏ラ槦, 闃熷垪婊℃椂绛夊緟.
+ * @return 1 鍏ラ槦鎴愬姛, 0 鍏ラ槦澶辫触
+ */
+int shmqueue_push(void * _shmqueue, void *src, int size);
+
+/**
+ * 鍏ラ槦, 闃熷垪婊℃椂绔嬪嵆杩斿洖.
+ * @return 1 鍏ラ槦鎴愬姛, 0 鍏ラ槦澶辫触
+ */
+int shmqueue_push_nowait(void * _shmqueue, void *src, int size) ;
+
+/**
+ * 鍏ラ槦, 鎸囧畾鏃堕棿鍐呭叆闃熶笉鎴愬姛灏辫繑鍥�
+ * @sec 绉�
+ * @nsec 绾崇
+ * @return 1 鍏ラ槦鎴愬姛, 0 鍏ラ槦澶辫触
+ */
+int shmqueue_push_timeout(void * _shmqueue, void *src, int size,  int sec, int nsec) ;
+
+/**
+ * 鍑洪槦, 闃熷垪绌烘椂绛夊緟
+ * @return 1 鍑洪槦鎴愬姛锛� 0鍑洪槦澶辫触
+ */
+int shmqueue_pop(void * _shmqueue, void **dest, int *size);
+
+/**
+ * 鍑洪槦, 闃熷垪绌烘椂绔嬪嵆杩斿洖
+ * @return 1 鍑洪槦鎴愬姛锛� 0鍑洪槦澶辫触
+ */
+int shmqueue_pop_nowait(void * _shmqueue, void **dest, int *size) ;
+
+/**
+ * 鍑洪槦, 鎸囧畾鏃堕棿鍐呭嚭闃熶笉鎴愬姛灏辫繑鍥�
+ * @sec绉�
+ * @nsec绾崇
+ * @return 1 鍑洪槦鎴愬姛锛� 0鍑洪槦澶辫触
+ */
+int shmqueue_pop_timeout(void * _shmqueue, void **dest, int *size, int sec, int nsec);
+
+/**
+ * 閲婃斁鍑洪槦鍒嗛厤鐨勫唴瀛�
+ */
+void shmqueue_free(void *ptr);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
\ No newline at end of file
diff --git a/build/include/shm_socket.h b/build/include/shm_socket.h
new file mode 100644
index 0000000..30b85da
--- /dev/null
+++ b/build/include/shm_socket.h
@@ -0,0 +1,87 @@
+#ifndef __SHM_SOCKET_H__
+#define __SHM_SOCKET_H__
+
+#include "usg_common.h"
+#include "usg_typedef.h"
+#include "shm_queue.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+enum shm_msg_type_t
+{
+	SHM_SOCKET_OPEN = 1,
+	SHM_SOCKET_OPEN_REPLY = 2,
+	SHM_SOCKET_CLOSE = 3,
+	SHM_COMMON_MSG = 4
+	
+};
+
+enum shm_socket_type_t
+{
+	SHM_SOCKET_STREAM = 1,
+	SHM_SOCKET_DGRAM = 2
+	
+};
+
+enum shm_connection_status_t {
+	SHM_CONN_CLOSED=1,
+	SHM_CONN_LISTEN=2,
+	SHM_CONN_ESTABLISHED=3
+};
+
+typedef struct shm_msg_t {
+	int port;
+	shm_msg_type_t type;
+	size_t size;
+	void * buf;
+
+} shm_msg_t;
+
+
+typedef struct shm_socket_t {
+	shm_socket_type_t socket_type;
+	// 鏈湴port
+	int port;
+	shm_connection_status_t status;
+	SHMQueue<shm_msg_t> *queue;
+	SHMQueue<shm_msg_t> *remoteQueue;
+	LockFreeQueue<shm_msg_t, DM_Allocator> *messageQueue;
+	LockFreeQueue<shm_msg_t, DM_Allocator> *acceptQueue;
+	std::map<int, shm_socket_t* > *clientSocketMap;
+	pthread_t dispatch_thread;
+
+} shm_socket_t;
+
+
+
+
+shm_socket_t *shm_open_socket(shm_socket_type_t socket_type);
+
+
+int shm_close_socket(shm_socket_t * socket) ;
+
+
+int shm_socket_bind(shm_socket_t * socket, int port) ;
+
+int shm_listen(shm_socket_t * socket) ;
+
+shm_socket_t* shm_accept(shm_socket_t* socket);
+
+int shm_connect(shm_socket_t * socket, int port);
+
+int shm_send(shm_socket_t * socket, const void *buf, const int size) ;
+
+int shm_recv(shm_socket_t * socket, void **buf, int *size) ;
+
+int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port);
+
+int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
\ No newline at end of file
diff --git a/build/lib/libshm_queue.a b/build/lib/libshm_queue.a
new file mode 100644
index 0000000..4e9e1cb
--- /dev/null
+++ b/build/lib/libshm_queue.a
Binary files differ
diff --git a/demo/Makefile b/demo/Makefile
index a80c730..f7c6491 100644
--- a/demo/Makefile
+++ b/demo/Makefile
@@ -2,19 +2,19 @@
 # Makefile for common library.
 #
 ROOT=..
-LDLIBS+=-Wl,-rpath=$(ROOT)/queue:$(ROOT)/lib
+LDLIBS+=-Wl,-rpath=$(ROOT)/lib:$(ROOT)/build/lib
 # 寮�婧愬伐鍏峰寘璺緞
-LDDIR += -L$(ROOT)/queue
+LDDIR += -L$(ROOT)/build/lib
 # 寮�婧愬伐鍏峰寘
 LDLIBS += -lshm_queue -lusgcommon -lpthread
 
-INCLUDE += -I$(ROOT)/queue/ -I$(ROOT)/queue/include
+INCLUDE += -I$(ROOT)/build/include
 
 PLATFORM=$(shell $(ROOT)/systype.sh)
 include $(ROOT)/Make.defines.$(PLATFORM)
 
 
-PROGS =	req_rep pub_sub
+PROGS =	req_rep pub_sub queue
 
 
 build: $(PROGS)
@@ -25,8 +25,3 @@
   
 clean:
 	rm -f $(TEMPFILES) $(PROGS)
-
-
-
-$(LIBQUEUE):
-	(cd $(ROOT)/queue && $(MAKE))
\ No newline at end of file
diff --git a/demo/pub_sub b/demo/pub_sub
new file mode 100755
index 0000000..265621f
--- /dev/null
+++ b/demo/pub_sub
Binary files differ
diff --git a/demo/queue b/demo/queue
new file mode 100755
index 0000000..b1e1056
--- /dev/null
+++ b/demo/queue
Binary files differ
diff --git a/test2/test_queue_wrapper.c b/demo/queue.c
similarity index 92%
rename from test2/test_queue_wrapper.c
rename to demo/queue.c
index 6483fb3..265b574 100644
--- a/test2/test_queue_wrapper.c
+++ b/demo/queue.c
@@ -1,5 +1,5 @@
 #include "shm_queue_wrapper.h"
-#include "mm.h"
+#include "shm_mm.h"
 
 // typedef struct message_t
 // {
@@ -22,7 +22,7 @@
 	for(i = 0; i < qsize; i++) {
 		sprintf(msg, "%d hello", i); 
 		//鍏ラ槦
-		if(shmqueue_push(queue, (void *)msg, sizeof(msg))) {
+		if(shmqueue_push(queue, (void *)msg, strlen(msg) + 1)) {
 			  printf("push: %s\n", msg );
 		}
 	}
diff --git a/demo/req_rep b/demo/req_rep
new file mode 100755
index 0000000..3d35107
--- /dev/null
+++ b/demo/req_rep
Binary files differ
diff --git a/src/Makefile b/src/Makefile
index 2ab0bb5..9f399e1 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -23,14 +23,16 @@
 
 MYLIBS = $(LIBSQUEUE) $(DLIBSQUEUE)
 
+PREFIX = $(ROOT)/build
+
 ifeq ($(DEBUG),y)
   MYLIBS = $(LIBSQUEUE)
 else
   MYLIBS = $(LIBSQUEUE) $(DLIBSQUEUE)
 endif
 
-all: build
- 
+all: install
+
 
 build: $(MYLIBS)
 
@@ -55,7 +57,7 @@
 	install -d $(PREFIX)/lib/
 	install -m 644 $^ $(PREFIX)/lib/
 	install -d $(PREFIX)/include/
-	install -m 644 $(MINCLUDE)/* $(PREFIX)/include/
+	install -m 644 ./*.h ./queue/include/* ./socket/include/* ./util/include/* $(PREFIX)/include/
 
 clean:
 	rm -f $(TEMPFILES)
diff --git a/src/libshm_queue.a b/src/libshm_queue.a
index 1406c9b..4e9e1cb 100644
--- a/src/libshm_queue.a
+++ b/src/libshm_queue.a
Binary files differ
diff --git a/src/logger_factory.h b/src/logger_factory.h
index e738ad9..384e3e0 100644
--- a/src/logger_factory.h
+++ b/src/logger_factory.h
@@ -6,8 +6,8 @@
 public:
 
 	static Logger getLogger() {
-//ERROR ALL
-		static Logger logger(Logger::ERROR);
+//ERROR ALL DEBUG
+		static Logger logger(Logger::DEBUG);
 		return logger;
 	}
 };
diff --git a/src/queue/include/shm_allocator.h b/src/queue/include/shm_allocator.h
index 6d9dcc6..023bc9d 100644
--- a/src/queue/include/shm_allocator.h
+++ b/src/queue/include/shm_allocator.h
@@ -1,7 +1,7 @@
 #ifndef __SHM_ALLOCATOR_H__
 #define __SHM_ALLOCATOR_H__
 #include "usg_common.h"
-#include "mm.h"
+#include "mem_pool.h"
 #include <new>
 #include <cstdlib> // for exit()
 #include <climits> // for UNIX_MAX
@@ -67,12 +67,12 @@
   public:
     static void *allocate (size_t size) {
        printf("shm_allocator malloc\n");
-      return mm_malloc(size);
+      return mem_pool_malloc(size);
     }
 
     static void deallocate (void *ptr) {
       printf("shm_allocator free\n");
-      return mm_free(ptr);
+      return mem_pool_free(ptr);
     }
 };
 
diff --git a/src/queue/include/shm_queue.h b/src/queue/include/shm_queue.h
index 394545b..d853774 100644
--- a/src/queue/include/shm_queue.h
+++ b/src/queue/include/shm_queue.h
@@ -2,7 +2,6 @@
 #define __SHM_QUEUE_H__
 
 #include "usg_common.h"
-#include "mm.h"
 #include "hashtable.h"
 #include "lock_free_queue.h"
 #include "logger_factory.h"
diff --git a/src/queue/include/shm_queue_wrapper.h b/src/queue/include/shm_queue_wrapper.h
index 51b73f5..984bd5a 100644
--- a/src/queue/include/shm_queue_wrapper.h
+++ b/src/queue/include/shm_queue_wrapper.h
@@ -3,8 +3,7 @@
 
 #include "usg_common.h"
 #include "usg_typedef.h"
-#include "shm_queue.h"
-#include "shm_allocator.h"
+
 
 #ifdef __cplusplus
 extern "C" {
diff --git a/src/queue/libshm_queue.a b/src/queue/libshm_queue.a
deleted file mode 100644
index 1b5c93b..0000000
--- a/src/queue/libshm_queue.a
+++ /dev/null
Binary files differ
diff --git a/src/queue/libshm_queue.so b/src/queue/libshm_queue.so
deleted file mode 100755
index 598c829..0000000
--- a/src/queue/libshm_queue.so
+++ /dev/null
Binary files differ
diff --git a/src/queue/shm_queue_wrapper.c b/src/queue/shm_queue_wrapper.c
index 98ec273..29a8f64 100644
--- a/src/queue/shm_queue_wrapper.c
+++ b/src/queue/shm_queue_wrapper.c
@@ -2,6 +2,8 @@
 
 #include "mem_pool.h"
 #include "hashtable.h"
+#include "shm_queue.h"
+#include "shm_allocator.h"
 
 typedef struct ele_t {
 	size_t size;
diff --git a/src/socket/include/shm_socket.h b/src/socket/include/shm_socket.h
index 8b14f06..30b85da 100644
--- a/src/socket/include/shm_socket.h
+++ b/src/socket/include/shm_socket.h
@@ -4,11 +4,6 @@
 #include "usg_common.h"
 #include "usg_typedef.h"
 #include "shm_queue.h"
-#include "shm_allocator.h"
-
-#include "mem_pool.h"
-#include "hashtable.h"
-#include "sem_util.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -20,6 +15,13 @@
 	SHM_SOCKET_OPEN_REPLY = 2,
 	SHM_SOCKET_CLOSE = 3,
 	SHM_COMMON_MSG = 4
+	
+};
+
+enum shm_socket_type_t
+{
+	SHM_SOCKET_STREAM = 1,
+	SHM_SOCKET_DGRAM = 2
 	
 };
 
@@ -39,6 +41,7 @@
 
 
 typedef struct shm_socket_t {
+	shm_socket_type_t socket_type;
 	// 鏈湴port
 	int port;
 	shm_connection_status_t status;
@@ -54,7 +57,7 @@
 
 
 
-shm_socket_t *shm_open_socket();
+shm_socket_t *shm_open_socket(shm_socket_type_t socket_type);
 
 
 int shm_close_socket(shm_socket_t * socket) ;
@@ -72,6 +75,9 @@
 
 int shm_recv(shm_socket_t * socket, void **buf, int *size) ;
 
+int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port);
+
+int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port);
 
 
 #ifdef __cplusplus
diff --git a/src/socket/mod_socket.c b/src/socket/mod_socket.c
index cc358f6..fcb5e58 100644
--- a/src/socket/mod_socket.c
+++ b/src/socket/mod_socket.c
@@ -1,7 +1,12 @@
+#include "usg_common.h"
 #include "mod_socket.h"
 #include "shm_socket.h"
-#include "usg_common.h"
+#include "shm_allocator.h"
+#include "mem_pool.h"
+#include "hashtable.h"
+#include "sem_util.h"
 #include "logger_factory.h"
+
 static Logger logger = LoggerFactory::getLogger();
 
 typedef struct mod_entry_t
@@ -28,7 +33,7 @@
  */
 void *mod_open_socket(int mod) {
   mod_socket_t *socket = (mod_socket_t *)malloc(sizeof(mod_socket_t));
-  socket->shm_socket=shm_open_socket();
+  socket->shm_socket=shm_open_socket(SHM_SOCKET_STREAM);
   socket->is_server = 0;
   socket->mod = (socket_mod_t)mod;
   socket->recvQueue = new LockFreeQueue<mod_entry_t, DM_Allocator>(16);
diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c
index 4fb90cf..260cdc2 100644
--- a/src/socket/shm_socket.c
+++ b/src/socket/shm_socket.c
@@ -1,4 +1,5 @@
 #include "shm_socket.h"
+#include "hashtable.h"
 #include "logger_factory.h"
 #include <map>
 
@@ -14,11 +15,16 @@
 
 void * _client_run_msg_rev(void* _socket);
 
+int _shm_close_dgram_socket(shm_socket_t *socket);
+
+
+int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote);
+
 SHMQueue<shm_msg_t> * _attach_remote_queue(int port) ;
 
-shm_socket_t *shm_open_socket() {
+shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
 	shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
-	
+	socket->socket_type = socket_type;
 	socket->port = -1;
 	socket->dispatch_thread = 0;
 	socket->status=SHM_CONN_CLOSED;
@@ -26,8 +32,381 @@
 	return socket;
 }
 
+int shm_close_socket(shm_socket_t *socket) {
+	switch(socket->socket_type) {
+		case 	SHM_SOCKET_STREAM:
+			return _shm_close_stream_socket(socket, true);
+		case SHM_SOCKET_DGRAM:
+			return _shm_close_dgram_socket(socket);
+		default:
+			return -1;
+	}
+	return -1;
+	
+}
 
-int _shm_close_socket(shm_socket_t *socket, bool notifyRemote) {
+int shm_socket_bind(shm_socket_t * socket, int port) {
+	socket -> port = port;
+	return 0;
+}
+
+int shm_listen(shm_socket_t* socket) {
+
+	if(socket->socket_type != SHM_SOCKET_STREAM) {
+		err_exit(0, "can not invoke shm_listen method with a socket which is not a SHM_SOCKET_STREAM socket");
+	}
+
+	int  port;
+	hashtable_t *hashtable = mm_get_hashtable();
+	if(socket -> port == -1) {
+		port = hashtable_alloc_key(hashtable);
+		socket -> port = port;
+	} else {
+		 
+		if(hashtable_get(hashtable, socket->port)!= NULL) {
+			err_exit(0, "key %d has already been in used!", socket->port);
+		}
+	}
+
+	socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
+	socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
+	socket->clientSocketMap = new std::map<int, shm_socket_t* >;
+	socket->status = SHM_CONN_LISTEN;
+	pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev , (void *)socket);
+
+	
+	return 0;
+}
+
+
+/**
+ * 鎺ュ彈瀹㈡埛绔缓绔嬫柊杩炴帴鐨勮姹�
+ *
+*/
+shm_socket_t* shm_accept(shm_socket_t* socket) {
+	if(socket->socket_type != SHM_SOCKET_STREAM) {
+		err_exit(0, "can not invoke shm_accept method with a socket which is not a SHM_SOCKET_STREAM socket");
+	}
+	hashtable_t *hashtable = mm_get_hashtable();
+	int client_port;
+	shm_socket_t *client_socket;
+	shm_msg_t src;
+
+	if (socket->acceptQueue->pop(src) ) {
+
+// print_msg("===accept:", src);
+		client_port = src.port;
+		client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t));
+		client_socket->port = socket->port;
+		// client_socket->queue= socket->queue;
+		//鍒濆鍖栨秷鎭痲ueue
+		client_socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); 
+		//杩炴帴鍒板鏂筿ueue
+		client_socket->remoteQueue = _attach_remote_queue(client_port);
+
+		socket->clientSocketMap->insert({client_port, client_socket});
+
+		/*
+         * shm_accept 鐢ㄦ埛鎵ц鐨勬柟娉� 涓巁server_run_msg_rev鍦ㄤ袱涓笉鍚岀殑闄愬埗宸ヤ綔,accept瑕佷繚璇佸湪瀹㈡埛鐨勫彂閫佹秷鎭箣鍓嶅畬鎴愯祫婧愮殑鍑嗗宸ヤ綔锛屼互閬垮厤鍑虹幇绔炴�侀棶棰�
+		*/
+		//鍙戦�乷pen_reply,鍥炲簲瀹㈡埛绔殑connect璇锋眰
+		struct timespec timeout = {1, 0};
+		shm_msg_t msg;
+		msg.port = socket->port;
+		msg.size = 0;
+		msg.type = SHM_SOCKET_OPEN_REPLY;
+
+		if (client_socket->remoteQueue->push_timeout(msg, &timeout) )
+		{
+			client_socket->status = SHM_CONN_ESTABLISHED;
+			return client_socket;
+		} else {
+			err_msg(0, "shm_accept: 鍙戦�乷pen_reply澶辫触");
+			return NULL;
+		}
+
+		
+	} else {
+		err_exit(errno, "shm_accept");
+	}
+	return NULL;
+	
+}
+
+
+int shm_connect(shm_socket_t* socket, int port) {
+	if(socket->socket_type != SHM_SOCKET_STREAM) {
+		err_exit(0, "can not invoke shm_connect method with a socket which is not a SHM_SOCKET_STREAM socket");
+	}
+	hashtable_t *hashtable = mm_get_hashtable();
+	if(hashtable_get(hashtable, port)== NULL) {
+		err_exit(0, "shm_connect锛歝onnect at port %d  failed!", port);
+	}
+
+	if(socket->port == -1) {
+		socket->port = hashtable_alloc_key(hashtable);
+	} else {
+		 
+		if(hashtable_get(hashtable, socket->port)!= NULL) {
+			err_exit(0, "key %d has already been in used!", socket->port);
+		}
+	}
+
+	socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
+	socket->remoteQueue = _attach_remote_queue(port);
+	socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); 
+	
+
+	//鍙戦�乷pen璇锋眰
+	struct timespec timeout = {1, 0};
+	shm_msg_t msg;
+	msg.port = socket->port;
+	msg.size = 0;
+	msg.type=SHM_SOCKET_OPEN;
+	socket->remoteQueue->push_timeout(msg, &timeout);
+
+	//鎺ュ彈open reply
+	if(socket->queue->pop(msg)) {
+		// 鍦ㄨ繖閲宻erver绔凡缁忓噯澶囧ソ鎺ュ彈瀹㈡埛绔彂閫佽姹備簡,瀹屾垚涓庢湇鍔$鐨勮繛鎺�
+		if(msg.type == SHM_SOCKET_OPEN_REPLY) {
+			socket->status = SHM_CONN_ESTABLISHED;
+			pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket);
+		} else {
+			err_exit(0, "shm_connect: 涓嶅尮閰嶇殑搴旂瓟淇℃伅!");
+		}
+		
+	} else {
+		err_exit(0, "connect failted!");
+	}
+	
+	return 0;
+}
+
+
+int shm_send(shm_socket_t *socket, const void *buf, const int size) {
+	if(socket->socket_type != SHM_SOCKET_STREAM) {
+		err_exit(0, "can not invoke shm_send method with a socket which is not a SHM_SOCKET_STREAM socket");
+	}
+	// hashtable_t *hashtable = mm_get_hashtable();
+	// if(socket->remoteQueue == NULL) {
+	// 	err_msg(errno, "褰撳墠瀹㈡埛绔棤杩炴帴!");
+	// 	return -1;
+	// }
+	shm_msg_t dest;
+	dest.type=SHM_COMMON_MSG;
+	dest.port = socket->port;
+	dest.size = size;
+	dest.buf = mm_malloc(size);
+	memcpy(dest.buf, buf, size);
+
+ 
+	if(socket->remoteQueue->push(dest)) {
+		return 0;
+	} else {
+		err_msg(errno, "connection has been closed!");
+		return -1;
+	}
+}
+
+
+int shm_recv(shm_socket_t* socket, void **buf, int *size) {
+	if(socket->socket_type != SHM_SOCKET_STREAM) {
+		err_exit(0, "can not invoke shm_recv method with a socket which is not a SHM_SOCKET_STREAM socket");
+	}
+	shm_msg_t src;
+
+	if (socket->messageQueue->pop(src)) {
+		void * _buf = malloc(src.size);
+		memcpy(_buf, src.buf, src.size);
+		*buf = _buf;
+		*size = src.size;
+		mm_free(src.buf);
+		return 0;
+	} else {
+		return -1;
+	}
+	
+}
+
+
+
+// 鐭繛鎺ユ柟寮忓彂閫�
+int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port) {
+	hashtable_t *hashtable = mm_get_hashtable();
+
+	if(socket->queue == NULL) {
+		if(socket->port == -1) {
+			socket->port = hashtable_alloc_key(hashtable);
+		} else {
+			 
+			if(hashtable_get(hashtable, socket->port)!= NULL) {
+				err_exit(0, "key %d has already been in used!", socket->port);
+			}
+		}
+
+		socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
+	}
+	if (port == socket->port) {
+		err_msg(0, "can not send to your self!");
+		return -1;
+	}
+
+	shm_msg_t dest;
+	dest.type=SHM_COMMON_MSG;
+	dest.port = socket->port;
+	dest.size = size;
+	dest.buf = mm_malloc(size);
+	memcpy(dest.buf, buf, size);
+
+	SHMQueue<shm_msg_t> *remoteQueue =  _attach_remote_queue(port);
+	if(remoteQueue->push(dest)) {
+		delete remoteQueue;
+		return 0;
+	} else {
+		delete remoteQueue;
+		err_msg(errno, "sendto port %d failed!", port);
+		return -1;
+	}
+}
+
+
+// 鐭繛鎺ユ柟寮忔帴鍙�
+int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port){
+	hashtable_t *hashtable = mm_get_hashtable();
+	if(socket->queue == NULL) {
+		if(socket->port == -1) {
+			socket->port = hashtable_alloc_key(hashtable);
+		} else {
+			 
+			if(hashtable_get(hashtable, socket->port)!= NULL) {
+				err_exit(0, "key %d has already been in used!", socket->port);
+			}
+		}
+
+		socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
+	}
+
+	shm_msg_t src;
+//logger.debug("shm_recvfrom pop before");
+	if (socket->queue->pop(src)) {
+		void * _buf = malloc(src.size);
+		memcpy(_buf, src.buf, src.size);
+		*buf = _buf;
+		*size = src.size;
+		*port = src.port;
+		mm_free(src.buf);
+//logger.debug("shm_recvfrom pop after");
+		return 0;
+	} else {
+		return -1;
+	}
+}
+
+
+/**
+ * 缁戝畾key鍒伴槦鍒楋紝浣嗘槸骞朵笉浼氬垱寤洪槦鍒椼�傚鏋滄病鏈夊搴旀寚瀹歬ey鐨勯槦鍒楁彁绀洪敊璇苟閫�鍑�
+ */
+SHMQueue<shm_msg_t> * _attach_remote_queue(int port) {
+	hashtable_t *hashtable = mm_get_hashtable();
+	if(hashtable_get(hashtable, port)== NULL) {
+		err_exit(0, "_remote_queue_attach锛歝onnet at port %d  failed!", port);
+		return NULL;
+	}
+	 
+	SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(port, 0);
+	return queue;
+}
+
+
+
+
+
+void _server_close_conn_to_client(shm_socket_t* socket, int port) {
+	shm_socket_t *client_socket;
+	auto iter = socket->clientSocketMap->find(port);
+	if( iter !=  socket->clientSocketMap->end() ) {
+		socket->clientSocketMap->erase(iter);
+	}
+	//free((void *)client_socket);
+
+}
+
+/**
+ * server绔悇绉嶇被鍨嬫秷鎭紙锛夊湪杩欓噷杩涚▼鍒嗘嫞
+ */
+void * _server_run_msg_rev(void* _socket) {
+	pthread_detach(pthread_self());
+	shm_socket_t* socket = (shm_socket_t*) _socket;
+	struct timespec timeout = {1, 0};
+	shm_msg_t src;
+	shm_socket_t *client_socket;
+	std::map<int, shm_socket_t* >::iterator iter;
+
+    while(socket->queue->pop(src)) {
+
+    	switch (src.type) {
+			case SHM_SOCKET_OPEN : 
+				socket->acceptQueue->push_timeout(src, &timeout);
+				break;
+			case SHM_SOCKET_CLOSE :
+				_server_close_conn_to_client(socket, src.port);
+				break;
+			case SHM_COMMON_MSG :
+
+				iter = socket->clientSocketMap->find(src.port);
+				if( iter !=  socket->clientSocketMap->end()) {
+					client_socket= iter->second;
+	// print_msg("_server_run_msg_rev push before", src);
+					client_socket->messageQueue->push_timeout(src, &timeout);
+	// print_msg("_server_run_msg_rev push after", src);
+				}
+				
+				break;
+
+			default:
+				err_msg(0, "socket.__shm_rev__: undefined message type.");
+		}
+    }
+ 
+	return NULL;
+}
+
+
+
+void _client_close_conn_to_server(shm_socket_t* socket) {
+	 
+	_shm_close_stream_socket(socket, false);
+}
+
+
+/**
+ * client绔殑鍚勭绫诲瀷娑堟伅锛堬級鍦ㄨ繖閲岃繘绋嬪垎鎷�
+ */
+void * _client_run_msg_rev(void* _socket) {
+	pthread_detach(pthread_self());
+	shm_socket_t* socket = (shm_socket_t*) _socket;
+	struct timespec timeout = {1, 0};
+	shm_msg_t src;
+	
+    while(socket->queue->pop(src)) {
+    	switch (src.type) {
+			 
+			case SHM_SOCKET_CLOSE :
+				_client_close_conn_to_server(socket);
+				break;
+			case SHM_COMMON_MSG :
+				socket->messageQueue->push_timeout(src, &timeout);
+				break;
+			default:
+				err_msg(0, "socket.__shm_rev__: undefined message type.");
+		}
+    }
+ 
+	return NULL;
+}
+
+
+int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote) {
 	socket->status = SHM_CONN_CLOSED;
 	//缁欏鏂瑰彂閫佷竴涓叧闂繛鎺ョ殑娑堟伅
 	struct timespec timeout = {1, 0};
@@ -86,295 +465,14 @@
 
 }
 
-int shm_close_socket(shm_socket_t *socket) {
-	return _shm_close_socket(socket, true);
-}
-
-int shm_socket_bind(shm_socket_t * socket, int port) {
-	shm_socket_t * _socket = (shm_socket_t *) socket;
-	_socket -> port = port;
+int _shm_close_dgram_socket(shm_socket_t *socket){
+	if(socket->queue != NULL) {
+		delete socket->queue;
+		socket->queue = NULL;
+	}
+	free(socket);
 	return 0;
 }
-
-int shm_listen(shm_socket_t* socket) {
-	int  port;
-	hashtable_t *hashtable = mm_get_hashtable();
-	if(socket -> port == -1) {
-		port = hashtable_alloc_key(hashtable);
-		socket -> port = port;
-	} else {
-		 
-		if(hashtable_get(hashtable, socket->port)!= NULL) {
-			err_exit(0, "key %d has already been in used!", socket->port);
-		}
-	}
-
-	socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
-	socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
-	socket->clientSocketMap = new std::map<int, shm_socket_t* >;
-	socket->status = SHM_CONN_LISTEN;
-	pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev , (void *)socket);
-
-	
-	return 0;
-}
-
-void _server_close_conn_to_client(shm_socket_t* socket, int port) {
-	shm_socket_t *client_socket;
-	auto iter = socket->clientSocketMap->find(port);
-	if( iter !=  socket->clientSocketMap->end() ) {
-		socket->clientSocketMap->erase(iter);
-	}
-	//free((void *)client_socket);
-
-}
-
-/**
- * server绔悇绉嶇被鍨嬫秷鎭紙锛夊湪杩欓噷杩涚▼鍒嗘嫞
- */
-void * _server_run_msg_rev(void* _socket) {
-	pthread_detach(pthread_self());
-	shm_socket_t* socket = (shm_socket_t*) _socket;
-	struct timespec timeout = {1, 0};
-	shm_msg_t src;
-	shm_socket_t *client_socket;
-	std::map<int, shm_socket_t* >::iterator iter;
-
-    while(socket->queue->pop(src)) {
-
-    	switch (src.type) {
-			case SHM_SOCKET_OPEN : 
-				socket->acceptQueue->push_timeout(src, &timeout);
-				break;
-			case SHM_SOCKET_CLOSE :
-				_server_close_conn_to_client(socket, src.port);
-				break;
-			case SHM_COMMON_MSG :
-
-				iter = socket->clientSocketMap->find(src.port);
-				if( iter !=  socket->clientSocketMap->end()) {
-					client_socket= iter->second;
-	// print_msg("_server_run_msg_rev push before", src);
-					client_socket->messageQueue->push_timeout(src, &timeout);
-	// print_msg("_server_run_msg_rev push after", src);
-				}
-				
-				break;
-
-			default:
-				err_msg(0, "socket.__shm_rev__: undefined message type.");
-		}
-    }
- 
-	return NULL;
-}
-
-
-
-
-/**
- * 鎺ュ彈瀹㈡埛绔缓绔嬫柊杩炴帴鐨勮姹�
- *
-*/
-
-shm_socket_t* shm_accept(shm_socket_t* socket) {
-	hashtable_t *hashtable = mm_get_hashtable();
-	int client_port;
-	shm_socket_t *client_socket;
-	shm_msg_t src;
-
-	if (socket->acceptQueue->pop(src) ) {
-
-// print_msg("===accept:", src);
-		client_port = src.port;
-		client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t));
-		client_socket->port = socket->port;
-		// client_socket->queue= socket->queue;
-		//鍒濆鍖栨秷鎭痲ueue
-		client_socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); 
-		//杩炴帴鍒板鏂筿ueue
-		client_socket->remoteQueue = _attach_remote_queue(client_port);
-
-		socket->clientSocketMap->insert({client_port, client_socket});
-
-		/*
-         * shm_accept 鐢ㄦ埛鎵ц鐨勬柟娉� 涓巁server_run_msg_rev鍦ㄤ袱涓笉鍚岀殑闄愬埗宸ヤ綔,accept瑕佷繚璇佸湪瀹㈡埛鐨勫彂閫佹秷鎭箣鍓嶅畬鎴愯祫婧愮殑鍑嗗宸ヤ綔锛屼互閬垮厤鍑虹幇绔炴�侀棶棰�
-		*/
-		//鍙戦�乷pen_reply,鍥炲簲瀹㈡埛绔殑connect璇锋眰
-		struct timespec timeout = {1, 0};
-		shm_msg_t msg;
-		msg.port = socket->port;
-		msg.size = 0;
-		msg.type = SHM_SOCKET_OPEN_REPLY;
-
-		if (client_socket->remoteQueue->push_timeout(msg, &timeout) )
-		{
-			client_socket->status = SHM_CONN_ESTABLISHED;
-			return client_socket;
-		} else {
-			err_msg(0, "shm_accept: 鍙戦�乷pen_reply澶辫触");
-			return NULL;
-		}
-
-		
-	} else {
-		err_exit(errno, "shm_accept");
-	}
-	return NULL;
-	
-}
-
-
-int shm_connect(shm_socket_t* socket, int port) {
-	hashtable_t *hashtable = mm_get_hashtable();
-	if(hashtable_get(hashtable, port)== NULL) {
-		err_exit(0, "shm_connect锛歝onnect at port %d  failed!", port);
-	}
-	if(socket->port == -1) {
-		socket->port = hashtable_alloc_key(hashtable);
-	} else {
-		 
-		if(hashtable_get(hashtable, socket->port)!= NULL) {
-			err_exit(0, "key %d has already been in used!", socket->port);
-		}
-	}
-
-	socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
-	socket->remoteQueue = _attach_remote_queue(port);
-	socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); 
-	
-
-	//鍙戦�乷pen璇锋眰
-	struct timespec timeout = {1, 0};
-	shm_msg_t msg;
-	msg.port = socket->port;
-	msg.size = 0;
-	msg.type=SHM_SOCKET_OPEN;
-	socket->remoteQueue->push_timeout(msg, &timeout);
-
-	//鎺ュ彈open reply
-	if(socket->queue->pop(msg)) {
-		// 鍦ㄨ繖閲宻erver绔凡缁忓噯澶囧ソ鎺ュ彈瀹㈡埛绔彂閫佽姹備簡,瀹屾垚涓庢湇鍔$鐨勮繛鎺�
-		if(msg.type == SHM_SOCKET_OPEN_REPLY) {
-			socket->status = SHM_CONN_ESTABLISHED;
-			pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket);
-		} else {
-			err_exit(0, "shm_connect: 涓嶅尮閰嶇殑搴旂瓟淇℃伅!");
-		}
-		
-	} else {
-		err_exit(0, "connect failted!");
-	}
-	
-	return 0;
-}
-
-void _client_close_conn_to_server(shm_socket_t* socket) {
-	 
-	_shm_close_socket(socket, false);
-}
-
-
-/**
- * client绔殑鍚勭绫诲瀷娑堟伅锛堬級鍦ㄨ繖閲岃繘绋嬪垎鎷�
- */
-void * _client_run_msg_rev(void* _socket) {
-	pthread_detach(pthread_self());
-	shm_socket_t* socket = (shm_socket_t*) _socket;
-	struct timespec timeout = {1, 0};
-	shm_msg_t src;
-	
-    while(socket->queue->pop(src)) {
-    	switch (src.type) {
-			 
-			case SHM_SOCKET_CLOSE :
-				_client_close_conn_to_server(socket);
-				break;
-			case SHM_COMMON_MSG :
-				socket->messageQueue->push_timeout(src, &timeout);
-				break;
-			default:
-				err_msg(0, "socket.__shm_rev__: undefined message type.");
-		}
-    }
- 
-	return NULL;
-}
-
-
-
- 
-
-int shm_send(shm_socket_t *socket, const void *buf, const int size) {
-	// hashtable_t *hashtable = mm_get_hashtable();
-	// if(socket->remoteQueue == NULL) {
-	// 	err_msg(errno, "褰撳墠瀹㈡埛绔棤杩炴帴!");
-	// 	return -1;
-	// }
-	shm_msg_t dest;
-	dest.type=SHM_COMMON_MSG;
-	dest.port = socket->port;
-	dest.size = size;
-	dest.buf = mm_malloc(size);
-	memcpy(dest.buf, buf, size);
-
-	// struct timeval time;
-	// gettimeofday(&time, NULL);
-//err_msg(0, "%d %d=======>push befor %d", time.tv_sec, time.tv_usec, socket->port);
-	if(socket->remoteQueue->push(dest)) {
-
-		//gettimeofday(&time, NULL);
-//err_msg(0, "%d %d=======>push after %d", time.tv_sec, time.tv_usec, socket->port);
-		return 0;
-	} else {
-		err_msg(errno, "connection has been closed!");
-		return -1;
-	}
-	
-	
-}
-
-int shm_recv(shm_socket_t* socket, void **buf, int *size) {
-	shm_msg_t src;
-
-// 	struct timeval time;
-// 	gettimeofday(&time, NULL);
-// err_msg(0, "%d %d=======>pop befor %d", time.tv_sec, time.tv_usec, socket->port);
-	if (socket->messageQueue->pop(src)) {
-// gettimeofday(&time, NULL);
-// err_msg(0, "%d %d=======>pop after %d", time.tv_sec, time.tv_usec, socket->port);
-		void * _buf = malloc(src.size);
-		memcpy(_buf, src.buf, src.size);
-		*buf = _buf;
-		*size = src.size;
-		mm_free(src.buf);
-		return 0;
-	} else {
-		return -1;
-	}
-	
-	
-	
-}
-
-
-/**
- * 缁戝畾key鍒伴槦鍒楋紝浣嗘槸骞朵笉浼氬垱寤洪槦鍒椼�傚鏋滄病鏈夊搴旀寚瀹歬ey鐨勯槦鍒楁彁绀洪敊璇苟閫�鍑�
- */
-SHMQueue<shm_msg_t> * _attach_remote_queue(int port) {
-	hashtable_t *hashtable = mm_get_hashtable();
-	if(hashtable_get(hashtable, port)== NULL) {
-		err_exit(0, "_remote_queue_attach锛歝onnet at port %d  failed!", port);
-		return NULL;
-	}
-	 
-	SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(port, 0);
-	return queue;
-}
-
-
-
-
 
  
 
diff --git a/src/util/sem_util.h b/src/util/include/sem_util.h
similarity index 100%
rename from src/util/sem_util.h
rename to src/util/include/sem_util.h
diff --git a/src/util/sem_util.c b/src/util/sem_util.c
index ceedac3..e2b2c20 100644
--- a/src/util/sem_util.c
+++ b/src/util/sem_util.c
@@ -1,158 +1,148 @@
 #include "sem_util.h"
+#include "logger_factory.h"
 
+static Logger logger = LoggerFactory::getLogger();
 
 int SemUtil::get(key_t key, unsigned int value) {
-    int semid, perms;
-     
-    perms = S_IRUSR | S_IWUSR;
+  int semid, perms;
 
-    semid = semget(key, 1, IPC_CREAT | IPC_EXCL | perms);
+  perms = S_IRUSR | S_IWUSR;
 
-    if (semid != -1) {                  /* Successfully created the semaphore */
-        union semun arg;
-        struct sembuf sop;
+  semid = semget(key, 1, IPC_CREAT | IPC_EXCL | perms);
 
-        fprintf(stderr, "%ld: created semaphore\n", (long) getpid());
+  if (semid != -1) { /* Successfully created the semaphore */
+    union semun arg;
+    struct sembuf sop;
 
-        arg.val = 0;                    /* So initialize it to 0 */
-        if (semctl(semid, 0, SETVAL, arg) == -1)
-            err_exit(errno, "semctl 1");
-        fprintf(stderr, "%ld: initialized semaphore\n", (long) getpid());
+    logger.info("%ld: created semaphore\n", (long)getpid());
 
-        /* Perform a "no-op" semaphore operation - changes sem_otime
-           so other processes can see we've initialized the set. */
+    arg.val = 0; /* So initialize it to 0 */
+    if (semctl(semid, 0, SETVAL, arg) == -1)
+      err_exit(errno, "semctl 1");
+    logger.info("%ld: initialized semaphore\n", (long)getpid());
 
-        sop.sem_num = 0;                /* Operate on semaphore 0 */
-        sop.sem_op = value;                
-        sop.sem_flg = 0;
-        if (semop(semid, &sop, 1) == -1)
-            err_exit(errno, "semop");
-        fprintf(stderr, "%ld: completed dummy semop()\n", (long) getpid());
+    /* Perform a "no-op" semaphore operation - changes sem_otime
+       so other processes can see we've initialized the set. */
 
-    } else {                            /* We didn't create the semaphore set */
+    sop.sem_num = 0; /* Operate on semaphore 0 */
+    sop.sem_op = value;
+    sop.sem_flg = 0;
+    if (semop(semid, &sop, 1) == -1)
+      err_exit(errno, "semop");
+    logger.info("%ld: completed dummy semop()\n", (long)getpid());
 
-        if (errno != EEXIST) {          /* Unexpected error from semget() */
-            err_exit(errno, "semget 1");
+  } else { /* We didn't create the semaphore set */
 
-        } else {                        /* Someone else already created it */
-            const int MAX_TRIES = 10;
-            int j;
-            union semun arg;
-            struct semid_ds ds;
+    if (errno != EEXIST) { /* Unexpected error from semget() */
+      err_exit(errno, "semget 1");
 
-            semid = semget(key, 1, perms);      /* So just get ID */
-            if (semid == -1)
-                err_exit(errno, "semget 2");
+    } else { /* Someone else already created it */
+      const int MAX_TRIES = 10;
+      int j;
+      union semun arg;
+      struct semid_ds ds;
 
-            fprintf(stderr, "%ld: got semaphore key\n", (long) getpid());
-            /* Wait until another process has called semop() */
+      semid = semget(key, 1, perms); /* So just get ID */
+      if (semid == -1)
+        err_exit(errno, "semget 2");
 
-            arg.buf = &ds;
-            for (j = 0; j < MAX_TRIES; j++) {
-                fprintf(stderr, "Try %d\n", j);
-                if (semctl(semid, 0, IPC_STAT, arg) == -1)
-                    err_exit(errno, "semctl 2");
+      logger.info("%ld: got semaphore key\n", (long)getpid());
+      /* Wait until another process has called semop() */
 
-                if (ds.sem_otime != 0)          /* Semop() performed? */
-                    break;                      /* Yes, quit loop */
-                sleep(1);                       /* If not, wait and retry */
-            }
+      arg.buf = &ds;
+      for (j = 0; j < MAX_TRIES; j++) {
+        logger.info("Try %d\n", j);
+        if (semctl(semid, 0, IPC_STAT, arg) == -1)
+          err_exit(errno, "semctl 2");
 
-            if (ds.sem_otime == 0)              /* Loop ran to completion! */
-                err_exit(errno, "Existing semaphore not initialized");
-        }
+        if (ds.sem_otime != 0) /* Semop() performed? */
+          break;               /* Yes, quit loop */
+        sleep(1);              /* If not, wait and retry */
+      }
+
+      if (ds.sem_otime == 0) /* Loop ran to completion! */
+        err_exit(errno, "Existing semaphore not initialized");
     }
-    return semid;
+  }
+  return semid;
 }
-
 
 /* Reserve semaphore (blocking), return 0 on success, or -1 with 'errno'
    set to EINTR if operation was interrupted by a signal handler */
 
 /* Reserve semaphore - decrement it by 1 */
-int SemUtil::dec(int semId)
-{
-    struct sembuf sops;
+int SemUtil::dec(int semId) {
+  struct sembuf sops;
 
-    sops.sem_num = 0;
-    sops.sem_op = -1;
-    sops.sem_flg =  0;
+  sops.sem_num = 0;
+  sops.sem_op = -1;
+  sops.sem_flg = 0;
 
-    while (semop(semId, &sops, 1) == -1)
-        if (errno != EINTR ) {
-            err_msg(errno, "SemUtil::dec");
-            return -1;
-        }
+  while (semop(semId, &sops, 1) == -1)
+    if (errno != EINTR) {
+      err_msg(errno, "SemUtil::dec");
+      return -1;
+    }
 
-    return 0;
+  return 0;
 }
 
-int SemUtil::dec_nowait(int semId)
-{
-    struct sembuf sops;
+int SemUtil::dec_nowait(int semId) {
+  struct sembuf sops;
 
-    sops.sem_num = 0;
-    sops.sem_op = -1;
-    sops.sem_flg =  IPC_NOWAIT;
+  sops.sem_num = 0;
+  sops.sem_op = -1;
+  sops.sem_flg = IPC_NOWAIT;
 
-    while (semop(semId, &sops, 1) == -1)
-        if (errno != EINTR ) {
-            err_msg(errno, "SemUtil::dec_nowait");
-            return -1;
-        }
+  while (semop(semId, &sops, 1) == -1)
+    if (errno != EINTR) {
+      err_msg(errno, "SemUtil::dec_nowait");
+      return -1;
+    }
 
-    return 0;
+  return 0;
 }
 
-int SemUtil::dec_timeout(int semId, struct timespec * timeout)
-{
-    struct sembuf sops;
+int SemUtil::dec_timeout(int semId, struct timespec *timeout) {
+  struct sembuf sops;
 
-    sops.sem_num = 0;
-    sops.sem_op = -1;
-    sops.sem_flg = 0;
+  sops.sem_num = 0;
+  sops.sem_op = -1;
+  sops.sem_flg = 0;
 
-    while ( semtimedop(semId, &sops, 1, timeout) == -1)
-        if (errno != EINTR ) {
-            err_msg(errno, "SemUtil::dec_timeout");
-            return -1;
-        }
+  while (semtimedop(semId, &sops, 1, timeout) == -1)
+    if (errno != EINTR) {
+      err_msg(errno, "SemUtil::dec_timeout");
+      return -1;
+    }
 
-    return 0;
+  return 0;
 }
-
-
 
 /* Release semaphore - increment it by 1 */
-int SemUtil::inc(int semId)
-{
-    struct sembuf sops;
+int SemUtil::inc(int semId) {
+  struct sembuf sops;
 
-    sops.sem_num = 0;
-    sops.sem_op = 1;
-    sops.sem_flg = 0;
+  sops.sem_num = 0;
+  sops.sem_op = 1;
+  sops.sem_flg = 0;
 
-    int rv = semop(semId, &sops, 1);
-    if(rv == -1) {
-        err_msg(errno, "SemUtil::inc");
-    }
-    return rv;
+  int rv = semop(semId, &sops, 1);
+  if (rv == -1) {
+    err_msg(errno, "SemUtil::inc");
+  }
+  return rv;
 }
 
 void SemUtil::remove(int semid) {
-    union semun dummy;
-    if (semctl(semid, 0, IPC_RMID, dummy) == -1)
-        err_msg(errno, "SemUtil::remove");
-
+  union semun dummy;
+  if (semctl(semid, 0, IPC_RMID, dummy) == -1)
+    err_msg(errno, "SemUtil::remove");
 }
 
-
-void SemUtil::set(int semId, int val)
-{
-    union semun arg;
-    arg.val = val;
-    if (semctl(semId, 0, SETVAL, arg) == -1)
-        err_msg(errno, "SemUtil::set");
+void SemUtil::set(int semId, int val) {
+  union semun arg;
+  arg.val = val;
+  if (semctl(semId, 0, SETVAL, arg) == -1)
+    err_msg(errno, "SemUtil::set");
 }
-
-
diff --git a/test/Makefile b/test/Makefile
index eee9f77..643b72d 100755
--- a/test/Makefile
+++ b/test/Makefile
@@ -2,38 +2,25 @@
 # Makefile for common library.
 #
 ROOT=..
-LDLIBS+=-Wl,-rpath=$(ROOT)/queue:$(ROOT)/lib
+LDLIBS+=-Wl,-rpath=$(ROOT)/lib:$(ROOT)/build/lib
 # 寮�婧愬伐鍏峰寘璺緞
-LDDIR += -L$(ROOT)/queue
+LDDIR +=  -L$(ROOT)/lib -L$(ROOT)/build/lib
 # 寮�婧愬伐鍏峰寘
 LDLIBS += -lshm_queue -lusgcommon -lpthread
 
-INCLUDE += -I$(ROOT)/queue/ -I$(ROOT)/queue/include
+INCLUDE += -I$(ROOT)/build/include
 
 PLATFORM=$(shell $(ROOT)/systype.sh)
 include $(ROOT)/Make.defines.$(PLATFORM)
 
  
-PROGS = communication
-
+PROGS = dgram_socket_test
 
 build: $(PROGS)
 
 # test1: $(LIBCOMMON)
 
 # 濡傛灉鍖匒 寮曠敤鍖匓锛� B 瑕佹斁鍦� A 鍚庨潰
- 
-
-test_queue: test.h  $(ROOT)/queue/include/lock_free_queue.h
-
-single_productor: test.h  $(ROOT)/queue/include/lock_free_queue.h
-
-single_consumer: test.h  $(ROOT)/queue/include/lock_free_queue.h
 
 clean:
 	rm -f $(TEMPFILES) $(PROGS)
-
-
-
-$(LIBQUEUE):
-	(cd $(ROOT)/queue && $(MAKE))
diff --git a/test/dgram_socket_test b/test/dgram_socket_test
new file mode 100755
index 0000000..6770bc1
--- /dev/null
+++ b/test/dgram_socket_test
Binary files differ
diff --git a/test/dgram_socket_test.c b/test/dgram_socket_test.c
new file mode 100644
index 0000000..97c602a
--- /dev/null
+++ b/test/dgram_socket_test.c
@@ -0,0 +1,74 @@
+#include "shm_socket.h"
+#include "usg_common.h"
+#include "shm_mm.h"
+
+#include "shm_socket.h"
+#include "usg_common.h"
+#include "shm_mm.h"
+typedef struct Targ {
+	int port;
+	int id;
+}Targ;
+
+ 
+void server(int port) {
+	pthread_t tid;
+	shm_socket_t *socket = shm_open_socket(SHM_SOCKET_DGRAM);
+	shm_socket_bind(socket, port);
+ 
+	char *buf;
+	int size;
+	int remotePort;
+	char sendbuf[512];
+	while( shm_recvfrom(socket, (void **)&buf, &size, &remotePort) == 0) {
+		sprintf(sendbuf, "RECEIVED:%s", buf);
+		printf("received from %d:%s\n", remotePort, buf);
+		shm_sendto(socket, (void *)sendbuf, strlen(sendbuf) + 1, remotePort);
+		free(buf);
+	}
+
+	shm_close_socket(socket);
+
+}
+
+void client(int port) {
+	shm_socket_t *socket = shm_open_socket(SHM_SOCKET_DGRAM);
+	int size;
+	char *recvbuf;
+	char sendbuf[512];
+	int remote_port;
+	while(true) {
+		printf("request: ");
+		scanf("%s", sendbuf);
+		shm_sendto(socket, sendbuf, strlen(sendbuf)+1, port) ;
+		shm_recvfrom(socket, (void **)&recvbuf, &size, &remote_port);
+		printf("reply from (%d): %s\n",  remote_port, recvbuf);
+		free(recvbuf);
+
+	}
+	shm_close_socket(socket);
+}
+ 
+int main(int argc, char *argv[]) {
+  shm_init(512);
+  int port;
+  if (argc < 3) {
+  	 fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client");
+  	 return 1;
+  }
+
+  port = atoi(argv[2]);
+
+  if (strcmp("server", argv[1]) == 0 ) {
+     server(port);
+  }
+
+  if (strcmp("client", argv[1]) == 0)
+     client(port);
+
+
+
+  shm_destroy();
+ // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client");
+  return 0;
+}
\ No newline at end of file

--
Gitblit v1.8.0