wangzhengquan
2020-07-16 3feff4ae44fd74c32158ed5f505e063b154c4d76
udpate
8个文件已修改
201 ■■■■■ 已修改文件
Makefile 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/Makefile 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/array_lock_free_queue.h 51 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/lock_free_queue.h 91 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/shm_allocator.h 36 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/shm_queue.h 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/libshm_queue.a 补丁 | 查看 | 原始文档 | blame | 历史
test/Makefile 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
Makefile
@@ -1,4 +1,4 @@
DIRS = queue  test2
DIRS = queue  test
all:
    for i in $(DIRS); do \
queue/Makefile
@@ -2,9 +2,9 @@
# Makefile for common library.
#
ROOT=..
#LDLIBS+=-Wl,-rpath=$(ROOT)/common:$(ROOT)/lib/jsoncpp
LDLIBS+=-Wl,-rpath=$(ROOT)/lib
# 开源工具包路径
#LDDIR += -L$(ROOT)/lib/jsoncpp -L$(ROOT)/lib/nng
LDDIR += -L$(ROOT)/lib
# 开源工具包
LDLIBS += -lusgcommon
@@ -41,7 +41,7 @@
#dynamic lib
$(DLIBSQUEUE): $(SOURCES)
    rm -f *.o
    $(CC) -fPIC -shared $(CFLAGS) $^ -o $@ $(LDFLAGS)
    $(CC) -fPIC -shared $(CFLAGS) $(LDFLAGS) $^ -o $@
    #$(CC) -fPIC -shared $(CFLAGS) $(LDFLAGS)  -o $@ $^ -Wl,--whole-archive $(ROOT)/lib/libusgcommon.a -Wl,--no-whole-archive 
queue/include/array_lock_free_queue.h
@@ -5,6 +5,7 @@
#include <sched.h>  // sched_yield()
#include "logger_factory.h"
#include "mem_pool.h"
#include "shm_allocator.h"
/// @brief implementation of an array based lock free queue with support for 
///        multiple producers
@@ -16,13 +17,15 @@
#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
template <typename ELEM_T>
template <typename ELEM_T, typename Allocator = SHM_Allocator>
class ArrayLockFreeQueue
{
    // ArrayLockFreeQueue will be using this' private members
    template <
        typename ELEM_T_, 
        template <typename T> class Q_TYPE >
        typename Allocator_,
        template <typename T, typename AT> class Q_TYPE
        >
    friend class LockFreeQueue;
private:
@@ -76,13 +79,13 @@
    
private:
    /// @brief disable copy constructor declaring it private
    ArrayLockFreeQueue<ELEM_T>(const ArrayLockFreeQueue<ELEM_T> &a_src);
    ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
};
template <typename ELEM_T>
ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize):
template <typename ELEM_T, typename Allocator>
ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize):
    Q_SIZE(qsize),
    m_writeIndex(0),      // initialisation is not atomic
    m_readIndex(0),       //
@@ -91,30 +94,30 @@
    ,m_count(0)           //
