wangzhengquan
2020-07-21 91ec036cace39fd5b5f04644f6bced1f477005e0
update
1 文件已复制
2个文件已删除
20个文件已添加
2 文件已重命名
15个文件已修改
3014 ■■■■ 已修改文件
Make.defines.linux 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
Makefile 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
build/include/array_lock_free_queue.h 322 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
build/include/array_lock_free_queue2.h 332 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
build/include/hashtable.h 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
build/include/linked_lock_free_queue.h 245 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
build/include/lock_free_queue.h 360 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
build/include/logger_factory.h 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
build/include/mem_pool.h 55 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
build/include/mm.h 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
build/include/mod_socket.h 76 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
build/include/sem_util.h 补丁 | 查看 | 原始文档 | blame | 历史
build/include/shm_allocator.h 102 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
build/include/shm_mm.h 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
build/include/shm_queue.h 184 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
build/include/shm_queue_wrapper.h 100 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
build/include/shm_socket.h 87 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
build/lib/libshm_queue.a 补丁 | 查看 | 原始文档 | blame | 历史
demo/Makefile 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
demo/pub_sub 补丁 | 查看 | 原始文档 | blame | 历史
demo/queue 补丁 | 查看 | 原始文档 | blame | 历史
demo/queue.c 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
demo/req_rep 补丁 | 查看 | 原始文档 | blame | 历史
src/Makefile 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/libshm_queue.a 补丁 | 查看 | 原始文档 | blame | 历史
src/logger_factory.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/include/shm_allocator.h 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/include/shm_queue.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/include/shm_queue_wrapper.h 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/libshm_queue.a 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/libshm_queue.so 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/shm_queue_wrapper.c 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/include/shm_socket.h 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/mod_socket.c 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.c 678 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/util/include/sem_util.h 补丁 | 查看 | 原始文档 | blame | 历史
src/util/sem_util.c 208 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/Makefile 21 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/dgram_socket_test 补丁 | 查看 | 原始文档 | blame | 历史
test/dgram_socket_test.c 74 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
Make.defines.linux
@@ -25,7 +25,7 @@
# Common temp files to delete from each directory.
TEMPFILES=core core.* *.o temp.* *.out *.a *.so
TEMPFILES=core core.*  **/*.o temp.* *.out *.a *.so
%:    %.c 
    $(CC) $(CFLAGS) $^ -o $@ $(LDFLAGS) $(LDLIBS)
Makefile
@@ -1,4 +1,4 @@
DIRS = src test test2 demo
DIRS = src test demo
all:
    for i in $(DIRS); do \
@@ -9,6 +9,7 @@
    for i in $(DIRS); do \
        (cd $$i && echo "cleaning $$i" && $(MAKE) clean) || exit 1; \
    done
    rm -rf build
ipcrm:
    -ipcrm -a
build/include/array_lock_free_queue.h
New file
@@ -0,0 +1,322 @@
#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
#include "atomic_ops.h"
#include <assert.h> // assert()
#include <sched.h>  // sched_yield()
#include "logger_factory.h"
#include "mem_pool.h"
#include "shm_allocator.h"
/// @brief implementation of an array based lock free queue with support for
///        multiple producers
/// This class is prevented from being instantiated directly (all members and
/// methods are private). To instantiate a multiple producers lock free queue
/// you must use the ArrayLockFreeQueue fachade:
///   ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q;
#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
template <typename ELEM_T, typename Allocator = SHM_Allocator>
class ArrayLockFreeQueue
{
    // ArrayLockFreeQueue will be using this' private members
    template <
        typename ELEM_T_,
        typename Allocator_,
        template <typename T, typename AT> class Q_TYPE
        >
    friend class LockFreeQueue;
private:
    /// @brief constructor of the class
    ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    virtual ~ArrayLockFreeQueue();
    inline uint32_t size();
    inline bool full();
    inline bool empty();
    bool push(const ELEM_T &a_data);
    bool pop(ELEM_T &a_data);
    /// @brief calculate the index in the circular array that corresponds
    /// to a particular "count" value
    inline uint32_t countToIndex(uint32_t a_count);
    ELEM_T& operator[](unsigned i);
private:
    size_t Q_SIZE;
    /// @brief array to keep the elements
    ELEM_T *m_theQueue;
    /// @brief where a new element will be inserted
    uint32_t m_writeIndex;
    /// @brief where the next element where be extracted from
    uint32_t m_readIndex;
    /// @brief maximum read index for multiple producer queues
    /// If it's not the same as m_writeIndex it means
    /// there are writes pending to be "committed" to the queue, that means,
    /// the place for the data was reserved (the index in the array) but
    /// data is still not in the queue, so the thread trying to read will have
    /// to wait for those other threads to save the data into the queue
    ///
    /// note this is only used for multiple producers
    uint32_t m_maximumReadIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    /// @brief number of elements in the queue
    uint32_t m_count;
#endif
private:
    /// @brief disable copy constructor declaring it private
    ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
};
template <typename ELEM_T, typename Allocator>
ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize):
    Q_SIZE(qsize),
    m_writeIndex(0),      // initialisation is not atomic
    m_readIndex(0),       //
    m_maximumReadIndex(0) //
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    ,m_count(0)           //
#endif
{
    m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
}
template <typename ELEM_T, typename Allocator>
ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue()
{
    // std::cout << "destroy ArrayLockFreeQueue\n";
    Allocator::deallocate(m_theQueue);
}
template <typename ELEM_T, typename Allocator>
inline
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
{
    // if Q_SIZE is a power of 2 this statement could be also written as
    // return (a_count & (Q_SIZE - 1));
    return (a_count % Q_SIZE);
}
template <typename ELEM_T, typename Allocator>
inline
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return m_count;
#else
    uint32_t currentWriteIndex = m_maximumReadIndex;
    uint32_t currentReadIndex  = m_readIndex;
    // let's think of a scenario where this function returns bogus data
    // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
    // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
    // 2. afterwards this thread is preemted. While this thread is inactive 2
    // elements are inserted and removed from the queue, so m_maximumReadIndex
    // is 5 and m_readIndex 4. Real size is still 1
    // 3. Now the current thread comes back from preemption and reads m_readIndex.
    // currentReadIndex is 4
    // 4. currentReadIndex is bigger than currentWriteIndex, so
    // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
    // it returns that the queue is almost full, when it is almost empty
    //
    if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
    {
        return (currentWriteIndex - currentReadIndex);
    }
    else
    {
        return (Q_SIZE + currentWriteIndex - currentReadIndex);
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T, typename Allocator>
inline
bool ArrayLockFreeQueue<ELEM_T, Allocator>::full()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count == (Q_SIZE));
#else
    uint32_t currentWriteIndex = m_writeIndex;
    uint32_t currentReadIndex  = m_readIndex;
    if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
    {
        // the queue is full
        return true;
    }
    else
    {
        // not full!
        return false;
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T, typename Allocator>
inline
bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count == 0);
#else
    if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
    {
        // the queue is full
        return true;
    }
    else
    {
        // not full!
        return false;
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data)
{
    uint32_t currentReadIndex;
    uint32_t currentWriteIndex;
    do
    {
        currentWriteIndex = m_writeIndex;
        currentReadIndex  = m_readIndex;
    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        if (m_count == Q_SIZE) {
            return false;
        }
    #else
        if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
        {
            // the queue is full
            return false;
        }
    #endif
    } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
    // We know now that this index is reserved for us. Use it to save the data
    m_theQueue[countToIndex(currentWriteIndex)] = a_data;
    // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
    // inserting in the queue. It might fail if there are more than 1 producer threads because this
    // operation has to be done in the same order as the previous CAS
    while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
    {
        // this is a good place to yield the thread in case there are more
        // software threads than hardware processors and you have more
        // than 1 producer thread
        // have a look at sched_yield (POSIX.1b)
        sched_yield();
    }
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    AtomicAdd(&m_count, 1);
#endif
    return true;
}
template <typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data)
{
    uint32_t currentMaximumReadIndex;
    uint32_t currentReadIndex;
    do
    {
        // to ensure thread-safety when there is more than 1 producer thread
        // a second index is defined (m_maximumReadIndex)
        currentReadIndex        = m_readIndex;
        currentMaximumReadIndex = m_maximumReadIndex;
    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        if (m_count == 0) {
            return false;
        }
    #else
        if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
        {
            // the queue is empty or
            // a producer thread has allocate space in the queue but is
            // waiting to commit the data into it
            return false;
        }
    #endif
        // retrieve the data from the queue
        a_data = m_theQueue[countToIndex(currentReadIndex)];
        // try to perfrom now the CAS operation on the read index. If we succeed
        // a_data already contains what m_readIndex pointed to before we
        // increased it
        if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
        {
        #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
            // m_count.fetch_sub(1);
            AtomicSub(&m_count, 1);
        #endif
            return true;
        }
        // it failed retrieving the element off the queue. Someone else must
        // have read the element stored at countToIndex(currentReadIndex)
        // before we could perform the CAS operation
    } while(1); // keep looping to try again!
    // Something went wrong. it shouldn't be possible to reach here
    assert(0);
    // Add this return statement to avoid compiler warnings
    return false;
}
template <typename ELEM_T, typename Allocator>
ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i)
{
    int currentCount = m_count;
    uint32_t currentReadIndex = m_readIndex;
    if (i < 0 || i >= currentCount)
    {
        std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
        std::exit(EXIT_FAILURE);
    }
    return m_theQueue[countToIndex(currentReadIndex+i)];
}
#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
build/include/array_lock_free_queue2.h
New file
@@ -0,0 +1,332 @@
#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
#include <assert.h> // assert()
#include <sched.h>  // sched_yield()
#include "logger_factory.h"
/// @brief implementation of an array based lock free queue with support for
///        multiple producers
/// This class is prevented from being instantiated directly (all members and
/// methods are private). To instantiate a multiple producers lock free queue
/// you must use the ArrayLockFreeQueue fachade:
///   ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q;
#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
template <typename ELEM_T>
class ArrayLockFreeQueue
{
    // ArrayLockFreeQueue will be using this' private members
    template <
        typename ELEM_T_,
        template <typename T> class Q_TYPE >
    friend class LockFreeQueue;
private:
    /// @brief constructor of the class
    ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    virtual ~ArrayLockFreeQueue();
    inline uint32_t size();
    inline bool full();
    inline bool empty();
    bool push(const ELEM_T &a_data);
    bool pop(ELEM_T &a_data);
    /// @brief calculate the index in the circular array that corresponds
    /// to a particular "count" value
    inline uint32_t countToIndex(uint32_t a_count);
    ELEM_T& operator[](unsigned i);
private:
    size_t Q_SIZE;
    /// @brief array to keep the elements
    ELEM_T *m_theQueue;
    /// @brief where a new element will be inserted
    std::atomic<uint32_t> m_writeIndex;
    /// @brief where the next element where be extracted from
    std::atomic<uint32_t> m_readIndex;
    /// @brief maximum read index for multiple producer queues
    /// If it's not the same as m_writeIndex it means
    /// there are writes pending to be "committed" to the queue, that means,
    /// the place for the data was reserved (the index in the array) but
    /// data is still not in the queue, so the thread trying to read will have
    /// to wait for those other threads to save the data into the queue
    ///
    /// note this is only used for multiple producers
    std::atomic<uint32_t> m_maximumReadIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    /// @brief number of elements in the queue
    std::atomic<uint32_t> m_count;
#endif
private:
    /// @brief disable copy constructor declaring it private
    ArrayLockFreeQueue<ELEM_T>(const ArrayLockFreeQueue<ELEM_T> &a_src);
};
template <typename ELEM_T>
ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize):
    Q_SIZE(qsize),
    m_writeIndex(0),      // initialisation is not atomic
    m_readIndex(0),       //
    m_maximumReadIndex(0) //
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    ,m_count(0)           //
#endif
{
    m_theQueue = (ELEM_T*)mm_malloc(Q_SIZE * sizeof(ELEM_T));
}
template <typename ELEM_T>
ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue()
{
    // std::cout << "destroy ArrayLockFreeQueue\n";
    mm_free(m_theQueue);
}
template <typename ELEM_T>
inline
uint32_t ArrayLockFreeQueue<ELEM_T>::countToIndex(uint32_t a_count)
{
    // if Q_SIZE is a power of 2 this statement could be also written as
    // return (a_count & (Q_SIZE - 1));
    return (a_count % Q_SIZE);
}
template <typename ELEM_T>
inline
uint32_t ArrayLockFreeQueue<ELEM_T>::size()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return m_count.load();
#else
    uint32_t currentWriteIndex = m_maximumReadIndex.load();
    uint32_t currentReadIndex  = m_readIndex.load();
    // let's think of a scenario where this function returns bogus data
    // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
    // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
    // 2. afterwards this thread is preemted. While this thread is inactive 2
    // elements are inserted and removed from the queue, so m_maximumReadIndex
    // is 5 and m_readIndex 4. Real size is still 1
    // 3. Now the current thread comes back from preemption and reads m_readIndex.
    // currentReadIndex is 4
    // 4. currentReadIndex is bigger than currentWriteIndex, so
    // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
    // it returns that the queue is almost full, when it is almost empty
    //
    if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
    {
        return (currentWriteIndex - currentReadIndex);
    }
    else
    {
        return (Q_SIZE + currentWriteIndex - currentReadIndex);
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T>
inline
bool ArrayLockFreeQueue<ELEM_T>::full()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count.load() == (Q_SIZE));
#else
    uint32_t currentWriteIndex = m_writeIndex;
    uint32_t currentReadIndex  = m_readIndex;
    if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
    {
        // the queue is full
        return true;
    }
    else
    {
        // not full!
        return false;
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T>
inline
bool ArrayLockFreeQueue<ELEM_T>::empty()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count.load() == 0);
#else
    if (countToIndex( m_readIndex.load()) == countToIndex(m_maximumReadIndex.load()))
    {
        // the queue is full
        return true;
    }
    else
    {
        // not full!
        return false;
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T>
bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data)
{
    uint32_t currentReadIndex;
    uint32_t currentWriteIndex;
    do
    {
        currentWriteIndex = m_writeIndex.load();
        currentReadIndex = m_readIndex.load();
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        if (m_count.load() == Q_SIZE) {
            return false;
        }
#else
        if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
        {
            // the queue is full
            return false;
        }
#endif
    // There is more than one producer. Keep looping till this thread is able
    // to allocate space for current piece of data
    //
    // using compare_exchange_strong because it isn't allowed to fail spuriously
    // When the compare_exchange operation is in a loop the weak version
    // will yield better performance on some platforms, but here we'd have to
    // load m_writeIndex all over again
    } while (!m_writeIndex.compare_exchange_strong(
                currentWriteIndex, (currentWriteIndex + 1)));
    // Just made sure this index is reserved for this thread.
    m_theQueue[countToIndex(currentWriteIndex)] = a_data;
    //memcpy((void *)(&m_theQueue[countToIndex(currentWriteIndex)]), (void *)(&a_data), sizeof(ELEM_T) );
    // update the maximum read index after saving the piece of data. It can't
    // fail if there is only one thread inserting in the queue. It might fail
    // if there is more than 1 producer thread because this operation has to
    // be done in the same order as the previous CAS
    //
    // using compare_exchange_weak because they are allowed to fail spuriously
    // (act as if *this != expected, even if they are equal), but when the
    // compare_exchange operation is in a loop the weak version will yield
    // better performance on some platforms.
    while (!m_maximumReadIndex.compare_exchange_weak(
                currentWriteIndex, (currentWriteIndex + 1)))
    {
        // this is a good place to yield the thread in case there are more
        // software threads than hardware processors and you have more
        // than 1 producer thread
        // have a look at sched_yield (POSIX.1b)
        sched_yield();
    }
    // The value was successfully inserted into the queue
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    m_count.fetch_add(1);
#endif
    return true;
}
template <typename ELEM_T>
bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data)
{
    uint32_t currentMaximumReadIndex;
    uint32_t currentReadIndex;
    do
    {
        currentReadIndex = m_readIndex.load();
        currentMaximumReadIndex = m_maximumReadIndex.load();
     #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        if (m_count.load() == 0) {
            return false;
        }
    #else
        // to ensure thread-safety when there is more than 1 producer
        // thread a second index is defined (m_maximumReadIndex)
        if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
        {
            // the queue is empty or
            // a producer thread has allocate space in the queue but is
            // waiting to commit the data into it
            return false;
        }
    #endif
        // retrieve the data from the queue
        a_data = m_theQueue[countToIndex(currentReadIndex)];
        //memcpy((void*) (&a_data), (void *)(&m_theQueue[countToIndex(currentReadIndex)]),sizeof(ELEM_T) );
        // try to perfrom now the CAS operation on the read index. If we succeed
        // a_data already contains what m_readIndex pointed to before we
        // increased it
        if (m_readIndex.compare_exchange_strong(currentReadIndex, (currentReadIndex + 1)))
        {
            // got here. The value was retrieved from the queue. Note that the
            // data inside the m_queue array is not deleted nor reseted
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
            m_count.fetch_sub(1);
#endif
            return true;
        }
        // it failed retrieving the element off the queue. Someone else must
        // have read the element stored at countToIndex(currentReadIndex)
        // before we could perform the CAS operation
    } while(1); // keep looping to try again!
    // Something went wrong. it shouldn't be possible to reach here
    assert(0);
    // Add this return statement to avoid compiler warnings
    return false;
}
template <typename ELEM_T>
ELEM_T& ArrayLockFreeQueue<ELEM_T>::operator[](unsigned int i)
{
    int currentCount = m_count.load();
    uint32_t currentReadIndex = m_readIndex.load();
    if (i < 0 || i >= currentCount)
    {
        std::cerr << "Error in array limits: " << i << " is out of range\n";
        std::exit(EXIT_FAILURE);
    }
    return m_theQueue[countToIndex(currentReadIndex+i)];
}
#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
build/include/hashtable.h
New file
@@ -0,0 +1,34 @@
#ifndef __HASHTABLE_H__
#define __HASHTABLE_H__
#include <sys/queue.h>
#include <set>
#define MAPSIZE 100
typedef struct hashtable_t
{
 struct tailq_header_t* array[MAPSIZE];
 int mutex;
 int wlock;
 int cond;
 size_t readcnt;
} hashtable_t;
typedef void (*hashtable_foreach_cb)(int key, void *value);
void hashtable_init(hashtable_t *hashtable);
void *hashtable_get(hashtable_t *hashtable, int key);
void hashtable_put(hashtable_t *hashtable, int key, void *value);
void *hashtable_remove(hashtable_t *hashtable, int key);
void hashtable_removeall(hashtable_t *hashtable);
void hashtable_foreach(hashtable_t *hashtable, hashtable_foreach_cb cb);
void hashtable_printall(hashtable_t *hashtable);
int hashtable_alloc_key(hashtable_t *hashtable);
std::set<int> * hashtable_keyset(hashtable_t *hashtable) ;
#endif
build/include/linked_lock_free_queue.h
New file
@@ -0,0 +1,245 @@
// queue.h -- interface for a queue
#ifndef __LINKED_LOCK_FREE_QUEUE_H_
#define __LINKED_LOCK_FREE_QUEUE_H_
#include "mm.h"
#include "sem_util.h"
template <typename T> class Node;
template <typename T>
class Pointer {
public:
    Node<T> *ptr;
    unsigned long count;
    Pointer( Node<T> *node = NULL, int c=0) noexcept :  ptr(node), count(c) {}
    bool operator == (const Pointer<T> o) const {
      return (o.ptr == ptr) && (o.count == count);
    }
    bool operator != (const Pointer<T> o) const {
      return !((o.ptr == ptr) && (o.count == count));
    }
};
template <typename T>
class Node {
public:
    alignas(16) std::atomic<Pointer<T> > next;
    T value;
    Node() {
    }
    void *operator new(size_t size){
        return mm_malloc(size);
    }
    void operator delete(void *p) {
        return mm_free(p);
    }
};
template <typename ELEM_T>
class LinkedLockFreeQueue
{
    template <
        typename ELEM_T_,
        template <typename T> class Q_TYPE >
    friend class LockFreeQueue;
private:
// class scope definitions
    enum {Q_SIZE = 10};
// private class members
    std::atomic<Pointer<ELEM_T> > Head;       // pointer to front of Queue
    std::atomic<Pointer<ELEM_T> > Tail;        // pointer to rear of Queue
    //std::atomic_uint count;          // current number of size in Queue
    std::atomic_uint count;
    const size_t qsize;    // maximum number of size in Queue
    // preemptive definitions to prevent public copying
    LinkedLockFreeQueue(const LinkedLockFreeQueue & q) : qsize(0) { }
    LinkedLockFreeQueue & operator=(const LinkedLockFreeQueue & q) { return *this;}
protected:
    LinkedLockFreeQueue(size_t qs = Q_SIZE); // create queue with a qs limit
    ~LinkedLockFreeQueue();
    bool empty() const;
    bool full() const;
    unsigned int size() const;
    bool push(const ELEM_T &item); // add item to end
    bool pop(ELEM_T &item);
    ELEM_T& operator[](unsigned i);
};
// Queue methods
template <typename T>
LinkedLockFreeQueue<T>::LinkedLockFreeQueue(size_t qs) : count(0), qsize(qs)
{
    Node<T> *node = new Node<T>;
    Pointer<T> pointer(node, 0);
    Head.store(pointer, std::memory_order_relaxed);
    Tail.store(pointer, std::memory_order_relaxed);
}
template <typename T>
LinkedLockFreeQueue<T>::~LinkedLockFreeQueue()
{
    LoggerFactory::getLogger().debug("LinkedLockFreeQueue destory");
    Node<T> * nodeptr;
    Pointer<T> tmp = Head.load(std::memory_order_relaxed);
    while((nodeptr = tmp.ptr) != NULL) {
        tmp = (tmp.ptr->next).load(std::memory_order_relaxed);
        //std::cerr << "delete " << nodeptr << std::endl;
        delete nodeptr;
    }
}
template <typename T>
bool LinkedLockFreeQueue<T>::empty() const
{
    return count == 0;
}
template <typename T>
bool LinkedLockFreeQueue<T>::full() const
{
    return count == qsize;
}
template <typename T>
unsigned int LinkedLockFreeQueue<T>::size() const
{
    return count;
}
// Add item to queue
template <typename T>
bool LinkedLockFreeQueue<T>::push(const T & item)
{
    if (full())
        return false;
    Node<T> * node = new Node<T>;
    node->value = item;
    Pointer<T> tail ;
    Pointer<T> next ;
    while(true) {
        tail = Tail.load(std::memory_order_relaxed);
        next = (tail.ptr->next).load(std::memory_order_relaxed);
        if (tail == Tail.load(std::memory_order_relaxed)) {
            if (next.ptr == NULL) {
                if ((tail.ptr->next).compare_exchange_weak(next,
                    Pointer<T>(node, next.count+1),
                    std::memory_order_release,
                    std::memory_order_relaxed) )
                    break;
                else
                    Tail.compare_exchange_weak(tail,
                        Pointer<T>(next.ptr, tail.count+1),
                        std::memory_order_release,
                        std::memory_order_relaxed);
            }
        }
    }
    Tail.compare_exchange_weak(tail, Pointer<T>(node, tail.count+1),
        std::memory_order_release,
        std::memory_order_relaxed);
    count++;
    return true;
}
// Place front item into item variable and remove from queue
template <typename T>
bool LinkedLockFreeQueue<T>::pop(T & item)
{
    if (empty())
        return false;
    Pointer<T> head;
    Pointer<T> tail;
    Pointer<T> next;
    while(true) {
        head = Head.load(std::memory_order_relaxed);
        tail = Tail.load(std::memory_order_relaxed);
        next = (head.ptr->next).load();
        if (head == Head.load(std::memory_order_relaxed)) {
            if(head.ptr == tail.ptr) {
                if (next.ptr == NULL)
                    return false;
                // Tail is falling behind. Try to advance it
                Tail.compare_exchange_weak(tail,
                        Pointer<T>(next.ptr, tail.count+1),
                        std::memory_order_release,
                        std::memory_order_relaxed);
            } else {
                item = next.ptr->value;
                if (Head.compare_exchange_weak(head,
                        Pointer<T>(next.ptr, head.count+1),
                        std::memory_order_release,
                        std::memory_order_relaxed)) {
                  delete head.ptr;
                  break;
                }
            }
        }
    }
    count--;
    return true;
}
template <class T>
T& LinkedLockFreeQueue<T>::operator[](unsigned int i)
{
    if (i < 0 || i >= count)
    {
        std::cerr << "Error in array limits: " << i << " is out of range\n";
        std::exit(EXIT_FAILURE);
    }
    Pointer<T> tmp = Head.load(std::memory_order_relaxed);
    //Pointer<T> tail = Tail.load(std::memory_order_relaxed);
    while(i > 0) {
        //std::cout << i << ":"  << std::endl;
        tmp = (tmp.ptr->next).load(std::memory_order_relaxed);
        i--;
    }
    tmp = (tmp.ptr->next).load(std::memory_order_relaxed);
    return tmp.ptr->value;
}
#endif
build/include/lock_free_queue.h
New file
@@ -0,0 +1,360 @@
#ifndef __LOCK_FREE_QUEUE_H__
#define __LOCK_FREE_QUEUE_H__
#include <usg_common.h>
#include <assert.h> // assert()
#include "mem_pool.h"
#include "sem_util.h"
#include "logger_factory.h"
#include "shm_allocator.h"
// default Queue size
#define LOCK_FREE_Q_DEFAULT_SIZE 16
// define this macro if calls to "size" must return the real size of the
// queue. If it is undefined  that function will try to take a snapshot of
// the queue, but returned value might be bogus
// forward declarations for default template values
//
template <typename ELEM_T, typename Allocator>
class ArrayLockFreeQueue;
// template <typename ELEM_T>
// class LinkedLockFreeQueue;
/// @brief Lock-free queue based on a circular array
/// No allocation of extra memory for the nodes handling is needed, but it has
/// to add extra overhead (extra CAS operation) when inserting to ensure the
/// thread-safety of the queue when the queue type is not
/// ArrayLockFreeQueueSingleProducer.
///
/// examples of instantiation:
///   ArrayLockFreeQueue<int> q; // queue of ints of default size (65535 - 1)
///                              // and defaulted to single producer
///   ArrayLockFreeQueue<int, 16> q;
///                              // queue of ints of size (16 - 1) and
///                              // defaulted to single producer
///   ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q;
///                              // queue of ints of size (100 - 1) with support
///                              // for multiple producers
///
/// ELEM_T represents the type of elementes pushed and popped from the queue
/// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1)
///        This number should be a power of 2 to ensure
///        indexes in the circular queue keep stable when the uint32_t
///        variable that holds the current position rolls over from FFFFFFFF
///        to 0. For instance
///        2    -> 0x02
///        4    -> 0x04
///        8    -> 0x08
///        16   -> 0x10
///        (...)
///        1024 -> 0x400
///        2048 -> 0x800
///
///        if queue size is not defined as requested, let's say, for
///        instance 100, when current position is FFFFFFFF (4,294,967,295)
///        index in the circular array is 4,294,967,295 % 100 = 95.
///        When that value is incremented it will be set to 0, that is the
///        last 4 elements of the queue are not used when the counter rolls
///        over to 0
/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and
///        ArrayLockFreeQueue are supported (single producer
///        by default)
template <
    typename ELEM_T,
    typename Allocator = SHM_Allocator,
    template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
     >
class LockFreeQueue
{
private:
    int slots;
    int items;
public:
    // int mutex;
    LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    /// @brief destructor of the class.
    /// Note it is not virtual since it is not expected to inherit from this
    /// template
    ~LockFreeQueue();
    std::atomic_uint reference;
    /// @brief constructor of the class
    /// @brief returns the current number of items in the queue
    /// It tries to take a snapshot of the size of the queue, but in busy environments
    /// this function might return bogus values.
    ///
    /// If a reliable queue size must be kept you might want to have a look at
    /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
    /// it enables a reliable size though it hits overall performance of the queue
    /// (when the reliable size variable is on it's got an impact of about 20% in time)
    inline uint32_t size();
    /// @brief return true if the queue is full. False otherwise
    /// It tries to take a snapshot of the size of the queue, but in busy
    /// environments this function might return bogus values. See help in method
    /// LockFreeQueue::size
    inline bool full();
    inline bool empty();
    inline ELEM_T& operator[](unsigned i);
    /// @brief push an element at the tail of the queue
    /// @param the element to insert in the queue
    /// Note that the element is not a pointer or a reference, so if you are using large data
    /// structures to be inserted in the queue you should think of instantiate the template
    /// of the queue as a pointer to that large structure
    /// @return true if the element was inserted in the queue. False if the queue was full
    bool push(const ELEM_T &a_data);
    bool push_nowait(const ELEM_T &a_data);
    bool push_timeout(const ELEM_T &a_data, struct timespec * timeout);
    /// @brief pop the element at the head of the queue
    /// @param a reference where the element in the head of the queue will be saved to
    /// Note that the a_data parameter might contain rubbish if the function returns false
    /// @return true if the element was successfully extracted from the queue. False if the queue was empty
    bool pop(ELEM_T &a_data);
    bool pop_nowait(ELEM_T &a_data);
    bool pop_timeout(ELEM_T &a_data, struct timespec * timeout);
    void *operator new(size_t size);
    void operator delete(void *p);
protected:
    /// @brief the actual queue. methods are forwarded into the real
    ///        implementation
    Q_TYPE<ELEM_T, Allocator> m_qImpl;
private:
    /// @brief disable copy constructor declaring it private
    LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src);
};
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize)
{
// std::cout << "LockFreeQueue init reference=" << reference << std::endl;
    slots = SemUtil::get(IPC_PRIVATE, qsize);
    items = SemUtil::get(IPC_PRIVATE, 0);
    // mutex = SemUtil::get(IPC_PRIVATE, 1);
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue()
{
    LoggerFactory::getLogger().debug("LockFreeQueue desctroy");
    SemUtil::remove(slots);
    SemUtil::remove(items);
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size()
{
    return m_qImpl.size();
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full()
{
    return m_qImpl.full();
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty()
{
    return m_qImpl.empty();
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
{
    if (SemUtil::dec(slots) == -1) {
        err_msg(errno, "LockFreeQueue push");
        return false;
    }
    if ( m_qImpl.push(a_data) ) {
        SemUtil::inc(items);
        return true;
    }
    return false;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
{
    if (SemUtil::dec_nowait(slots) == -1) {
        if (errno == EAGAIN)
            return false;
        else {
            err_msg(errno, "LockFreeQueue push_nowait");
            return false;
        }
    }
    if ( m_qImpl.push(a_data)) {
        SemUtil::inc(items);
        return true;
    }
    return false;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, struct timespec * timeout)
{
    if (SemUtil::dec_timeout(slots, timeout) == -1) {
        if (errno == EAGAIN)
            return false;
        else {
            err_msg(errno, "LockFreeQueue push_timeout");
            return false;
        }
    }
    if (m_qImpl.push(a_data)){
        SemUtil::inc(items);
        return true;
    }
    return false;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
{
    if (SemUtil::dec(items) == -1) {
        err_msg(errno, "LockFreeQueue pop");
        return false;
    }
    if (m_qImpl.pop(a_data)) {
        SemUtil::inc(slots);
        return true;
    }
    return false;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
{
    if (SemUtil::dec_nowait(items) == -1) {
        if (errno == EAGAIN)
            return false;
        else {
            err_msg(errno, "LockFreeQueue pop_nowait");
            return false;
        }
    }
    if (m_qImpl.pop(a_data)) {
        SemUtil::inc(slots);
        return true;
    }
    return false;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout)
{
    if (SemUtil::dec_timeout(items, timeout) == -1) {
        if (errno == EAGAIN)
            return false;
        else {
            err_msg(errno, "LockFreeQueue pop_timeout");
            return false;
        }
    }
    if (m_qImpl.pop(a_data)) {
        SemUtil::inc(slots);
        return true;
    }
    return false;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
    return m_qImpl.operator[](i);
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){
        return Allocator::allocate(size);
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) {
    return Allocator::deallocate(p);
}
// include implementation files
//#include "linked_lock_free_queue.h"
#include "array_lock_free_queue.h"
#endif // _LOCK_FREE_QUEUE_H__
build/include/logger_factory.h
New file
@@ -0,0 +1,17 @@
#ifndef __LOGGER_FACTORY_H__
#define __LOGGER_FACTORY_H__
#include "logger.h"
class LoggerFactory {
public:
    static Logger getLogger() {
//ERROR ALL DEBUG
        static Logger logger(Logger::DEBUG);
        return logger;
    }
};
#endif
build/include/mem_pool.h
New file
@@ -0,0 +1,55 @@
#ifndef _MEM_POOL_H_
#define _MEM_POOL_H_
#include "mm.h"
#include "sem_util.h"
#define MEM_POOL_COND_KEY 0x8801
static int mem_pool_cond  = SemUtil::get(MEM_POOL_COND_KEY, 0);
// static int mem_pool_mutex  = SemUtil::get(MEM_POOL_COND_KEY, 1);
static inline void mem_pool_init(size_t heap_size) {
    if(mm_init(heap_size)) {
    }
}
static inline void mem_pool_destroy(void) {
    if(mm_destroy()) {
        SemUtil::remove(mem_pool_cond);
    }
}
static inline void *mem_pool_malloc (size_t size) {
    void *ptr;
    while( (ptr = mm_malloc(size)) == NULL ) {
        err_msg(0, "There is not enough memery to allocate, waiting someone else to free.");
        SemUtil::set(mem_pool_cond, 0);
        // wait for someone else to free space
        SemUtil::dec(mem_pool_cond);
    }
    return ptr;
}
static inline void mem_pool_free (void *ptr) {
    mm_free(ptr);
    // notify malloc
    SemUtil::set(mem_pool_cond, 1);
}
static inline void *mem_pool_realloc (void *ptr, size_t size) {
    return mm_realloc(ptr, size);
}
static inline hashtable_t * mem_pool_get_hashtable() {
    return mm_get_hashtable();
}
// extern int mm_checkheap(int verbose);
#endif
build/include/mm.h
New file
@@ -0,0 +1,20 @@
#ifndef MM_HDR_H
#define MM_HDR_H      /* Prevent accidental double inclusion */
#include <usg_common.h>
#include "usg_typedef.h"
#include "hashtable.h"
extern bool mm_init(size_t heap_size);
extern bool mm_destroy(void);
extern void *mm_malloc (size_t size);
extern void mm_free (void *ptr);
extern void *mm_realloc(void *ptr, size_t size);
extern hashtable_t * mm_get_hashtable();
// extern int mm_checkheap(int verbose);
// extern void *get_mm_start_brk();
// extern size_t get_mm_max_size();
#endif
build/include/mod_socket.h
New file
@@ -0,0 +1,76 @@
#ifndef __MOD_SOCKET_H__
#define __MOD_SOCKET_H__
#ifdef __cplusplus
extern "C" {
#endif
enum socket_mod_t
{
    PULL_PUSH = 1,
    REQ_REP = 2,
    PAIR = 3,
    PUB_SUB = 4,
    SURVEY = 5,
    BUS = 6
};
/**
 * 创建socket
 * @return socket地址
*/
void *mod_open_socket(int mod);
/**
 * 关闭socket
*/
int mod_close_socket(void * _socket);
/**
 * 绑定端口到socket, 如果不绑定则系统自动分配一个
 * @return 0 成功, 其他值 失败的错误码
*/
int mod_socket_bind(void * _socket, int port);
/**
 * 服务端开启连接监听
 * @return 0 成功, 其他值 失败的错误码
 */
int mod_listen(void * _socket);
/**
 * 客户端发起连接请求
 */
int mod_connect(void * _socket, int port);
/**
 * 发送信息
 * @return 0 成功, 其他值 失败的错误码
 */
int mod_send(void * _socket, const void *buf, const int size);
/**
 * 接收信息
 * @return 0 成功, 其他值 失败的错误码
*/
int mod_recv(void * _socket, void **buf, int *size) ;
/**
 * 释放接收信息的buf
 */
void mod_free(void *buf);
/**
 * 获取soket端口号
 */
int mod_get_socket_port(void * _socket);
#ifdef __cplusplus
}
#endif
#endif
build/include/sem_util.h
copy from src/util/sem_util.h copy to build/include/sem_util.h
build/include/shm_allocator.h
New file
@@ -0,0 +1,102 @@
#ifndef __SHM_ALLOCATOR_H__
#define __SHM_ALLOCATOR_H__
#include "usg_common.h"
#include "mem_pool.h"
#include <new>
#include <cstdlib> // for exit()
#include <climits> // for UNIX_MAX
#include <cstddef>
template<class T> class SHM_STL_Allocator
{
public:
  typedef T               value_type;
  typedef T*              pointer;
  typedef const T*        const_pointer;
  typedef T&              reference;
  typedef const T&        const_reference;
  typedef size_t          size_type;
  typedef ptrdiff_t       difference_type;
  SHM_STL_Allocator() {};
  ~SHM_STL_Allocator() {};
  template<class U> SHM_STL_Allocator(const SHM_STL_Allocator<U>& t) { };
  template<class U> struct rebind { typedef SHM_STL_Allocator<U> other; };
  pointer allocate(size_type n, const void* hint=0) {
//        fprintf(stderr, "allocate n=%u, hint= %p\n",n, hint);
    return((T*) (mm_malloc(n * sizeof(T))));
  }
  void deallocate(pointer p, size_type n) {
//        fprintf(stderr, "dealocate n=%u" ,n);
    mm_free((void*)p);
  }
  void construct(pointer p, const T& value) {
    ::new(p) T(value);
  }
  void construct(pointer p)
  {
    ::new(p) T();
  }
  void destroy(pointer p) {
    p->~T();
  }
  pointer address(reference x) {
    return (pointer)&x;
  }
  const_pointer address(const_reference x) {
    return (const_pointer)&x;
  }
  size_type max_size() const {
    return size_type(UINT_MAX/sizeof(T));
  }
};
class SHM_Allocator {
  public:
    static void *allocate (size_t size) {
       printf("shm_allocator malloc\n");
      return mem_pool_malloc(size);
    }
    static void deallocate (void *ptr) {
      printf("shm_allocator free\n");
      return mem_pool_free(ptr);
    }
};
class DM_Allocator {
  public:
    static void *allocate (size_t size) {
      printf("dm_allocator malloc\n");
      return malloc(size);
    }
    static void deallocate (void *ptr) {
      printf("dm_allocator free\n");
      return free(ptr);
    }
};
// template<class charT, class traits = char _traits<charT>,
// class Allocator = allocator<charT> >
typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > shmstring;
#endif
build/include/shm_mm.h
New file
@@ -0,0 +1,26 @@
#ifndef __SHM_MM_H__
#define __SHM_MM_H__
#ifdef __cplusplus
extern "C" {
#endif
/**
 * 初始化共享内存
 * @size 共享内存大小, 单位M
 *
 */
void shm_init(int size);
/**
 * 销毁共享内存
 * 整个进程退出时需要执行这个方法,该方法首先会检查是否还有其他进程在使用该共享内存,如果还有其他进程在使用就只是detach,如果没有其他进程在使用则销毁整块内存。
 */
void shm_destroy();
#ifdef __cplusplus
}
#endif
#endif
build/include/shm_queue.h
New file
@@ -0,0 +1,184 @@
#ifndef __SHM_QUEUE_H__
#define __SHM_QUEUE_H__
#include "usg_common.h"
#include "hashtable.h"
#include "lock_free_queue.h"
#include "logger_factory.h"
#include "shm_allocator.h"
// default Queue size
// #define LOCK_FREE_Q_DEFAULT_SIZE 16
template < typename ELEM_T>
class SHMQueue
{
private:
    const int KEY;
public:
    /// @brief constructor of the class
    SHMQueue(int key=0, size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    ~SHMQueue();
    inline uint32_t size();
    inline bool full();
    inline bool empty();
    inline bool push(const ELEM_T &a_data);
    inline bool push_nowait(const ELEM_T &a_data);
    inline bool push_timeout(const ELEM_T &a_data, struct timespec * timeout);
    inline bool pop(ELEM_T &a_data);
    inline bool pop_nowait(ELEM_T &a_data);
    inline bool pop_timeout(ELEM_T &a_data, struct timespec * timeout);
    inline ELEM_T& operator[](unsigned i);
    static void remove_queues_exclude(int *keys, size_t length);
private:
protected:
    /// @brief the actual queue-> methods are forwarded into the real
    ///        implementation
    LockFreeQueue<ELEM_T, SHM_Allocator>* queue;
private:
    /// @brief disable copy constructor declaring it private
    SHMQueue<ELEM_T>(const SHMQueue<ELEM_T> &a_src);
};
template < typename ELEM_T >
void SHMQueue<ELEM_T>::remove_queues_exclude(int *keys, size_t length)
{
    hashtable_t *hashtable = mm_get_hashtable();
    std::set<int>* keyset = hashtable_keyset(hashtable);
    std::set<int>::iterator keyItr;
     LockFreeQueue<ELEM_T, SHM_Allocator>* mqueue;
    bool found;
    for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
        found = false;
        for(size_t i = 0; i < length; i++) {
            if(*keyItr == keys[i]) {
                found = true;
                break;
            }
        }
        if(!found) {
           mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
           delete mqueue;
        }
    }
    delete keyset;
}
template < typename ELEM_T >
SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize): KEY(key)
{
    hashtable_t *hashtable = mm_get_hashtable();
    queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
    //LockFreeQueue<int, 10000> q;
    if (queue == NULL || (void *)queue == (void *)1) {
        queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize);
        hashtable_put(hashtable,  key, (void *)queue);
    }
    queue->reference++;
    LoggerFactory::getLogger().debug("SHMQueue constructor reference===%d", queue->reference.load());
}
template < typename ELEM_T >
SHMQueue<ELEM_T>::~SHMQueue()
{
    queue->reference--;
    LoggerFactory::getLogger().debug("SHMQueue destructor  reference===%d", queue->reference.load());
    if(queue->reference.load() == 0) {
        delete queue;
        hashtable_t *hashtable = mm_get_hashtable();
        hashtable_remove(hashtable, KEY);
        LoggerFactory::getLogger().debug("SHMQueue destructor delete queue");
    }
}
template < typename ELEM_T >
inline uint32_t SHMQueue<ELEM_T>::size()
{
    return queue->size();
}
template < typename ELEM_T >
inline bool SHMQueue<ELEM_T>::full()
{
    return queue->full();
}
template < typename ELEM_T >
inline bool SHMQueue<ELEM_T>::empty()
{
    return queue->empty();
}
template < typename ELEM_T >
inline bool SHMQueue<ELEM_T>::push(const ELEM_T &a_data)
{
   return queue->push(a_data);
}
template <
    typename ELEM_T >
