| | |
| | | DIRS = queue test2 |
| | | DIRS = queue test |
| | | |
| | | all: |
| | | for i in $(DIRS); do \ |
| | |
| | | # Makefile for common library. |
| | | # |
| | | ROOT=.. |
| | | #LDLIBS+=-Wl,-rpath=$(ROOT)/common:$(ROOT)/lib/jsoncpp |
| | | LDLIBS+=-Wl,-rpath=$(ROOT)/lib |
| | | # 开源工具包路径 |
| | | #LDDIR += -L$(ROOT)/lib/jsoncpp -L$(ROOT)/lib/nng |
| | | LDDIR += -L$(ROOT)/lib |
| | | |
| | | # 开源工具包 |
| | | LDLIBS += -lusgcommon |
| | |
| | | #dynamic lib |
| | | $(DLIBSQUEUE): $(SOURCES) |
| | | rm -f *.o |
| | | $(CC) -fPIC -shared $(CFLAGS) $^ -o $@ $(LDFLAGS) |
| | | $(CC) -fPIC -shared $(CFLAGS) $(LDFLAGS) $^ -o $@ |
| | | #$(CC) -fPIC -shared $(CFLAGS) $(LDFLAGS) -o $@ $^ -Wl,--whole-archive $(ROOT)/lib/libusgcommon.a -Wl,--no-whole-archive |
| | | |
| | | |
| | |
| | | #include <sched.h> // sched_yield() |
| | | #include "logger_factory.h" |
| | | #include "mem_pool.h" |
| | | #include "shm_allocator.h" |
| | | |
| | | /// @brief implementation of an array based lock free queue with support for |
| | | /// multiple producers |
| | |
| | | |
| | | #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | template <typename ELEM_T> |
| | | template <typename ELEM_T, typename Allocator = SHM_Allocator> |
| | | class ArrayLockFreeQueue |
| | | { |
| | | // LockFreeQueue (declared a friend of this class) uses this class's private members
| | | template < |
| | | typename ELEM_T_, |
| | | template <typename T> class Q_TYPE > |
| | | typename Allocator_, |
| | | template <typename T, typename AT> class Q_TYPE |
| | | > |
| | | friend class LockFreeQueue; |
| | | |
| | | private: |
| | |
| | | |
| | | private: |
| | | /// @brief disable copy constructor declaring it private |
| | | ArrayLockFreeQueue<ELEM_T>(const ArrayLockFreeQueue<ELEM_T> &a_src); |
| | | ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src); |
| | | |
| | | }; |
| | | |
| | | |
| | | template <typename ELEM_T> |
| | | ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize): |
| | | template <typename ELEM_T, typename Allocator> |
| | | ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize): |
| | | Q_SIZE(qsize), |
| | | m_writeIndex(0), // initialisation is not atomic |
| | | m_readIndex(0), // |
| | |
| | | ,m_count(0) // |
| | | #endif |
| | | { |
| | | m_theQueue = (ELEM_T*)mem_pool_malloc(Q_SIZE * sizeof(ELEM_T)); |
| | | m_theQueue = (ELEM_T*)Allocator::malloc(Q_SIZE * sizeof(ELEM_T)); |
| | | |
| | | } |
| | | |
| | | template <typename ELEM_T> |
| | | ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue() |
| | | template <typename ELEM_T, typename Allocator> |
| | | ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() |
| | | { |
| | | // std::cout << "destroy ArrayLockFreeQueue\n"; |
| | | mem_pool_free(m_theQueue); |
| | | Allocator::free(m_theQueue); |
| | | |
| | | } |
| | | |
| | | template <typename ELEM_T> |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | uint32_t ArrayLockFreeQueue<ELEM_T>::countToIndex(uint32_t a_count) |
| | | uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) |
| | | { |
| | | // if Q_SIZE is a power of 2 this statement could be also written as |
| | | // return (a_count & (Q_SIZE - 1)); |
| | | return (a_count % Q_SIZE); |
| | | } |
| | | |
| | | template <typename ELEM_T> |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | uint32_t ArrayLockFreeQueue<ELEM_T>::size() |
| | | uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() |
| | | { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | |
| | | #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | } |
| | | |
| | | template <typename ELEM_T> |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | bool ArrayLockFreeQueue<ELEM_T>::full() |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() |
| | | { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | |
| | | #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | } |
| | | |
| | | template <typename ELEM_T> |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | bool ArrayLockFreeQueue<ELEM_T>::empty() |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() |
| | | { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | |
| | | |
| | | |
| | | |
| | | template <typename ELEM_T> |
| | | bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data) |
| | | template <typename ELEM_T, typename Allocator> |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) |
| | | { |
| | | uint32_t currentReadIndex; |
| | | uint32_t currentWriteIndex; |
| | |
| | | } |
| | | |
| | | |
| | | template <typename ELEM_T> |
| | | bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data) |
| | | template <typename ELEM_T, typename Allocator> |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) |
| | | { |
| | | uint32_t currentMaximumReadIndex; |
| | | uint32_t currentReadIndex; |
| | |
| | | return false; |
| | | } |
| | | |
| | | template <typename ELEM_T> |
| | | ELEM_T& ArrayLockFreeQueue<ELEM_T>::operator[](unsigned int i) |
| | | template <typename ELEM_T, typename Allocator> |
| | | ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) |
| | | { |
| | | int currentCount = m_count; |
| | | uint32_t currentReadIndex = m_readIndex; |
| | | if (i < 0 || i >= currentCount) |
| | | { |
| | | std::cerr << "ArrayLockFreeQueue<ELEM_T>::operator[] , Error in array limits: " << i << " is out of range\n"; |
| | | std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; |
| | | std::exit(EXIT_FAILURE); |
| | | } |
| | | return m_theQueue[countToIndex(currentReadIndex+i)]; |
| | |
| | | #include "mem_pool.h" |
| | | #include "sem_util.h" |
| | | #include "logger_factory.h" |
| | | #include "shm_allocator.h" |
| | | |
| | | // default Queue size |
| | | #define LOCK_FREE_Q_DEFAULT_SIZE 16 |
| | |
| | | // forward declarations for default template values |
| | | // |
| | | |
| | | template <typename ELEM_T> |
| | | template <typename ELEM_T, typename Allocator> |
| | | class ArrayLockFreeQueue; |
| | | |
| | | template <typename ELEM_T> |
| | | class LinkedLockFreeQueue; |
| | | // template <typename ELEM_T> |
| | | // class LinkedLockFreeQueue; |
| | | |
| | | |
| | | /// @brief Lock-free queue based on a circular array |
| | |
| | | /// by default) |
| | | template < |
| | | typename ELEM_T, |
| | | template <typename T> class Q_TYPE = ArrayLockFreeQueue > |
| | | typename Allocator = SHM_Allocator, |
| | | template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue |
| | | > |
| | | class LockFreeQueue |
| | | { |
| | | |
| | |
| | | protected: |
| | | /// @brief the actual queue. methods are forwarded into the real |
| | | /// implementation |
| | | Q_TYPE<ELEM_T> m_qImpl; |
| | | Q_TYPE<ELEM_T, Allocator> m_qImpl; |
| | | |
| | | private: |
| | | /// @brief disable copy constructor declaring it private |
| | | LockFreeQueue<ELEM_T, Q_TYPE>(const LockFreeQueue<ELEM_T, Q_TYPE> &a_src); |
| | | LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src); |
| | | }; |
| | | |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | template <typename T> class Q_TYPE> |
| | | LockFreeQueue<ELEM_T, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) |
| | | { |
| | | // std::cout << "LockFreeQueue init reference=" << reference << std::endl; |
| | | slots = SemUtil::get(IPC_PRIVATE, qsize); |
| | |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | template <typename T> class Q_TYPE> |
| | | LockFreeQueue<ELEM_T, Q_TYPE>::~LockFreeQueue() |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() |
| | | { |
| | | LoggerFactory::getLogger().debug("LockFreeQueue desctroy"); |
| | | SemUtil::remove(slots); |
| | |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | template <typename T> class Q_TYPE> |
| | | inline uint32_t LockFreeQueue<ELEM_T, Q_TYPE>::size() |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() |
| | | { |
| | | return m_qImpl.size(); |
| | | } |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | template <typename T> class Q_TYPE> |
| | | inline bool LockFreeQueue<ELEM_T, Q_TYPE>::full() |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() |
| | | { |
| | | return m_qImpl.full(); |
| | | } |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | template <typename T> class Q_TYPE> |
| | | inline bool LockFreeQueue<ELEM_T, Q_TYPE>::empty() |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() |
| | | { |
| | | return m_qImpl.empty(); |
| | | } |
| | |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | template <typename T> class Q_TYPE> |
| | | bool LockFreeQueue<ELEM_T, Q_TYPE>::push(const ELEM_T &a_data) |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data) |
| | | { |
| | | if (SemUtil::dec(slots) == -1) { |
| | | err_exit(errno, "push"); |
| | |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | template <typename T> class Q_TYPE> |
| | | bool LockFreeQueue<ELEM_T, Q_TYPE>::push_nowait(const ELEM_T &a_data) |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data) |
| | | { |
| | | if (SemUtil::dec_nowait(slots) == -1) { |
| | | if (errno == EAGAIN) |
| | |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | template <typename T> class Q_TYPE> |
| | | bool LockFreeQueue<ELEM_T, Q_TYPE>::push_timeout(const ELEM_T &a_data, struct timespec * timeout) |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, struct timespec * timeout) |
| | | { |
| | | |
| | | if (SemUtil::dec_timeout(slots, timeout) == -1) { |
| | |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | template <typename T> class Q_TYPE> |
| | | bool LockFreeQueue<ELEM_T, Q_TYPE>::pop(ELEM_T &a_data) |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data) |
| | | { |
| | | if (SemUtil::dec(items) == -1) { |
| | | err_msg(errno, "LockFreeQueue pop"); |
| | |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | template <typename T> class Q_TYPE> |
| | | bool LockFreeQueue<ELEM_T, Q_TYPE>::pop_nowait(ELEM_T &a_data) |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data) |
| | | { |
| | | if (SemUtil::dec_nowait(items) == -1) { |
| | | if (errno == EAGAIN) |
| | |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | template <typename T> class Q_TYPE> |
| | | bool LockFreeQueue<ELEM_T, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout) |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout) |
| | | { |
| | | if (SemUtil::dec_timeout(items, timeout) == -1) { |
| | | if (errno == EAGAIN) |
| | |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | template <typename T> class Q_TYPE> |
| | | ELEM_T& LockFreeQueue<ELEM_T, Q_TYPE>::operator[](unsigned i) { |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) { |
| | | return m_qImpl.operator[](i); |
| | | } |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | template <typename T> class Q_TYPE> |
| | | void * LockFreeQueue<ELEM_T, Q_TYPE>::operator new(size_t size){ |
| | | return mem_pool_malloc(size); |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){ |
| | | return Allocator::malloc(size); |
| | | } |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | template <typename T> class Q_TYPE> |
| | | void LockFreeQueue<ELEM_T, Q_TYPE>::operator delete(void *p) { |
| | | return mem_pool_free(p); |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) { |
| | | return Allocator::free(p); |
| | | } |
| | | |
| | | // include implementation files |
| | | #include "linked_lock_free_queue.h" |
| | | //#include "linked_lock_free_queue.h" |
| | | #include "array_lock_free_queue.h" |
| | | |
| | | #endif // _LOCK_FREE_QUEUE_H__ |
| | |
| | | |
| | | |
| | | |
| | | template<class T> class SHMAllocator |
| | | template<class T> class SHM_STL_Allocator |
| | | { |
| | | public: |
| | | typedef T value_type; |
| | |
| | | typedef ptrdiff_t difference_type; |
| | | |
| | | |
| | | SHMAllocator() {}; |
| | | ~SHMAllocator() {}; |
| | | template<class U> SHMAllocator(const SHMAllocator<U>& t) { }; |
| | | template<class U> struct rebind { typedef SHMAllocator<U> other; }; |
| | | SHM_STL_Allocator() {}; |
| | | ~SHM_STL_Allocator() {}; |
| | | template<class U> SHM_STL_Allocator(const SHM_STL_Allocator<U>& t) { }; |
| | | template<class U> struct rebind { typedef SHM_STL_Allocator<U> other; }; |
| | | |
| | | pointer allocate(size_type n, const void* hint=0) { |
| | | // fprintf(stderr, "allocate n=%u, hint= %p\n",n, hint); |
| | |
| | | }; |
| | | |
| | | |
| | | class SHM_Allocator { |
| | | public: |
| | | static void *malloc (size_t size) { |
| | | return mm_malloc(size); |
| | | } |
| | | |
| | | static void free (void *ptr) { |
| | | return mm_free(ptr); |
| | | } |
| | | }; |
| | | |
| | | |
/// @brief Stateless allocation policy backed by the C runtime heap.
///
/// Exposes the same static malloc/free interface as SHM_Allocator so it can be
/// used as the `Allocator` template argument of the lock-free queue classes.
class DM_Allocator {
public:
  /// @brief Allocate `size` bytes with the global ::malloc.
  /// @param size number of bytes to allocate
  /// @return pointer to the new block, or a null pointer if ::malloc fails
  static void *malloc(size_t size) {
    // The leading `::` is required: inside this class an unqualified
    // `malloc` names this very member function, which would recurse forever.
    return ::malloc(size);
  }

  /// @brief Release a block previously obtained from DM_Allocator::malloc.
  /// @param ptr block to release; a null pointer is accepted (as by ::free)
  static void free(void *ptr) {
    // Same reasoning as above: qualify to reach the C runtime's free.
    ::free(ptr);
  }
};
| | | |
| | | |
| | | // template<class charT, class traits = char _traits<charT>, |
| | | // class Allocator = allocator<charT> > |
| | | |
| | | |
| | | |
| | | |
| | | typedef std::basic_string<char, std::char_traits<char>, SHMAllocator<char> > shmstring; |
| | | typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > shmstring; |
| | | |
| | | #endif |
| | |
| | | #include "hashtable.h" |
| | | #include "lock_free_queue.h" |
| | | #include "logger_factory.h" |
| | | #include "shm_allocator.h" |
| | | |
| | | // default Queue size |
| | | // #define LOCK_FREE_Q_DEFAULT_SIZE 16 |
| | |
| | | protected: |
| | | /// @brief the actual queue; methods are forwarded into the real
| | | /// implementation
| | | LockFreeQueue<ELEM_T>* queue; |
| | | LockFreeQueue<ELEM_T, SHM_Allocator>* queue; |
| | | |
| | | private: |
| | | /// @brief disable copy constructor declaring it private |
| | |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | std::set<int>* keyset = hashtable_keyset(hashtable); |
| | | std::set<int>::iterator keyItr; |
| | | LockFreeQueue<ELEM_T>* mqueue; |
| | | LockFreeQueue<ELEM_T, SHM_Allocator>* mqueue; |
| | | bool found; |
| | | for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { |
| | | found = false; |
| | |
| | | } |
| | | } |
| | | if(!found) { |
| | | mqueue = (LockFreeQueue<ELEM_T> *)hashtable_get(hashtable, *keyItr); |
| | | mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr); |
| | | delete mqueue; |
| | | } |
| | | } |
| | |
| | | { |
| | | |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | queue = (LockFreeQueue<ELEM_T> *)hashtable_get(hashtable, key); |
| | | queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key); |
| | | //LockFreeQueue<int, 10000> q; |
| | | if (queue == NULL || (void *)queue == (void *)1) { |
| | | queue = new LockFreeQueue<ELEM_T>(qsize); |
| | | queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize); |
| | | hashtable_put(hashtable, key, (void *)queue); |
| | | } |
| | | queue->reference++; |
| | |
| | | # Makefile for common library. |
| | | # |
| | | ROOT=.. |
| | | #LDLIBS+=-Wl,-rpath=$(ROOT)/common:$(ROOT)/lib/jsoncpp |
| | | LDLIBS+=-Wl,-rpath=$(ROOT)/queue:$(ROOT)/lib |
| | | # 开源工具包路径 |
| | | LDDIR += -L$(ROOT)/queue |
| | | # 开源工具包 |
| | | LDLIBS += -lshm_queue -lpthread |
| | | LDLIBS += -lshm_queue -lusgcommon -lpthread |
| | | |
| | | INCLUDE += -I$(ROOT)/queue/ -I$(ROOT)/queue/include |
| | | |