#endif
{
    m_theQueue = (ELEM_T*)mem_pool_malloc(Q_SIZE * sizeof(ELEM_T));
    m_theQueue = (ELEM_T*)Allocator::malloc(Q_SIZE * sizeof(ELEM_T));
}
template <typename ELEM_T>
ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue()
template <typename ELEM_T, typename Allocator>
ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue()
{
    // std::cout << "destroy ArrayLockFreeQueue\n";
    mem_pool_free(m_theQueue);
    Allocator::free(m_theQueue);
    
}
template <typename ELEM_T>
template <typename ELEM_T, typename Allocator>
inline 
uint32_t ArrayLockFreeQueue<ELEM_T>::countToIndex(uint32_t a_count)
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
{
    // if Q_SIZE is a power of 2 this statement could be also written as 
    // return (a_count & (Q_SIZE - 1));
    return (a_count % Q_SIZE);
}
template <typename ELEM_T>
template <typename ELEM_T, typename Allocator>
inline 
uint32_t ArrayLockFreeQueue<ELEM_T>::size()
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
@@ -147,9 +150,9 @@
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T>
template <typename ELEM_T, typename Allocator>
inline 
bool ArrayLockFreeQueue<ELEM_T>::full()
bool ArrayLockFreeQueue<ELEM_T, Allocator>::full()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
@@ -172,9 +175,9 @@
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T>
template <typename ELEM_T, typename Allocator>
inline 
bool ArrayLockFreeQueue<ELEM_T>::empty()
bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
@@ -199,8 +202,8 @@
template <typename ELEM_T>
bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data)
template <typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data)
{
    uint32_t currentReadIndex;
    uint32_t currentWriteIndex;
@@ -248,8 +251,8 @@
}
template <typename ELEM_T>
bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data)
template <typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data)
{
    uint32_t currentMaximumReadIndex;
    uint32_t currentReadIndex;
@@ -303,14 +306,14 @@
    return false;
}
template <typename ELEM_T>
ELEM_T& ArrayLockFreeQueue<ELEM_T>::operator[](unsigned int i)
template <typename ELEM_T, typename Allocator>
ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i)
{
    int currentCount = m_count;
    uint32_t currentReadIndex = m_readIndex;
    if (i < 0 || i >= currentCount)
    {
        std::cerr << "ArrayLockFreeQueue<ELEM_T>::operator[] , Error in array limits: " << i << " is out of range\n";
        std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
        std::exit(EXIT_FAILURE);
    }
    return m_theQueue[countToIndex(currentReadIndex+i)];
queue/include/lock_free_queue.h
@@ -6,6 +6,7 @@
#include "mem_pool.h"
#include "sem_util.h"
#include "logger_factory.h"
#include "shm_allocator.h"
// default Queue size
#define LOCK_FREE_Q_DEFAULT_SIZE 16
@@ -18,11 +19,11 @@
// forward declarations for default template values
//
template <typename ELEM_T>
template <typename ELEM_T, typename Allocator>
class ArrayLockFreeQueue;
template <typename ELEM_T>
class LinkedLockFreeQueue;
// template <typename ELEM_T>
// class LinkedLockFreeQueue;
/// @brief Lock-free queue based on a circular array
@@ -66,7 +67,9 @@
///        by default)
template <
    typename ELEM_T, 
    template <typename T> class Q_TYPE = ArrayLockFreeQueue >
    typename Allocator = SHM_Allocator,
    template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
     >
class LockFreeQueue
{
@@ -133,18 +136,19 @@
protected:
    /// @brief the actual queue. methods are forwarded into the real 
    ///        implementation
    Q_TYPE<ELEM_T> m_qImpl;
    Q_TYPE<ELEM_T, Allocator> m_qImpl;
private:
    /// @brief disable copy constructor declaring it private
    LockFreeQueue<ELEM_T, Q_TYPE>(const LockFreeQueue<ELEM_T, Q_TYPE> &a_src);
    LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src);
};
template <
    typename ELEM_T, 
    template <typename T> class Q_TYPE>