inline bool SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data)
{
   return queue->push_nowait(a_data);
}
template < typename ELEM_T >
inline bool SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, struct timespec * timeout)
{
    return queue->push_timeout(a_data, timeout);
}
template < typename ELEM_T >
inline bool SHMQueue<ELEM_T>::pop(ELEM_T &a_data)
{
   return queue->pop(a_data);
}
template < typename ELEM_T >
inline bool SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data)
{
    return queue->pop_nowait(a_data);
}
template < typename ELEM_T >
inline bool SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec * timeout)
{
   return queue->pop_timeout(a_data, timeout);
}
template < typename ELEM_T >
inline ELEM_T& SHMQueue<ELEM_T>::operator[](unsigned i) {
     return queue->operator[](i);
}
#endif
build/include/shm_queue_wrapper.h
New file
@@ -0,0 +1,100 @@
#ifndef __SHM_QUEUE_WRAPPER_H__
#define __SHM_QUEUE_WRAPPER_H__
#include "usg_common.h"
#include "usg_typedef.h"
#ifdef __cplusplus
extern "C" {
#endif
//移除不包含在keys中的队列
void shm_remove_queues_exclude(void *keys, int length);
/**
 * 创建队列
 * @ shmqueue
 * @ key 标识共享队列的唯一标识, key是一个指针里面存储了key的值, 如果key的值为-1系统会自动分配一个key值并把该key的值赋给key指针。如果key的值不会空会检查是否有重复绑定的情况, 有重复就报错没有就创建队列并绑定key.
 * @ queue_size 队列大小
 */
void* shmqueue_create( int * key, int queue_size);
/**
 * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并退出
 */
void* shmqueue_attach(int key) ;
/**
 * 销毁
*/
void shmqueue_drop(void * _shmqueue);
/**
 * 队列元素的个数
 */
int shmqueue_size(void * _shmqueue) ;
/**
 * 是否已满
 * @return 1是, 0否
 */
int shmqueue_full(void * _shmqueue);
/**
 * 是否为空
 * @return 1是, 0否
 */
int shmqueue_empty(void * _shmqueue) ;
/**
 * 入队, 队列满时等待.
 * @return 1 入队成功, 0 入队失败
 */
int shmqueue_push(void * _shmqueue, void *src, int size);
/**
 * 入队, 队列满时立即返回.
 * @return 1 入队成功, 0 入队失败
 */
int shmqueue_push_nowait(void * _shmqueue, void *src, int size) ;
/**
 * 入队, 指定时间内入队不成功就返回
 * @sec 秒
 * @nsec 纳秒
 * @return 1 入队成功, 0 入队失败
 */
int shmqueue_push_timeout(void * _shmqueue, void *src, int size,  int sec, int nsec) ;
/**
 * 出队, 队列空时等待
 * @return 1 出队成功, 0出队失败
 */
int shmqueue_pop(void * _shmqueue, void **dest, int *size);
/**
 * 出队, 队列空时立即返回
 * @return 1 出队成功, 0出队失败
 */
int shmqueue_pop_nowait(void * _shmqueue, void **dest, int *size) ;
/**
 * 出队, 指定时间内出队不成功就返回
 * @sec秒
 * @nsec纳秒
 * @return 1 出队成功, 0出队失败
 */
int shmqueue_pop_timeout(void * _shmqueue, void **dest, int *size, int sec, int nsec);
/**
 * 释放出队分配的内存
 */
void shmqueue_free(void *ptr);
#ifdef __cplusplus
}
#endif
#endif
build/include/shm_socket.h
New file
@@ -0,0 +1,87 @@
#ifndef __SHM_SOCKET_H__
#define __SHM_SOCKET_H__
#include "usg_common.h"
#include "usg_typedef.h"
#include "shm_queue.h"
#ifdef __cplusplus
extern "C" {
#endif
enum shm_msg_type_t
{
    SHM_SOCKET_OPEN = 1,
    SHM_SOCKET_OPEN_REPLY = 2,
    SHM_SOCKET_CLOSE = 3,
    SHM_COMMON_MSG = 4
};
enum shm_socket_type_t
{
    SHM_SOCKET_STREAM = 1,
    SHM_SOCKET_DGRAM = 2
};
enum shm_connection_status_t {
    SHM_CONN_CLOSED=1,
    SHM_CONN_LISTEN=2,
    SHM_CONN_ESTABLISHED=3
};
typedef struct shm_msg_t {
    int port;
    shm_msg_type_t type;
    size_t size;
    void * buf;
} shm_msg_t;
typedef struct shm_socket_t {
    shm_socket_type_t socket_type;
    // 本地port
    int port;
    shm_connection_status_t status;
    SHMQueue<shm_msg_t> *queue;
    SHMQueue<shm_msg_t> *remoteQueue;
    LockFreeQueue<shm_msg_t, DM_Allocator> *messageQueue;
    LockFreeQueue<shm_msg_t, DM_Allocator> *acceptQueue;
    std::map<int, shm_socket_t* > *clientSocketMap;
    pthread_t dispatch_thread;
} shm_socket_t;
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type);
int shm_close_socket(shm_socket_t * socket) ;
int shm_socket_bind(shm_socket_t * socket, int port) ;
int shm_listen(shm_socket_t * socket) ;
shm_socket_t* shm_accept(shm_socket_t* socket);
int shm_connect(shm_socket_t * socket, int port);
int shm_send(shm_socket_t * socket, const void *buf, const int size) ;
int shm_recv(shm_socket_t * socket, void **buf, int *size) ;
int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port);
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port);
#ifdef __cplusplus
}
#endif
#endif
build/lib/libshm_queue.a
Binary files differ
demo/Makefile
@@ -2,19 +2,19 @@
# Makefile for common library.
#
ROOT=..
LDLIBS+=-Wl,-rpath=$(ROOT)/queue:$(ROOT)/lib
LDLIBS+=-Wl,-rpath=$(ROOT)/lib:$(ROOT)/build/lib
# 开源工具包路径
LDDIR += -L$(ROOT)/queue
LDDIR += -L$(ROOT)/build/lib
# 开源工具包
LDLIBS += -lshm_queue -lusgcommon -lpthread
INCLUDE += -I$(ROOT)/queue/ -I$(ROOT)/queue/include
INCLUDE += -I$(ROOT)/build/include
PLATFORM=$(shell $(ROOT)/systype.sh)
include $(ROOT)/Make.defines.$(PLATFORM)
PROGS =    req_rep pub_sub
PROGS =    req_rep pub_sub queue
build: $(PROGS)
@@ -25,8 +25,3 @@
  
