From 2c65db46500207f8445aa4baa53bfbb6602e0e18 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期四, 21 一月 2021 16:37:03 +0800 Subject: [PATCH] restructure --- test/futex_demo.cpp | 3 src/time_util.h | 14 src/queue/lock_free_queue.h | 39 - src/socket/shm_socket.h | 1 src/futex_sem.cpp | 8 CMakeLists.txt | 1 src/socket/shm_socket.cpp | 8 src/futex_sem.h | 15 /dev/null | 322 ------------------ src/queue/array_lock_free_queue.h | 4 src/queue/shm_queue.h | 123 ++++-- src/queue/array_lock_free_sem_queue.h | 367 +++++++++++++++++++++ src/psem.cpp | 19 - src/CMakeLists.txt | 44 +- src/time_util.cpp | 26 + 15 files changed, 545 insertions(+), 449 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index ef9b64a..8310bfa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,6 +16,7 @@ option(BUILD_SHARED_LIBS "Build using shared libraries" ON) option(BUILD_DOC "Build doc" OFF) + list(APPEND EXTRA_INCLUDES "${PROJECT_SOURCE_DIR}/include/usgcommon") list(APPEND EXTRA_LIBS ${PROJECT_SOURCE_DIR}/lib/libusgcommon.a pthread rt) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index cfd7f89..8343eb2 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -8,24 +8,27 @@ configure_file(bus_config.h.in bus_config.h) add_library(shm_queue - ./logger_factory.cpp - ./socket/bus_server_socket.cpp - ./socket/bus_server_socket_wrapper.cpp - ./socket/shm_stream_mod_socket.cpp - ./socket/shm_socket.cpp - ./socket/shm_mod_socket.cpp - ./psem.cpp - ./svsem_util.cpp - ./bus_error.cpp - ./net/net_conn_pool.cpp - ./net/net_mod_server_socket_wrapper.cpp - ./net/net_mod_socket_wrapper.cpp - ./net/net_mod_socket.cpp - ./net/net_mod_socket_io.cpp - ./net/net_mod_server_socket.cpp - ./shm/shm_mm_wrapper.cpp - ./shm/mm.cpp - ./shm/hashtable.cpp + ./logger_factory.cpp +./socket/bus_server_socket.cpp +./socket/bus_server_socket_wrapper.cpp +./socket/shm_stream_mod_socket.cpp +./socket/shm_socket.cpp +./socket/shm_mod_socket.cpp +./time_util.cpp +./psem.cpp +./svsem_util.cpp +./bus_error.cpp +./futex_sem.cpp +./net/net_conn_pool.cpp +./net/net_mod_server_socket_wrapper.cpp +./net/net_mod_socket_wrapper.cpp +./net/net_mod_socket.cpp +./net/net_mod_socket_io.cpp +./net/net_mod_server_socket.cpp +./shm/shm_mm_wrapper.cpp +./shm/mm.cpp +./shm/hashtable.cpp + ) @@ -41,7 +44,8 @@ ${CMAKE_CURRENT_SOURCE_DIR}/socket ${CMAKE_CURRENT_SOURCE_DIR}/net ) - + + target_link_libraries(shm_queue PUBLIC ${EXTRA_LIBS} ) # install rules @@ -55,6 +59,8 @@ ./socket/bus_server_socket_wrapper.h ./psem.h ./key_def.h +./time_util.h +./futex_sem.h ./bus_error.h ./svsem_util.h ./logger_factory.h diff --git a/src/futex_sem.cpp b/src/futex_sem.cpp new file mode 100644 index 0000000..226145f --- /dev/null +++ b/src/futex_sem.cpp @@ -0,0 +1,8 @@ +#include "futex_sem.h" + + +int futex(int *uaddr, int futex_op, int val, + const struct timespec *timeout, int *uaddr2, int val3) +{ + return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr, val3); +} \ No newline at end of file diff --git a/src/futex_sem.h b/src/futex_sem.h new file mode 100644 index 0000000..787fca6 --- /dev/null +++ b/src/futex_sem.h @@ -0,0 +1,15 @@ +#ifndef _FUTEXT_SEM_H_ +#define _FUTEXT_SEM_H_ +#include "usg_common.h" +#include <sys/wait.h> +#include <sys/mman.h> +#include <sys/syscall.h> +#include <linux/futex.h> +#include <sys/time.h> +#include <sys/mman.h> +#include <sys/stat.h> /* For mode constants */ +#include <fcntl.h> +int futex(int *uaddr, int futex_op, int val, + const struct timespec *timeout, int *uaddr2, int val3); + +#endif \ No newline at end of file diff --git a/src/psem.cpp b/src/psem.cpp index 8d9333f..d0eb2c2 100644 --- a/src/psem.cpp +++ b/src/psem.cpp @@ -1,26 +1,11 @@ #include "psem.h" #include <semaphore.h> +#include "time_util.h" -#define NANO 1000000000 - - -static struct timespec psem_calc_abs_timeout(const struct timespec *ts) { - - struct timespec res; - struct timespec timeout; - if (clock_gettime(CLOCK_REALTIME, &timeout) == -1) - err_exit(errno, "clock_gettime"); - - res.tv_sec = timeout.tv_sec + ts->tv_sec; - res.tv_nsec = timeout.tv_nsec + ts->tv_nsec; - res.tv_sec = res.tv_sec + floor(res.tv_nsec / NANO); - res.tv_nsec = res.tv_nsec % NANO; - return res; -} int psem_timedwait(sem_t *sem, const struct timespec *ts) { - struct timespec abs_timeout = psem_calc_abs_timeout(ts); + struct timespec abs_timeout = TimeUtil::calc_abs_time(ts); int rv ; while ( (rv = sem_timedwait(sem, &abs_timeout)) == -1) { diff --git a/src/queue/array_lock_free_queue.h b/src/queue/array_lock_free_queue.h index 233bc6a..ae1506d 100644 --- a/src/queue/array_lock_free_queue.h +++ b/src/queue/array_lock_free_queue.h @@ -1,5 +1,5 @@ -#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ -#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ +#ifndef __ARRAY_LOCK_FREE_QUEUE_H__ +#define __ARRAY_LOCK_FREE_QUEUE_H__ #include "atomic_ops.h" #include <assert.h> // assert() #include <sched.h> // sched_yield() diff --git a/src/queue/array_lock_free_queue2.h b/src/queue/array_lock_free_queue2.h deleted file mode 100644 index 233bc6a..0000000 --- a/src/queue/array_lock_free_queue2.h +++ /dev/null @@ -1,322 +0,0 @@ -#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ -#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ -#include "atomic_ops.h" -#include <assert.h> // assert() -#include <sched.h> // sched_yield() -#include "logger_factory.h" -#include "mem_pool.h" -#include "shm_allocator.h" - -/// @brief implementation of an array based lock free queue with support for -/// multiple producers -/// This class is prevented from being instantiated directly (all members and -/// methods are private). To instantiate a multiple producers lock free queue -/// you must use the ArrayLockFreeQueue fachade: -/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q; - - -#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - -template <typename ELEM_T, typename Allocator = SHM_Allocator> -class ArrayLockFreeQueue -{ - // ArrayLockFreeQueue will be using this' private members - template < - typename ELEM_T_, - typename Allocator_, - template <typename T, typename AT> class Q_TYPE - > - friend class LockFreeQueue; - -private: - /// @brief constructor of the class - ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - - virtual ~ArrayLockFreeQueue(); - - inline uint32_t size(); - - inline bool full(); - - inline bool empty(); - - bool push(const ELEM_T &a_data); - - bool pop(ELEM_T &a_data); - - /// @brief calculate the index in the circular array that corresponds - /// to a particular "count" value - inline uint32_t countToIndex(uint32_t a_count); - - ELEM_T& operator[](unsigned i); - -private: - size_t Q_SIZE; - /// @brief array to keep the elements - ELEM_T *m_theQueue; - - /// @brief where a new element will be inserted - uint32_t m_writeIndex; - - /// @brief where the next element where be extracted from - uint32_t m_readIndex; - - /// @brief maximum read index for multiple producer queues - /// If it's not the same as m_writeIndex it means - /// there are writes pending to be "committed" to the queue, that means, - /// the place for the data was reserved (the index in the array) but - /// data is still not in the queue, so the thread trying to read will have - /// to wait for those other threads to save the data into the queue - /// - /// note this is only used for multiple producers - uint32_t m_maximumReadIndex; - -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - /// @brief number of elements in the queue - uint32_t m_count; -#endif - - -private: - /// @brief disable copy constructor declaring it private - ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src); - -}; - - -template <typename ELEM_T, typename Allocator> -ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize): - Q_SIZE(qsize), - m_writeIndex(0), // initialisation is not atomic - m_readIndex(0), // - m_maximumReadIndex(0) // -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - ,m_count(0) // -#endif -{ - m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T)); - -} - -template <typename ELEM_T, typename Allocator> -ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() -{ - // std::cout << "destroy ArrayLockFreeQueue\n"; - Allocator::deallocate(m_theQueue); - -} - -template <typename ELEM_T, typename Allocator> -inline -uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) -{ - // if Q_SIZE is a power of 2 this statement could be also written as - // return (a_count & (Q_SIZE - 1)); - return (a_count % Q_SIZE); -} - -template <typename ELEM_T, typename Allocator> -inline -uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() -{ -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - return m_count; -#else - - uint32_t currentWriteIndex = m_maximumReadIndex; - uint32_t currentReadIndex = m_readIndex; - - // let's think of a scenario where this function returns bogus data - // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run - // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1 - // 2. afterwards this thread is preemted. While this thread is inactive 2 - // elements are inserted and removed from the queue, so m_maximumReadIndex - // is 5 and m_readIndex 4. Real size is still 1 - // 3. Now the current thread comes back from preemption and reads m_readIndex. - // currentReadIndex is 4 - // 4. currentReadIndex is bigger than currentWriteIndex, so - // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is, - // it returns that the queue is almost full, when it is almost empty - // - if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex)) - { - return (currentWriteIndex - currentReadIndex); - } - else - { - return (Q_SIZE + currentWriteIndex - currentReadIndex); - } -#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -} - -template <typename ELEM_T, typename Allocator> -inline -bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() -{ -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - return (m_count == (Q_SIZE)); -#else - - uint32_t currentWriteIndex = m_writeIndex; - uint32_t currentReadIndex = m_readIndex; - - if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) - { - // the queue is full - return true; - } - else - { - // not full! - return false; - } -#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -} - -template <typename ELEM_T, typename Allocator> -inline -bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() -{ -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - return (m_count == 0); -#else - - if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex)) - { - // the queue is full - return true; - } - else - { - // not full! - return false; - } -#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -} - - - - - - -template <typename ELEM_T, typename Allocator> -bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) -{ - uint32_t currentReadIndex; - uint32_t currentWriteIndex; - - do - { - - currentWriteIndex = m_writeIndex; - currentReadIndex = m_readIndex; - #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - if (m_count == Q_SIZE) { - return false; - } - #else - if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) - { - // the queue is full - return false; - } - #endif - - } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); - - // We know now that this index is reserved for us. Use it to save the data - m_theQueue[countToIndex(currentWriteIndex)] = a_data; - - // update the maximum read index after saving the data. It wouldn't fail if there is only one thread - // inserting in the queue. It might fail if there are more than 1 producer threads because this - // operation has to be done in the same order as the previous CAS - - while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) - { - // this is a good place to yield the thread in case there are more - // software threads than hardware processors and you have more - // than 1 producer thread - // have a look at sched_yield (POSIX.1b) - sched_yield(); - } - -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - AtomicAdd(&m_count, 1); -#endif - return true; -} - - -template <typename ELEM_T, typename Allocator> -bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) -{ - uint32_t currentMaximumReadIndex; - uint32_t currentReadIndex; - - do - { - // to ensure thread-safety when there is more than 1 producer thread - // a second index is defined (m_maximumReadIndex) - currentReadIndex = m_readIndex; - currentMaximumReadIndex = m_maximumReadIndex; - #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - if (m_count == 0) { - return false; - } - #else - if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) - { - // the queue is empty or - // a producer thread has allocate space in the queue but is - // waiting to commit the data into it - return false; - } - #endif - - // retrieve the data from the queue - a_data = m_theQueue[countToIndex(currentReadIndex)]; - - // try to perfrom now the CAS operation on the read index. If we succeed - // a_data already contains what m_readIndex pointed to before we - // increased it - if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) - { - #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - // m_count.fetch_sub(1); - AtomicSub(&m_count, 1); - #endif - return true; - } - - // it failed retrieving the element off the queue. Someone else must - // have read the element stored at countToIndex(currentReadIndex) - // before we could perform the CAS operation - - } while(1); // keep looping to try again! - - // Something went wrong. it shouldn't be possible to reach here - assert(0); - - // Add this return statement to avoid compiler warnings - return false; -} - -template <typename ELEM_T, typename Allocator> -ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) -{ - int currentCount = m_count; - uint32_t currentReadIndex = m_readIndex; - if (i >= currentCount) - { - std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; - std::exit(EXIT_FAILURE); - } - return m_theQueue[countToIndex(currentReadIndex+i)]; -} - -#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ diff --git a/src/queue/array_lock_free_sem_queue.h b/src/queue/array_lock_free_sem_queue.h new file mode 100644 index 0000000..bb213e8 --- /dev/null +++ b/src/queue/array_lock_free_sem_queue.h @@ -0,0 +1,367 @@ +#ifndef __ARRAY_LOCK_FREE_SEM_QUEUE_H__ +#define __ARRAY_LOCK_FREE_SEM_QUEUE_H__ +#include "atomic_ops.h" +#include <assert.h> // assert() +#include <sched.h> // sched_yield() +#include "logger_factory.h" +#include "mem_pool.h" +#include "shm_allocator.h" +#include "futex_sem.h" +#include "time_util.h" + + +/// @brief implementation of an array based lock free queue with support for +/// multiple producers +/// This class is prevented from being instantiated directly (all members and +/// methods are private). To instantiate a multiple producers lock free queue +/// you must use the ArrayLockFreeSemQueue fachade: +/// ArrayLockFreeSemQueue<int, 100, ArrayLockFreeSemQueue> q; + +#define LOCK_FREE_QUEUE_TIMEOUT 1 +#define LOCK_FREE_QUEUE_NOWAIT 1 << 1 + +#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + +template <typename ELEM_T, typename Allocator = SHM_Allocator> +class ArrayLockFreeSemQueue +{ +public: + /// @brief constructor of the class + ArrayLockFreeSemQueue(size_t qsize = 16); + + virtual ~ArrayLockFreeSemQueue(); + + inline uint32_t size(); + + inline bool full(); + + inline bool empty(); + + int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); + + int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); + + /// @brief calculate the index in the circular array that corresponds + /// to a particular "count" value + inline uint32_t countToIndex(uint32_t a_count); + + ELEM_T& operator[](unsigned i); + +public: + void *operator new(size_t size); + void operator delete(void *p); + +private: + size_t Q_SIZE; + /// @brief array to keep the elements + ELEM_T *m_theQueue; + + /// @brief where a new element will be inserted + uint32_t m_writeIndex; + + /// @brief where the next element where be extracted from + uint32_t m_readIndex; + + /// @brief maximum read index for multiple producer queues + /// If it's not the same as m_writeIndex it means + /// there are writes pending to be "committed" to the queue, that means, + /// the place for the data was reserved (the index in the array) but + /// data is still not in the queue, so the thread trying to read will have + /// to wait for those other threads to save the data into the queue + /// + /// note this is only used for multiple producers + uint32_t m_maximumReadIndex; + +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + /// @brief number of elements in the queue + int m_count; +#endif + + + private: + /// @brief disable copy constructor declaring it private + ArrayLockFreeSemQueue<ELEM_T, Allocator>(const ArrayLockFreeSemQueue<ELEM_T> &a_src); + +}; + + +template <typename ELEM_T, typename Allocator> +ArrayLockFreeSemQueue<ELEM_T, Allocator>::ArrayLockFreeSemQueue(size_t qsize): + Q_SIZE(qsize), + m_writeIndex(0), // initialisation is not atomic + m_readIndex(0), // + m_maximumReadIndex(0) // +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + ,m_count(0) // +#endif +{ + m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T)); + +} + + template <typename ELEM_T, typename Allocator> +ArrayLockFreeSemQueue<ELEM_T, Allocator>::~ArrayLockFreeSemQueue() +{ + // std::cout << "destroy ArrayLockFreeSemQueue\n"; + Allocator::deallocate(m_theQueue); + +} + +template <typename ELEM_T, typename Allocator> + inline +uint32_t ArrayLockFreeSemQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) +{ + // if Q_SIZE is a power of 2 this statement could be also written as + // return (a_count & (Q_SIZE - 1)); + return (a_count % Q_SIZE); +} + +template <typename ELEM_T, typename Allocator> + inline +uint32_t ArrayLockFreeSemQueue<ELEM_T, Allocator>::size() +{ +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + return m_count; +#else + + uint32_t currentWriteIndex = m_maximumReadIndex; + uint32_t currentReadIndex = m_readIndex; + + // let's think of a scenario where this function returns bogus data + // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run + // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1 + // 2. afterwards this thread is preemted. While this thread is inactive 2 + // elements are inserted and removed from the queue, so m_maximumReadIndex + // is 5 and m_readIndex 4. Real size is still 1 + // 3. Now the current thread comes back from preemption and reads m_readIndex. + // currentReadIndex is 4 + // 4. currentReadIndex is bigger than currentWriteIndex, so + // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is, + // it returns that the queue is almost full, when it is almost empty + // + if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex)) + { + return (currentWriteIndex - currentReadIndex); + } + else + { + return (Q_SIZE + currentWriteIndex - currentReadIndex); + } +#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE +} + +template <typename ELEM_T, typename Allocator> + inline +bool ArrayLockFreeSemQueue<ELEM_T, Allocator>::full() +{ +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + return (m_count == (Q_SIZE)); +#else + + uint32_t currentWriteIndex = m_writeIndex; + uint32_t currentReadIndex = m_readIndex; + + if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) + { + // the queue is full + return true; + } + else + { + // not full! + return false; + } +#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE +} + +template <typename ELEM_T, typename Allocator> + inline +bool ArrayLockFreeSemQueue<ELEM_T, Allocator>::empty() +{ +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + return (m_count == 0); +#else + + if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex)) + { + // the queue is full + return true; + } + else + { + // not full! + return false; + } +#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE +} + + + + + + + template <typename ELEM_T, typename Allocator> +int ArrayLockFreeSemQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) +{ + uint32_t currentReadIndex; + uint32_t currentWriteIndex; + int s; + + do + { + currentWriteIndex = m_writeIndex; + currentReadIndex = m_readIndex; + + if (m_count == Q_SIZE) { + if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) + return -1; + else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { + const struct timespec ts = TimeUtil::trim_time(timeout); + s = futex(&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); + return -1; + } + + } else { + s = futex(&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + return -1; + } + } + + } + + + } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); + + // We know now that this index is reserved for us. Use it to save the data + m_theQueue[countToIndex(currentWriteIndex)] = a_data; + + // update the maximum read index after saving the data. It wouldn't fail if there is only one thread + // inserting in the queue. It might fail if there are more than 1 producer threads because this + // operation has to be done in the same order as the previous CAS + + while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) + { + // this is a good place to yield the thread in case there are more + // software threads than hardware processors and you have more + // than 1 producer thread + // have a look at sched_yield (POSIX.1b) + sched_yield(); + } + + AtomicAdd(&m_count, 1); + s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0); + if (s == -1) + err_exit(errno, "futex-FUTEX_WAKE"); + return 0; +} + + + template <typename ELEM_T, typename Allocator> +int ArrayLockFreeSemQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) +{ + uint32_t currentMaximumReadIndex; + uint32_t currentReadIndex; + + int s; + do + { + // to ensure thread-safety when there is more than 1 producer thread + // a second index is defined (m_maximumReadIndex) + currentReadIndex = m_readIndex; + currentMaximumReadIndex = m_maximumReadIndex; +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + if (m_count == 0) { + + if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) + return -1; + else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { + const struct timespec ts = TimeUtil::trim_time(timeout); + s = futex(&m_count, FUTEX_WAIT, 0, &ts, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); + return -1; + } + + } else { + s = futex(&m_count, FUTEX_WAIT, 0, NULL, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + return -1; + } + } + } +#else + if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) + { + // the queue is empty or + // a producer thread has allocate space in the queue but is + // waiting to commit the data into it + return -1; + } +#endif + + // retrieve the data from the queue + a_data = m_theQueue[countToIndex(currentReadIndex)]; + + // try to perfrom now the CAS operation on the read index. If we succeed + // a_data already contains what m_readIndex pointed to before we + // increased it + if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) + { +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + // m_count.fetch_sub(1); + AtomicSub(&m_count, 1); +#endif + + s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0); + if (s == -1) + err_exit(errno, "futex-FUTEX_WAKE"); + return 0; + } + + // it failed retrieving the element off the queue. Someone else must + // have read the element stored at countToIndex(currentReadIndex) + // before we could perform the CAS operation + + } while(1); // keep looping to try again! + + // Something went wrong. it shouldn't be possible to reach here + assert(0); + + // Add this return statement to avoid compiler warnings + return -1; +} + + template <typename ELEM_T, typename Allocator> +ELEM_T& ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[](unsigned int i) +{ + int currentCount = m_count; + uint32_t currentReadIndex = m_readIndex; + if (i >= currentCount) + { + std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; + std::exit(EXIT_FAILURE); + } + return m_theQueue[countToIndex(currentReadIndex+i)]; +} + + + +template <typename ELEM_T, typename Allocator> +void * ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator new(size_t size){ + return Allocator::allocate(size); +} + +template <typename ELEM_T, typename Allocator> +void ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator delete(void *p) { + return Allocator::deallocate(p); +} + +#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h index b7dfd9f..01e597c 100644 --- a/src/queue/lock_free_queue.h +++ b/src/queue/lock_free_queue.h @@ -221,8 +221,7 @@ { LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n"); if (psem_wait(&slots) == -1) { - err_msg(errno, "LockFreeQueue push"); - return errno; + return -1; } if ( m_qImpl.push(a_data) ) { @@ -241,13 +240,7 @@ int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data) { if (psem_trywait(&slots) == -1) { - if (errno == EAGAIN) - return EAGAIN; - else { - err_msg(errno, "LockFreeQueue push_nowait"); - return errno; - } - + return -1; } if ( m_qImpl.push(a_data)) { @@ -265,15 +258,8 @@ int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts) { LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n"); - int rv; if ( psem_timedwait(&slots, ts) == -1) { - - if(errno == ETIMEDOUT) - return EBUS_TIMEOUT; - else { - LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout"); - return errno; - } + return -1; } if (m_qImpl.push(a_data)){ @@ -297,8 +283,7 @@ LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n"); if (psem_wait(&items) == -1) { - LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop"); - return errno; + return -1; } if (m_qImpl.pop(a_data)) { @@ -316,12 +301,7 @@ int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data) { if (psem_trywait(&items) == -1) { - if (errno == EAGAIN) - return errno; - else { - LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_nowait"); - return errno; - } + return -1; } if (m_qImpl.pop(a_data)) { @@ -339,14 +319,7 @@ int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts) { if (psem_timedwait(&items, ts) == -1) { - if (errno == ETIMEDOUT) { - return EBUS_TIMEOUT; - } - - else { - LoggerFactory::getLogger()->error(errno, "3 LockFreeQueue pop_timeout %d", errno); - return errno; - } + return -1; } if (m_qImpl.pop(a_data)) { diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h index 7d98eaa..5d2d9b6 100644 --- a/src/queue/shm_queue.h +++ b/src/queue/shm_queue.h @@ -6,12 +6,13 @@ #define __SHM_QUEUE_H__ #include "hashtable.h" -#include "lock_free_queue.h" + #include "logger_factory.h" #include "sem_util.h" #include "shm_allocator.h" #include "usg_common.h" - +#include "array_lock_free_sem_queue.h" +#include "bus_error.h" template <typename ELEM_T> class SHMQueue { @@ -20,7 +21,7 @@ public: /// @brief constructor of the class - SHMQueue(int key = 0, size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); + SHMQueue(int key = 0, size_t qsize = 16); ~SHMQueue(); @@ -49,7 +50,8 @@ protected: /// @brief the actual queue-> methods are forwarded into the real /// implementation - LockFreeQueue<ELEM_T, SHM_Allocator> *queue; + + ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *queue; private: /// @brief disable copy constructor declaring it private @@ -62,7 +64,7 @@ hashtable_t *hashtable = mm_get_hashtable(); std::set<int> *keyset = hashtable_keyset(hashtable); std::set<int>::iterator keyItr; - LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue; + ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue; bool found; size_t count = 0; for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { @@ -75,7 +77,7 @@ } if (!found) { // 閿�姣佸叡浜唴瀛樼殑queue - mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr); + mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr); delete mqueue; hashtable_remove(hashtable, *keyItr); count++; @@ -89,11 +91,11 @@ template <typename ELEM_T> size_t SHMQueue<ELEM_T>::remove_queues(int keys[], size_t length) { hashtable_t *hashtable = mm_get_hashtable(); - LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue; + ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue; size_t count = 0; for(int i = 0; i< length; i++) { // 閿�姣佸叡浜唴瀛樼殑queue - mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]); + mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]); delete mqueue; hashtable_remove(hashtable, keys[i]); count++; @@ -111,49 +113,22 @@ SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize) : KEY(key) { hashtable_t *hashtable = mm_get_hashtable(); - queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key); + queue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key); if (queue == NULL || (void *)queue == (void *)1) { - queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize); + queue = new ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator>(qsize); hashtable_put(hashtable, key, (void *)queue); } - queue->reference++; + // queue->reference++; // LoggerFactory::getLogger()->debug("SHMQueue constructor reference===%d", queue->reference.load()); } template <typename ELEM_T> SHMQueue<ELEM_T>::~SHMQueue() { - if(queue == NULL) { - // queue宸茬粡閿�姣� - return; - } - - sem_wait(&(queue->mutex)); - queue->reference--; - // LoggerFactory::getLogger()->debug("SHMQueue destructor reference===%d", - if (queue->reference.load() == 0) { - delete queue; - queue = NULL; - hashtable_t *hashtable = mm_get_hashtable(); - hashtable_remove(hashtable, KEY); - // 姝ゆ椂queue宸茬粡閿�姣侊紝鏃犻渶 sem_post(&(queue->mutex)) - // printf("SHMQueue destructor delete queue\n"); - } else { - sem_post(&(queue->mutex)); - } - -} - -template <typename ELEM_T> void SHMQueue<ELEM_T>::force_destroy() { - if(queue == NULL) { - // queue宸茬粡閿�姣� - return; - } - - SemUtil::dec(queue->mutex); + LoggerFactory::getLogger()->debug("SHMQueue destroy"); delete queue; queue = NULL; hashtable_t *hashtable = mm_get_hashtable(); hashtable_remove(hashtable, KEY); - // 姝ゆ椂queue宸茬粡閿�姣侊紝鏃犻渶 SemUtil::inc(queue->mutex) + } template <typename ELEM_T> inline uint32_t SHMQueue<ELEM_T>::size() { @@ -170,36 +145,85 @@ template <typename ELEM_T> inline int SHMQueue<ELEM_T>::push(const ELEM_T &a_data) { - return queue->push(a_data); + int rv = queue->push(a_data); + if(rv == -1) { + return errno; + } else { + return 0; + } } template <typename ELEM_T> inline int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) { - return queue->push_nowait(a_data); + int rv = queue->push(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT); + if(rv == -1) { + if (errno == EAGAIN) + return EAGAIN; + else { + err_msg(errno, "LockFreeQueue push_nowait"); + return errno; + } + } + return 0; } template <typename ELEM_T> -inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, - const struct timespec *timeout) { +inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec *timeout) { - return queue->push_timeout(a_data, timeout); + int rv = queue->push(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT); + if(rv == -1) { + if(errno == ETIMEDOUT) + return EBUS_TIMEOUT; + else { + LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout"); + return errno; + } + } + return 0; } template <typename ELEM_T> inline int SHMQueue<ELEM_T>::pop(ELEM_T &a_data) { // printf("SHMQueue pop before\n"); int rv = queue->pop(a_data); // printf("SHMQueue after before\n"); - return rv; + if(rv == -1) { + return errno; + } else { + return 0; + } } template <typename ELEM_T> inline int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) { - return queue->pop_nowait(a_data); + int rv = queue->pop(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT); + + if(rv == -1) { + if (errno == EAGAIN) + return errno; + else { + LoggerFactory::getLogger()->error(errno, " SHMQueue pop_nowait"); + return errno; + } + } + return 0; + } template <typename ELEM_T> inline int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) { - return queue->pop_timeout(a_data, timeout); + + int rv; + rv = queue->pop(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT); + if(rv == -1) { + if (errno == ETIMEDOUT) { + return EBUS_TIMEOUT; + } else { + LoggerFactory::getLogger()->error(errno, " SHMQueue pop_timeout"); + return errno; + } + } + return 0; + } template <typename ELEM_T> @@ -207,4 +231,7 @@ return queue->operator[](i); } + + + #endif diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp index 534202d..76e906f 100644 --- a/src/socket/shm_socket.cpp +++ b/src/socket/shm_socket.cpp @@ -383,10 +383,8 @@ if (rv == 0) { // printf("shm_sendto push after\n"); - delete remoteQueue; return 0; } else { - delete remoteQueue; mm_free(dest.buf); if(rv > EBUS_BASE) { // bus_errno = EBUS_TIMEOUT; @@ -725,10 +723,7 @@ socket->queue = NULL; } - if (socket->remoteQueue != NULL) { - delete socket->remoteQueue; - socket->remoteQueue = NULL; - } + if (socket->messageQueue != NULL) { delete socket->messageQueue; @@ -747,7 +742,6 @@ client_socket = iter->second; client_socket->remoteQueue->push_timeout(close_msg, &timeout); - delete client_socket->remoteQueue; client_socket->remoteQueue = NULL; delete client_socket->messageQueue; diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h index 0917d00..abc8e20 100644 --- a/src/socket/shm_socket.h +++ b/src/socket/shm_socket.h @@ -4,6 +4,7 @@ #include "usg_common.h" #include "usg_typedef.h" #include "shm_queue.h" +#include "lock_free_queue.h" enum shm_socket_flag_t { diff --git a/src/time_util.cpp b/src/time_util.cpp new file mode 100644 index 0000000..f00b4f4 --- /dev/null +++ b/src/time_util.cpp @@ -0,0 +1,26 @@ +#include "time_util.h" + +#define NANO 1000000000 + + +struct timespec TimeUtil::calc_abs_time(const struct timespec *ts) { + + struct timespec res; + struct timespec timeout; + if (clock_gettime(CLOCK_REALTIME, &timeout) == -1) + err_exit(errno, "clock_gettime"); + + res.tv_sec = timeout.tv_sec + ts->tv_sec; + res.tv_nsec = timeout.tv_nsec + ts->tv_nsec; + res.tv_sec = res.tv_sec + floor(res.tv_nsec / NANO); + res.tv_nsec = res.tv_nsec % NANO; + return res; +} + +struct timespec TimeUtil::trim_time(const struct timespec *ts) { + + struct timespec res; + res.tv_sec = ts->tv_sec + floor(ts->tv_nsec / NANO); + res.tv_nsec = ts->tv_nsec % NANO; + return res; +} \ No newline at end of file diff --git a/src/time_util.h b/src/time_util.h new file mode 100644 index 0000000..1f23d39 --- /dev/null +++ b/src/time_util.h @@ -0,0 +1,14 @@ +#ifndef _TIMEUTIL_H_ +#define _TIMEUTIL_H_ +#include "usg_common.h" +class TimeUtil { +public: + // 璁$畻褰撳墠鏃堕棿+ts鐨勭粷瀵规椂闂� + static struct timespec calc_abs_time(const struct timespec *ts); + + // 濡傛灉绾崇澶т簬10e9锛屽悜绉掕繘浣� + static struct timespec trim_time(const struct timespec *ts) ; +}; + + +#endif \ No newline at end of file diff --git a/test/futex_demo.cpp b/test/futex_demo.cpp index d6887e8..2899862 100644 --- a/test/futex_demo.cpp +++ b/test/futex_demo.cpp @@ -19,11 +19,12 @@ #include <sys/syscall.h> #include <linux/futex.h> #include <sys/time.h> -#include "usg_common.h" #include <sys/mman.h> #include <sys/stat.h> /* For mode constants */ #include <fcntl.h> /* For O_* constants */ +#include "usg_common.h" + #define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); \ } while (0) -- Gitblit v1.8.0