LockFreeQueue<ELEM_T, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize)
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize)
{
// std::cout << "LockFreeQueue init reference=" << reference << std::endl;
    slots = SemUtil::get(IPC_PRIVATE, qsize);
@@ -153,8 +157,9 @@
template <
    typename ELEM_T, 
    template <typename T> class Q_TYPE>
LockFreeQueue<ELEM_T, Q_TYPE>::~LockFreeQueue()
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue()
{
    LoggerFactory::getLogger().debug("LockFreeQueue desctroy");
    SemUtil::remove(slots);
@@ -163,24 +168,27 @@
template <
    typename ELEM_T, 
    template <typename T> class Q_TYPE>
inline uint32_t LockFreeQueue<ELEM_T, Q_TYPE>::size()
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size()
{
    return m_qImpl.size();
}  
template <
    typename ELEM_T, 
    template <typename T> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Q_TYPE>::full()
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full()
{
    return m_qImpl.full();
}
template <
    typename ELEM_T, 
    template <typename T> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Q_TYPE>::empty()
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty()
{
    return m_qImpl.empty();
}  
@@ -188,8 +196,9 @@
template <
    typename ELEM_T, 
    template <typename T> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Q_TYPE>::push(const ELEM_T &a_data)
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
{
    if (SemUtil::dec(slots) == -1) {
        err_exit(errno, "push");
@@ -205,8 +214,9 @@
template <
    typename ELEM_T, 
    template <typename T> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Q_TYPE>::push_nowait(const ELEM_T &a_data)
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
{
    if (SemUtil::dec_nowait(slots) == -1) {
        if (errno == EAGAIN)
@@ -225,8 +235,9 @@
template <
    typename ELEM_T, 
    template <typename T> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Q_TYPE>::push_timeout(const ELEM_T &a_data, struct timespec * timeout)
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, struct timespec * timeout)
{
    if (SemUtil::dec_timeout(slots, timeout) == -1) {
@@ -249,8 +260,9 @@
template <
    typename ELEM_T, 
    template <typename T> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Q_TYPE>::pop(ELEM_T &a_data)
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
{
    if (SemUtil::dec(items) == -1) {
        err_msg(errno, "LockFreeQueue pop");
@@ -267,8 +279,9 @@
template <
    typename ELEM_T, 
    template <typename T> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Q_TYPE>::pop_nowait(ELEM_T &a_data)
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
{
    if (SemUtil::dec_nowait(items) == -1) {
        if (errno == EAGAIN)
@@ -290,8 +303,9 @@
 
template <
    typename ELEM_T, 
    template <typename T> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout)
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout)
{
    if (SemUtil::dec_timeout(items, timeout) == -1) {
        if (errno == EAGAIN)
@@ -312,27 +326,30 @@
template <
    typename ELEM_T, 
    template <typename T> class Q_TYPE>
ELEM_T& LockFreeQueue<ELEM_T, Q_TYPE>::operator[](unsigned i) {
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
    return m_qImpl.operator[](i);
}
template <
    typename ELEM_T, 
    template <typename T> class Q_TYPE>
void * LockFreeQueue<ELEM_T, Q_TYPE>::operator new(size_t size){
        return mem_pool_malloc(size);
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){
        return Allocator::malloc(size);
}
template <
    typename ELEM_T, 
    template <typename T> class Q_TYPE>
void LockFreeQueue<ELEM_T, Q_TYPE>::operator delete(void *p) {
    return mem_pool_free(p);
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) {
    return Allocator::free(p);
}
// include implementation files
#include "linked_lock_free_queue.h"
//#include "linked_lock_free_queue.h"
#include "array_lock_free_queue.h"
#endif // _LOCK_FREE_QUEUE_H__
queue/include/shm_allocator.h
@@ -9,7 +9,7 @@
template<class T> class SHMAllocator
template<class T> class SHM_STL_Allocator
{
public:
  typedef T               value_type;
@@ -21,10 +21,10 @@
  typedef ptrdiff_t       difference_type;
  SHMAllocator() {};
  ~SHMAllocator() {};
  template<class U> SHMAllocator(const SHMAllocator<U>& t) { };
  template<class U> struct rebind { typedef SHMAllocator<U> other; };
  SHM_STL_Allocator() {};
  ~SHM_STL_Allocator() {};
  template<class U> SHM_STL_Allocator(const SHM_STL_Allocator<U>& t) { };
  template<class U> struct rebind { typedef SHM_STL_Allocator<U> other; };
  pointer allocate(size_type n, const void* hint=0) {
//        fprintf(stderr, "allocate n=%u, hint= %p\n",n, hint);
@@ -63,12 +63,36 @@
};
class SHM_Allocator {
  public:
    static void *malloc (size_t size) {
      return mm_malloc(size);
    }
    static void free (void *ptr) {
      return mm_free(ptr);
    }
};
class DM_Allocator {
  public:
    static void *malloc (size_t size) {
      return malloc(size);
    }
    static void free (void *ptr) {
      return free(ptr);
    }
};
// template<class charT, class traits = char _traits<charT>,
// class Allocator = allocator<charT> >  
typedef std::basic_string<char, std::char_traits<char>, SHMAllocator<char> > shmstring;
typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > shmstring;
#endif
queue/include/shm_queue.h
@@ -6,6 +6,7 @@
#include "hashtable.h"
#include "lock_free_queue.h"
#include "logger_factory.h"
#include "shm_allocator.h"
// default Queue size
// #define LOCK_FREE_Q_DEFAULT_SIZE 16
@@ -46,7 +47,7 @@
protected:
    /// @brief the actual queue-> methods are forwarded into the real 
    ///        implementation
    LockFreeQueue<ELEM_T>* queue;
    LockFreeQueue<ELEM_T, SHM_Allocator>* queue;
private:
    /// @brief disable copy constructor declaring it private
@@ -60,7 +61,7 @@
    hashtable_t *hashtable = mm_get_hashtable();
    std::set<int>* keyset = hashtable_keyset(hashtable);
    std::set<int>::iterator keyItr;
     LockFreeQueue<ELEM_T>* mqueue;
     LockFreeQueue<ELEM_T, SHM_Allocator>* mqueue;
    bool found;
    for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
        found = false;
@@ -71,7 +72,7 @@
            }
        }
        if(!found) {
           mqueue = (LockFreeQueue<ELEM_T> *)hashtable_get(hashtable, *keyItr);
           mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
           delete mqueue;
        }
    }
@@ -84,10 +85,10 @@
{
    hashtable_t *hashtable = mm_get_hashtable();
    queue = (LockFreeQueue<ELEM_T> *)hashtable_get(hashtable, key);
    queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
    //LockFreeQueue<int, 10000> q;
    if (queue == NULL || (void *)queue == (void *)1) {
        queue = new LockFreeQueue<ELEM_T>(qsize);
        queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize);
        hashtable_put(hashtable,  key, (void *)queue);
    }
    queue->reference++;
queue/libshm_queue.a
Binary files differ
test/Makefile
@@ -2,11 +2,11 @@
# Makefile for common library.
#
ROOT=..
#LDLIBS+=-Wl,-rpath=$(ROOT)/common:$(ROOT)/lib/jsoncpp
LDLIBS+=-Wl,-rpath=$(ROOT)/queue:$(ROOT)/lib
# 开源工具包路径
LDDIR += -L$(ROOT)/queue
# 开源工具包
LDLIBS += -lshm_queue -lpthread
LDLIBS += -lshm_queue -lusgcommon -lpthread
INCLUDE += -I$(ROOT)/queue/ -I$(ROOT)/queue/include