clean:
    rm -f $(TEMPFILES) $(PROGS)
$(LIBQUEUE):
    (cd $(ROOT)/queue && $(MAKE))
demo/pub_sub
Binary files differ
demo/queue
Binary files differ
demo/queue.c
File was renamed from test2/test_queue_wrapper.c
@@ -1,5 +1,5 @@
#include "shm_queue_wrapper.h"
#include "mm.h"
#include "shm_mm.h"
// typedef struct message_t
// {
@@ -22,7 +22,7 @@
    for(i = 0; i < qsize; i++) {
        sprintf(msg, "%d hello", i); 
        //入队
        if(shmqueue_push(queue, (void *)msg, sizeof(msg))) {
        if(shmqueue_push(queue, (void *)msg, strlen(msg) + 1)) {
              printf("push: %s\n", msg );
        }
    }
demo/req_rep
Binary files differ
src/Makefile
@@ -23,14 +23,16 @@
MYLIBS = $(LIBSQUEUE) $(DLIBSQUEUE)
PREFIX = $(ROOT)/build
ifeq ($(DEBUG),y)
  MYLIBS = $(LIBSQUEUE)
else
  MYLIBS = $(LIBSQUEUE) $(DLIBSQUEUE)
endif
all: build
all: install
build: $(MYLIBS)
@@ -55,7 +57,7 @@
    install -d $(PREFIX)/lib/
    install -m 644 $^ $(PREFIX)/lib/
    install -d $(PREFIX)/include/
    install -m 644 $(MINCLUDE)/* $(PREFIX)/include/
    install -m 644 ./*.h ./queue/include/* ./socket/include/* ./util/include/* $(PREFIX)/include/
clean:
    rm -f $(TEMPFILES)
src/libshm_queue.a
Binary files differ
src/logger_factory.h
@@ -6,8 +6,8 @@
public:
    static Logger getLogger() {
//ERROR ALL
        static Logger logger(Logger::ERROR);
//ERROR ALL DEBUG
        static Logger logger(Logger::DEBUG);
        return logger;
    }
};
src/queue/include/shm_allocator.h
@@ -1,7 +1,7 @@
#ifndef __SHM_ALLOCATOR_H__
#define __SHM_ALLOCATOR_H__
#include "usg_common.h"
#include "mm.h"
#include "mem_pool.h"
#include <new>
#include <cstdlib> // for exit()
#include <climits> // for UNIX_MAX
@@ -67,12 +67,12 @@
  public:
    static void *allocate (size_t size) {
       printf("shm_allocator malloc\n");
      return mm_malloc(size);
      return mem_pool_malloc(size);
    }
    static void deallocate (void *ptr) {
      printf("shm_allocator free\n");
      return mm_free(ptr);
      return mem_pool_free(ptr);
    }
};
src/queue/include/shm_queue.h
@@ -2,7 +2,6 @@
#define __SHM_QUEUE_H__
#include "usg_common.h"
#include "mm.h"
#include "hashtable.h"
#include "lock_free_queue.h"
#include "logger_factory.h"
src/queue/include/shm_queue_wrapper.h
@@ -3,8 +3,7 @@
#include "usg_common.h"
#include "usg_typedef.h"
#include "shm_queue.h"
#include "shm_allocator.h"
#ifdef __cplusplus
extern "C" {
src/queue/libshm_queue.a
Binary files differ
src/queue/libshm_queue.so
Binary files differ
src/queue/shm_queue_wrapper.c
@@ -2,6 +2,8 @@
#include "mem_pool.h"
#include "hashtable.h"
#include "shm_queue.h"
#include "shm_allocator.h"
typedef struct ele_t {
    size_t size;
src/socket/include/shm_socket.h
@@ -4,11 +4,6 @@
#include "usg_common.h"
#include "usg_typedef.h"
#include "shm_queue.h"
#include "shm_allocator.h"
#include "mem_pool.h"
#include "hashtable.h"
#include "sem_util.h"
#ifdef __cplusplus
extern "C" {
@@ -20,6 +15,13 @@
    SHM_SOCKET_OPEN_REPLY = 2,
    SHM_SOCKET_CLOSE = 3,
    SHM_COMMON_MSG = 4
};
enum shm_socket_type_t
{
    SHM_SOCKET_STREAM = 1,
    SHM_SOCKET_DGRAM = 2
    
};
@@ -39,6 +41,7 @@
typedef struct shm_socket_t {
    shm_socket_type_t socket_type;
    // 本地port
    int port;
    shm_connection_status_t status;
@@ -54,7 +57,7 @@
shm_socket_t *shm_open_socket();
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type);
int shm_close_socket(shm_socket_t * socket) ;
@@ -72,6 +75,9 @@
int shm_recv(shm_socket_t * socket, void **buf, int *size) ;
int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port);
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port);
#ifdef __cplusplus
src/socket/mod_socket.c
@@ -1,7 +1,12 @@
#include "usg_common.h"
#include "mod_socket.h"
#include "shm_socket.h"
#include "usg_common.h"
#include "shm_allocator.h"
#include "mem_pool.h"
#include "hashtable.h"
#include "sem_util.h"
#include "logger_factory.h"
static Logger logger = LoggerFactory::getLogger();
typedef struct mod_entry_t
@@ -28,7 +33,7 @@
 */
void *mod_open_socket(int mod) {
  mod_socket_t *socket = (mod_socket_t *)malloc(sizeof(mod_socket_t));
  socket->shm_socket=shm_open_socket();
  socket->shm_socket=shm_open_socket(SHM_SOCKET_STREAM);
  socket->is_server = 0;
  socket->mod = (socket_mod_t)mod;
  socket->recvQueue = new LockFreeQueue<mod_entry_t, DM_Allocator>(16);
src/socket/shm_socket.c
@@ -1,4 +1,5 @@
#include "shm_socket.h"
#include "hashtable.h"
#include "logger_factory.h"
#include <map>
@@ -14,11 +15,16 @@
void * _client_run_msg_rev(void* _socket);
int _shm_close_dgram_socket(shm_socket_t *socket);
int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote);
SHMQueue<shm_msg_t> * _attach_remote_queue(int port) ;
shm_socket_t *shm_open_socket() {
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
    shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
    socket->socket_type = socket_type;
    socket->port = -1;
    socket->dispatch_thread = 0;
    socket->status=SHM_CONN_CLOSED;
@@ -26,8 +32,381 @@
    return socket;
}
int shm_close_socket(shm_socket_t *socket) {
    switch(socket->socket_type) {
        case     SHM_SOCKET_STREAM:
            return _shm_close_stream_socket(socket, true);
        case SHM_SOCKET_DGRAM:
            return _shm_close_dgram_socket(socket);
        default:
            return -1;
    }
    return -1;
}
int _shm_close_socket(shm_socket_t *socket, bool notifyRemote) {
int shm_socket_bind(shm_socket_t * socket, int port) {
    socket -> port = port;
    return 0;
}
int shm_listen(shm_socket_t* socket) {
    if(socket->socket_type != SHM_SOCKET_STREAM) {
        err_exit(0, "can not invoke shm_listen method with a socket which is not a SHM_SOCKET_STREAM socket");
    }
    int  port;
    hashtable_t *hashtable = mm_get_hashtable();
    if(socket -> port == -1) {
        port = hashtable_alloc_key(hashtable);
        socket -> port = port;
    } else {
        if(hashtable_get(hashtable, socket->port)!= NULL) {
            err_exit(0, "key %d has already been in used!", socket->port);
        }
    }
    socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
    socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
    socket->clientSocketMap = new std::map<int, shm_socket_t* >;
    socket->status = SHM_CONN_LISTEN;
    pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev , (void *)socket);
    return 0;
}
/**
 * 接受客户端建立新连接的请求
 *
*/
shm_socket_t* shm_accept(shm_socket_t* socket) {
    if(socket->socket_type != SHM_SOCKET_STREAM) {
        err_exit(0, "can not invoke shm_accept method with a socket which is not a SHM_SOCKET_STREAM socket");
    }
    hashtable_t *hashtable = mm_get_hashtable();
    int client_port;
    shm_socket_t *client_socket;
    shm_msg_t src;
    if (socket->acceptQueue->pop(src) ) {
// print_msg("===accept:", src);
        client_port = src.port;
        client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t));
        client_socket->port = socket->port;
        // client_socket->queue= socket->queue;
        //初始化消息queue
        client_socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
        //连接到对方queue
        client_socket->remoteQueue = _attach_remote_queue(client_port);
        socket->clientSocketMap->insert({client_port, client_socket});
        /*
         * shm_accept 用户执行的方法 与_server_run_msg_rev在两个不同的限制工作,accept要保证在客户的发送消息之前完成资源的准备工作,以避免出现竞态问题
        */
        //发送open_reply,回应客户端的connect请求
        struct timespec timeout = {1, 0};
        shm_msg_t msg;
        msg.port = socket->port;
        msg.size = 0;
        msg.type = SHM_SOCKET_OPEN_REPLY;
        if (client_socket->remoteQueue->push_timeout(msg, &timeout) )
        {
            client_socket->status = SHM_CONN_ESTABLISHED;
            return client_socket;
        } else {
            err_msg(0, "shm_accept: 发送open_reply失败");
            return NULL;
        }
    } else {
        err_exit(errno, "shm_accept");
    }
    return NULL;
}
int shm_connect(shm_socket_t* socket, int port) {
    if(socket->socket_type != SHM_SOCKET_STREAM) {
        err_exit(0, "can not invoke shm_connect method with a socket which is not a SHM_SOCKET_STREAM socket");
    }
    hashtable_t *hashtable = mm_get_hashtable();
    if(hashtable_get(hashtable, port)== NULL) {
        err_exit(0, "shm_connect:connect at port %d  failed!", port);
    }
    if(socket->port == -1) {
        socket->port = hashtable_alloc_key(hashtable);
    } else {
        if(hashtable_get(hashtable, socket->port)!= NULL) {
            err_exit(0, "key %d has already been in used!", socket->port);
        }
    }
    socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
    socket->remoteQueue = _attach_remote_queue(port);
    socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
    //发送open请求
    struct timespec timeout = {1, 0};
    shm_msg_t msg;
    msg.port = socket->port;
    msg.size = 0;
    msg.type=SHM_SOCKET_OPEN;
    socket->remoteQueue->push_timeout(msg, &timeout);
    //接受open reply
    if(socket->queue->pop(msg)) {
        // 在这里server端已经准备好接受客户端发送请求了,完成与服务端的连接
        if(msg.type == SHM_SOCKET_OPEN_REPLY) {
            socket->status = SHM_CONN_ESTABLISHED;
            pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket);
        } else {
            err_exit(0, "shm_connect: 不匹配的应答信息!");
        }
    } else {
        err_exit(0, "connect failted!");
    }
    return 0;
}
int shm_send(shm_socket_t *socket, const void *buf, const int size) {
    if(socket->socket_type != SHM_SOCKET_STREAM) {
        err_exit(0, "can not invoke shm_send method with a socket which is not a SHM_SOCKET_STREAM socket");
    }
    // hashtable_t *hashtable = mm_get_hashtable();
    // if(socket->remoteQueue == NULL) {
    //     err_msg(errno, "当前客户端无连接!");
    //     return -1;
    // }
    shm_msg_t dest;
    dest.type=SHM_COMMON_MSG;
    dest.port = socket->port;
    dest.size = size;
    dest.buf = mm_malloc(size);
    memcpy(dest.buf, buf, size);
    if(socket->remoteQueue->push(dest)) {
        return 0;
    } else {
        err_msg(errno, "connection has been closed!");
        return -1;
    }
}
int shm_recv(shm_socket_t* socket, void **buf, int *size) {
    if(socket->socket_type != SHM_SOCKET_STREAM) {
        err_exit(0, "can not invoke shm_recv method with a socket which is not a SHM_SOCKET_STREAM socket");
    }
    shm_msg_t src;
    if (socket->messageQueue->pop(src)) {
        void * _buf = malloc(src.size);
        memcpy(_buf, src.buf, src.size);
        *buf = _buf;
        *size = src.size;
        mm_free(src.buf);
        return 0;
    } else {
        return -1;
    }
}
// 短连接方式发送
int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port) {
    hashtable_t *hashtable = mm_get_hashtable();
    if(socket->queue == NULL) {
        if(socket->port == -1) {
            socket->port = hashtable_alloc_key(hashtable);
        } else {
            if(hashtable_get(hashtable, socket->port)!= NULL) {
                err_exit(0, "key %d has already been in used!", socket->port);
            }
        }
        socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
    }
    if (port == socket->port) {
        err_msg(0, "can not send to your self!");
        return -1;
    }
    shm_msg_t dest;
    dest.type=SHM_COMMON_MSG;
    dest.port = socket->port;
    dest.size = size;
    dest.buf = mm_malloc(size);
    memcpy(dest.buf, buf, size);
    SHMQueue<shm_msg_t> *remoteQueue =  _attach_remote_queue(port);
    if(remoteQueue->push(dest)) {
        delete remoteQueue;
        return 0;
    } else {
        delete remoteQueue;
        err_msg(errno, "sendto port %d failed!", port);
        return -1;
    }
}
// 短连接方式接受
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port){
    hashtable_t *hashtable = mm_get_hashtable();
    if(socket->queue == NULL) {
        if(socket->port == -1) {
            socket->port = hashtable_alloc_key(hashtable);
        } else {
            if(hashtable_get(hashtable, socket->port)!= NULL) {
                err_exit(0, "key %d has already been in used!", socket->port);
            }
        }
        socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
    }
    shm_msg_t src;
//logger.debug("shm_recvfrom pop before");
    if (socket->queue->pop(src)) {
        void * _buf = malloc(src.size);
        memcpy(_buf, src.buf, src.size);
        *buf = _buf;
        *size = src.size;
        *port = src.port;
        mm_free(src.buf);
//logger.debug("shm_recvfrom pop after");
        return 0;
    } else {
        return -1;
    }
}
/**
 * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并退出
 */
SHMQueue<shm_msg_t> * _attach_remote_queue(int port) {
    hashtable_t *hashtable = mm_get_hashtable();
    if(hashtable_get(hashtable, port)== NULL) {
        err_exit(0, "_remote_queue_attach:connet at port %d  failed!", port);
        return NULL;
    }
    SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(port, 0);
    return queue;
}
void _server_close_conn_to_client(shm_socket_t* socket, int port) {
    shm_socket_t *client_socket;
    auto iter = socket->clientSocketMap->find(port);
    if( iter !=  socket->clientSocketMap->end() ) {
        socket->clientSocketMap->erase(iter);
    }
    //free((void *)client_socket);
}
/**
 * server端各种类型消息()在这里进程分拣
 */
void * _server_run_msg_rev(void* _socket) {
    pthread_detach(pthread_self());
    shm_socket_t* socket = (shm_socket_t*) _socket;
    struct timespec timeout = {1, 0};
    shm_msg_t src;
    shm_socket_t *client_socket;
    std::map<int, shm_socket_t* >::iterator iter;
    while(socket->queue->pop(src)) {
        switch (src.type) {
            case SHM_SOCKET_OPEN :
                socket->acceptQueue->push_timeout(src, &timeout);
                break;
            case SHM_SOCKET_CLOSE :
                _server_close_conn_to_client(socket, src.port);
                break;
            case SHM_COMMON_MSG :
                iter = socket->clientSocketMap->find(src.port);
                if( iter !=  socket->clientSocketMap->end()) {
                    client_socket= iter->second;
    // print_msg("_server_run_msg_rev push before", src);
                    client_socket->messageQueue->push_timeout(src, &timeout);
    // print_msg("_server_run_msg_rev push after", src);
                }
                break;
            default:
                err_msg(0, "socket.__shm_rev__: undefined message type.");
        }
    }
    return NULL;
}
void _client_close_conn_to_server(shm_socket_t* socket) {
    _shm_close_stream_socket(socket, false);
}
/**
 * client端的各种类型消息()在这里进程分拣
 */
void * _client_run_msg_rev(void* _socket) {
    pthread_detach(pthread_self());
    shm_socket_t* socket = (shm_socket_t*) _socket;
    struct timespec timeout = {1, 0};
    shm_msg_t src;
    while(socket->queue->pop(src)) {
        switch (src.type) {
            case SHM_SOCKET_CLOSE :
                _client_close_conn_to_server(socket);
                break;
            case SHM_COMMON_MSG :
                socket->messageQueue->push_timeout(src, &timeout);
                break;
            default:
                err_msg(0, "socket.__shm_rev__: undefined message type.");
        }
    }
    return NULL;
}
int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote) {
    socket->status = SHM_CONN_CLOSED;
    //给对方发送一个关闭连接的消息
    struct timespec timeout = {1, 0};
@@ -86,295 +465,14 @@
}
int shm_close_socket(shm_socket_t *socket) {
    return _shm_close_socket(socket, true);
}
int shm_socket_bind(shm_socket_t * socket, int port) {
    shm_socket_t * _socket = (shm_socket_t *) socket;
    _socket -> port = port;
int _shm_close_dgram_socket(shm_socket_t *socket){
    if(socket->queue != NULL) {
        delete socket->queue;
        socket->queue = NULL;
    }
    free(socket);
    return 0;
}
int shm_listen(shm_socket_t* socket) {
    int  port;
    hashtable_t *hashtable = mm_get_hashtable();
    if(socket -> port == -1) {
        port = hashtable_alloc_key(hashtable);
        socket -> port = port;
    } else {
        if(hashtable_get(hashtable, socket->port)!= NULL) {
            err_exit(0, "key %d has already been in used!", socket->port);
        }
    }
    socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
    socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
    socket->clientSocketMap = new std::map<int, shm_socket_t* >;
    socket->status = SHM_CONN_LISTEN;
    pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev , (void *)socket);
    return 0;
}
void _server_close_conn_to_client(shm_socket_t* socket, int port) {
    shm_socket_t *client_socket;
    auto iter = socket->clientSocketMap->find(port);
    if( iter !=  socket->clientSocketMap->end() ) {
        socket->clientSocketMap->erase(iter);
    }
    //free((void *)client_socket);
}
/**
 * server端各种类型消息()在这里进程分拣
 */
void * _server_run_msg_rev(void* _socket) {
    pthread_detach(pthread_self());
    shm_socket_t* socket = (shm_socket_t*) _socket;
    struct timespec timeout = {1, 0};
    shm_msg_t src;
    shm_socket_t *client_socket;
    std::map<int, shm_socket_t* >::iterator iter;
    while(socket->queue->pop(src)) {
        switch (src.type) {
            case SHM_SOCKET_OPEN :
                socket->acceptQueue->push_timeout(src, &timeout);
                break;
            case SHM_SOCKET_CLOSE :
                _server_close_conn_to_client(socket, src.port);
                break;
            case SHM_COMMON_MSG :
                iter = socket->clientSocketMap->find(src.port);
                if( iter !=  socket->clientSocketMap->end()) {
                    client_socket= iter->second;
    // print_msg("_server_run_msg_rev push before", src);
                    client_socket->messageQueue->push_timeout(src, &timeout);
    // print_msg("_server_run_msg_rev push after", src);
                }
                break;
            default:
                err_msg(0, "socket.__shm_rev__: undefined message type.");
        }
    }
    return NULL;
}
/**
 * 接受客户端建立新连接的请求
 *
*/
shm_socket_t* shm_accept(shm_socket_t* socket) {
    hashtable_t *hashtable = mm_get_hashtable();
    int client_port;
    shm_socket_t *client_socket;
    shm_msg_t src;
    if (socket->acceptQueue->pop(src) ) {
// print_msg("===accept:", src);
        client_port = src.port;
        client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t));
        client_socket->port = socket->port;
        // client_socket->queue= socket->queue;
        //初始化消息queue
        client_socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
        //连接到对方queue
        client_socket->remoteQueue = _attach_remote_queue(client_port);
        socket->clientSocketMap->insert({client_port, client_socket});
        /*
         * shm_accept 用户执行的方法 与_server_run_msg_rev在两个不同的限制工作,accept要保证在客户的发送消息之前完成资源的准备工作,以避免出现竞态问题
        */
        //发送open_reply,回应客户端的connect请求
        struct timespec timeout = {1, 0};
        shm_msg_t msg;
        msg.port = socket->port;
        msg.size = 0;
        msg.type = SHM_SOCKET_OPEN_REPLY;
        if (client_socket->remoteQueue->push_timeout(msg, &timeout) )
        {
            client_socket->status = SHM_CONN_ESTABLISHED;
            return client_socket;
        } else {
            err_msg(0, "shm_accept: 发送open_reply失败");
            return NULL;
        }
    } else {
        err_exit(errno, "shm_accept");
    }
    return NULL;
}
int shm_connect(shm_socket_t* socket, int port) {
    hashtable_t *hashtable = mm_get_hashtable();
    if(hashtable_get(hashtable, port)== NULL) {
        err_exit(0, "shm_connect:connect at port %d  failed!", port);
    }
    if(socket->port == -1) {
        socket->port = hashtable_alloc_key(hashtable);
    } else {
        if(hashtable_get(hashtable, socket->port)!= NULL) {
            err_exit(0, "key %d has already been in used!", socket->port);
        }
    }
    socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
    socket->remoteQueue = _attach_remote_queue(port);
    socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
    //发送open请求
    struct timespec timeout = {1, 0};
    shm_msg_t msg;
    msg.port = socket->port;
    msg.size = 0;
    msg.type=SHM_SOCKET_OPEN;
    socket->remoteQueue->push_timeout(msg, &timeout);
    //接受open reply
    if(socket->queue->pop(msg)) {
        // 在这里server端已经准备好接受客户端发送请求了,完成与服务端的连接
        if(msg.type == SHM_SOCKET_OPEN_REPLY) {
            socket->status = SHM_CONN_ESTABLISHED;
            pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket);
        } else {
            err_exit(0, "shm_connect: 不匹配的应答信息!");
        }
    } else {
        err_exit(0, "connect failted!");
    }
    return 0;
}
void _client_close_conn_to_server(shm_socket_t* socket) {
    _shm_close_socket(socket, false);
}
/**
 * client端的各种类型消息()在这里进程分拣
 */
void * _client_run_msg_rev(void* _socket) {
    pthread_detach(pthread_self());
    shm_socket_t* socket = (shm_socket_t*) _socket;
    struct timespec timeout = {1, 0};
    shm_msg_t src;
    while(socket->queue->pop(src)) {
        switch (src.type) {
            case SHM_SOCKET_CLOSE :
                _client_close_conn_to_server(socket);
                break;
            case SHM_COMMON_MSG :
                socket->messageQueue->push_timeout(src, &timeout);
                break;
            default:
                err_msg(0, "socket.__shm_rev__: undefined message type.");
        }
    }
    return NULL;
}
int shm_send(shm_socket_t *socket, const void *buf, const int size) {
    // hashtable_t *hashtable = mm_get_hashtable();
    // if(socket->remoteQueue == NULL) {
    //     err_msg(errno, "当前客户端无连接!");
    //     return -1;
    // }
    shm_msg_t dest;
    dest.type=SHM_COMMON_MSG;
    dest.port = socket->port;
    dest.size = size;
    dest.buf = mm_malloc(size);
    memcpy(dest.buf, buf, size);
    // struct timeval time;
    // gettimeofday(&time, NULL);
//err_msg(0, "%d %d=======>push befor %d", time.tv_sec, time.tv_usec, socket->port);
    if(socket->remoteQueue->push(dest)) {
        //gettimeofday(&time, NULL);
//err_msg(0, "%d %d=======>push after %d", time.tv_sec, time.tv_usec, socket->port);
        return 0;
    } else {
        err_msg(errno, "connection has been closed!");
        return -1;
    }
}
int shm_recv(shm_socket_t* socket, void **buf, int *size) {
    shm_msg_t src;
//     struct timeval time;
//     gettimeofday(&time, NULL);
// err_msg(0, "%d %d=======>pop befor %d", time.tv_sec, time.tv_usec, socket->port);
    if (socket->messageQueue->pop(src)) {
// gettimeofday(&time, NULL);
// err_msg(0, "%d %d=======>pop after %d", time.tv_sec, time.tv_usec, socket->port);
        void * _buf = malloc(src.size);
        memcpy(_buf, src.buf, src.size);
        *buf = _buf;
        *size = src.size;
        mm_free(src.buf);
        return 0;
    } else {
        return -1;
    }
}
/**
 * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并退出
 */
SHMQueue<shm_msg_t> * _attach_remote_queue(int port) {
    hashtable_t *hashtable = mm_get_hashtable();
    if(hashtable_get(hashtable, port)== NULL) {
        err_exit(0, "_remote_queue_attach:connet at port %d  failed!", port);
        return NULL;
    }
    SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(port, 0);
    return queue;
}
 
src/util/include/sem_util.h
src/util/sem_util.c
@@ -1,158 +1,148 @@
#include "sem_util.h"
#include "logger_factory.h"
static Logger logger = LoggerFactory::getLogger();
int SemUtil::get(key_t key, unsigned int value) {
    int semid, perms;
    perms = S_IRUSR | S_IWUSR;
  int semid, perms;
    semid = semget(key, 1, IPC_CREAT | IPC_EXCL | perms);
  perms = S_IRUSR | S_IWUSR;
    if (semid != -1) {                  /* Successfully created the semaphore */
        union semun arg;
        struct sembuf sop;
  semid = semget(key, 1, IPC_CREAT | IPC_EXCL | perms);
        fprintf(stderr, "%ld: created semaphore\n", (long) getpid());
  if (semid != -1) { /* Successfully created the semaphore */
    union semun arg;
    struct sembuf sop;
        arg.val = 0;                    /* So initialize it to 0 */
        if (semctl(semid, 0, SETVAL, arg) == -1)
            err_exit(errno, "semctl 1");
        fprintf(stderr, "%ld: initialized semaphore\n", (long) getpid());
    logger.info("%ld: created semaphore\n", (long)getpid());
        /* Perform a "no-op" semaphore operation - changes sem_otime
           so other processes can see we've initialized the set. */
    arg.val = 0; /* So initialize it to 0 */
    if (semctl(semid, 0, SETVAL, arg) == -1)
      err_exit(errno, "semctl 1");
    logger.info("%ld: initialized semaphore\n", (long)getpid());
        sop.sem_num = 0;                /* Operate on semaphore 0 */
        sop.sem_op = value;
        sop.sem_flg = 0;
        if (semop(semid, &sop, 1) == -1)
            err_exit(errno, "semop");
        fprintf(stderr, "%ld: completed dummy semop()\n", (long) getpid());
    /* Perform a "no-op" semaphore operation - changes sem_otime
       so other processes can see we've initialized the set. */
    } else {                            /* We didn't create the semaphore set */
    sop.sem_num = 0; /* Operate on semaphore 0 */
    sop.sem_op = value;
    sop.sem_flg = 0;
    if (semop(semid, &sop, 1) == -1)
      err_exit(errno, "semop");
    logger.info("%ld: completed dummy semop()\n", (long)getpid());
        if (errno != EEXIST) {          /* Unexpected error from semget() */
            err_exit(errno, "semget 1");
  } else { /* We didn't create the semaphore set */
        } else {                        /* Someone else already created it */
            const int MAX_TRIES = 10;
            int j;
            union semun arg;
            struct semid_ds ds;
    if (errno != EEXIST) { /* Unexpected error from semget() */
      err_exit(errno, "semget 1");
            semid = semget(key, 1, perms);      /* So just get ID */
            if (semid == -1)
                err_exit(errno, "semget 2");
    } else { /* Someone else already created it */
      const int MAX_TRIES = 10;
      int j;
      union semun arg;
      struct semid_ds ds;
            fprintf(stderr, "%ld: got semaphore key\n", (long) getpid());
            /* Wait until another process has called semop() */
      semid = semget(key, 1, perms); /* So just get ID */
      if (semid == -1)
        err_exit(errno, "semget 2");
            arg.buf = &ds;
            for (j = 0; j < MAX_TRIES; j++) {
                fprintf(stderr, "Try %d\n", j);
                if (semctl(semid, 0, IPC_STAT, arg) == -1)
                    err_exit(errno, "semctl 2");
      logger.info("%ld: got semaphore key\n", (long)getpid());
      /* Wait until another process has called semop() */
                if (ds.sem_otime != 0)          /* Semop() performed? */
                    break;                      /* Yes, quit loop */
                sleep(1);                       /* If not, wait and retry */
            }
      arg.buf = &ds;
      for (j = 0; j < MAX_TRIES; j++) {
        logger.info("Try %d\n", j);
        if (semctl(semid, 0, IPC_STAT, arg) == -1)
          err_exit(errno, "semctl 2");
            if (ds.sem_otime == 0)              /* Loop ran to completion! */
                err_exit(errno, "Existing semaphore not initialized");
        }
        if (ds.sem_otime != 0) /* Semop() performed? */
          break;               /* Yes, quit loop */
        sleep(1);              /* If not, wait and retry */
      }
      if (ds.sem_otime == 0) /* Loop ran to completion! */
        err_exit(errno, "Existing semaphore not initialized");
    }
    return semid;
  }
  return semid;
}
/* Reserve semaphore (blocking), return 0 on success, or -1 with 'errno'
   set to EINTR if operation was interrupted by a signal handler */
/* Reserve semaphore - decrement it by 1 */
int SemUtil::dec(int semId)
{
    struct sembuf sops;
int SemUtil::dec(int semId) {
  struct sembuf sops;
    sops.sem_num = 0;
    sops.sem_op = -1;
    sops.sem_flg =  0;
  sops.sem_num = 0;
  sops.sem_op = -1;
  sops.sem_flg = 0;
    while (semop(semId, &sops, 1) == -1)
        if (errno != EINTR ) {
            err_msg(errno, "SemUtil::dec");
            return -1;
        }
  while (semop(semId, &sops, 1) == -1)
    if (errno != EINTR) {
      err_msg(errno, "SemUtil::dec");
      return -1;
    }
    return 0;
  return 0;
}
int SemUtil::dec_nowait(int semId)
{
    struct sembuf sops;
int SemUtil::dec_nowait(int semId) {
  struct sembuf sops;
    sops.sem_num = 0;
    sops.sem_op = -1;
    sops.sem_flg =  IPC_NOWAIT;
  sops.sem_num = 0;
  sops.sem_op = -1;
  sops.sem_flg = IPC_NOWAIT;
    while (semop(semId, &sops, 1) == -1)
        if (errno != EINTR ) {
            err_msg(errno, "SemUtil::dec_nowait");
            return -1;
        }
  while (semop(semId, &sops, 1) == -1)
    if (errno != EINTR) {
      err_msg(errno, "SemUtil::dec_nowait");
      return -1;
    }
    return 0;
  return 0;
}
int SemUtil::dec_timeout(int semId, struct timespec * timeout)
{
    struct sembuf sops;
int SemUtil::dec_timeout(int semId, struct timespec *timeout) {
  struct sembuf sops;
    sops.sem_num = 0;
    sops.sem_op = -1;
    sops.sem_flg = 0;
  sops.sem_num = 0;
  sops.sem_op = -1;
  sops.sem_flg = 0;
    while ( semtimedop(semId, &sops, 1, timeout) == -1)
        if (errno != EINTR ) {
            err_msg(errno, "SemUtil::dec_timeout");
            return -1;
        }
  while (semtimedop(semId, &sops, 1, timeout) == -1)
    if (errno != EINTR) {
      err_msg(errno, "SemUtil::dec_timeout");
      return -1;
    }
    return 0;
  return 0;
}
/* Release semaphore - increment it by 1 */
int SemUtil::inc(int semId)
{
    struct sembuf sops;
int SemUtil::inc(int semId) {
  struct sembuf sops;
    sops.sem_num = 0;
    sops.sem_op = 1;
    sops.sem_flg = 0;
  sops.sem_num = 0;
  sops.sem_op = 1;
  sops.sem_flg = 0;
    int rv = semop(semId, &sops, 1);
    if(rv == -1) {
        err_msg(errno, "SemUtil::inc");
    }
    return rv;
  int rv = semop(semId, &sops, 1);
  if (rv == -1) {
    err_msg(errno, "SemUtil::inc");
  }
  return rv;
}
void SemUtil::remove(int semid) {
    union semun dummy;
    if (semctl(semid, 0, IPC_RMID, dummy) == -1)
        err_msg(errno, "SemUtil::remove");
  union semun dummy;
  if (semctl(semid, 0, IPC_RMID, dummy) == -1)
    err_msg(errno, "SemUtil::remove");
}
void SemUtil::set(int semId, int val)
{
    union semun arg;
    arg.val = val;
    if (semctl(semId, 0, SETVAL, arg) == -1)
        err_msg(errno, "SemUtil::set");
void SemUtil::set(int semId, int val) {
  union semun arg;
  arg.val = val;
  if (semctl(semId, 0, SETVAL, arg) == -1)
    err_msg(errno, "SemUtil::set");
}
test/Makefile
@@ -2,38 +2,25 @@
# Makefile for common library.
#
ROOT=..
LDLIBS+=-Wl,-rpath=$(ROOT)/queue:$(ROOT)/lib
LDLIBS+=-Wl,-rpath=$(ROOT)/lib:$(ROOT)/build/lib
# 开源工具包路径
LDDIR += -L$(ROOT)/queue
LDDIR +=  -L$(ROOT)/lib -L$(ROOT)/build/lib
# 开源工具包
LDLIBS += -lshm_queue -lusgcommon -lpthread
INCLUDE += -I$(ROOT)/queue/ -I$(ROOT)/queue/include
INCLUDE += -I$(ROOT)/build/include
PLATFORM=$(shell $(ROOT)/systype.sh)
include $(ROOT)/Make.defines.$(PLATFORM)
 
PROGS = communication
PROGS = dgram_socket_test
build: $(PROGS)
# test1: $(LIBCOMMON)
# 如果包A 引用包B, B 要放在 A 后面
test_queue: test.h  $(ROOT)/queue/include/lock_free_queue.h
single_productor: test.h  $(ROOT)/queue/include/lock_free_queue.h
single_consumer: test.h  $(ROOT)/queue/include/lock_free_queue.h
clean:
    rm -f $(TEMPFILES) $(PROGS)
$(LIBQUEUE):
    (cd $(ROOT)/queue && $(MAKE))
test/dgram_socket_test
Binary files differ
test/dgram_socket_test.c
New file
@@ -0,0 +1,74 @@
#include "shm_socket.h"
#include "usg_common.h"
#include "shm_mm.h"
#include "shm_socket.h"
#include "usg_common.h"
#include "shm_mm.h"
typedef struct Targ {
    int port;
    int id;
}Targ;
void server(int port) {
    pthread_t tid;
    shm_socket_t *socket = shm_open_socket(SHM_SOCKET_DGRAM);
    shm_socket_bind(socket, port);
    char *buf;
    int size;
    int remotePort;
    char sendbuf[512];
    while( shm_recvfrom(socket, (void **)&buf, &size, &remotePort) == 0) {
        sprintf(sendbuf, "RECEIVED:%s", buf);
        printf("received from %d:%s\n", remotePort, buf);
        shm_sendto(socket, (void *)sendbuf, strlen(sendbuf) + 1, remotePort);
        free(buf);
    }
    shm_close_socket(socket);
}
void client(int port) {
    shm_socket_t *socket = shm_open_socket(SHM_SOCKET_DGRAM);
    int size;
    char *recvbuf;
    char sendbuf[512];
    int remote_port;
    while(true) {
        printf("request: ");
        scanf("%s", sendbuf);
        shm_sendto(socket, sendbuf, strlen(sendbuf)+1, port) ;
        shm_recvfrom(socket, (void **)&recvbuf, &size, &remote_port);
        printf("reply from (%d): %s\n",  remote_port, recvbuf);
        free(recvbuf);
    }
    shm_close_socket(socket);
}
int main(int argc, char *argv[]) {
  shm_init(512);
  int port;
  if (argc < 3) {
       fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client");
       return 1;
  }
  port = atoi(argv[2]);
  if (strcmp("server", argv[1]) == 0 ) {
     server(port);
  }
  if (strcmp("client", argv[1]) == 0)
     client(port);
  shm_destroy();
 // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client");
  return 0;
}