From 4c73fd7179e92bee9cccb65e46823b00f568acb3 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期五, 22 一月 2021 16:57:34 +0800 Subject: [PATCH] tmp --- src/queue/lock_free_queue.h | 404 +++++++---------- test_net_socket/test_net_mod_socket.cpp | 5 src/socket/shm_socket.h | 8 build.sh | 4 CMakeLists.txt | 6 src/net/net_mod_server_socket.cpp | 2 src/net/net_mod_socket.cpp | 8 src/socket/shm_socket.cpp | 16 src/bus_def.h | 7 test_socket/CMakeLists.txt | 11 /dev/null | 131 ----- src/queue/array_lock_free_queue.h | 487 ++++++++++----------- src/queue/shm_queue.h | 23 test_socket/bus_test.cpp | 116 +++++ src/net/net_mod_socket_wrapper.cpp | 7 src/queue/array_lock_free_sem_queue.h | 41 + test_net_socket/net_mod_socket.sh | 4 src/CMakeLists.txt | 11 src/socket/shm_mod_socket.cpp | 14 19 files changed, 625 insertions(+), 680 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8310bfa..3b1e4b4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,7 +13,10 @@ set(CMAKE_LIBRARY_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}") # set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}") -option(BUILD_SHARED_LIBS "Build using shared libraries" ON) +if (CMAKE_BUILD_TYPE EQUAL "Release") +#option(BUILD_SHARED_LIBS "Build using shared libraries" ON) +endif() + option(BUILD_DOC "Build doc" OFF) @@ -29,4 +32,5 @@ add_subdirectory(${PROJECT_SOURCE_DIR}/src) add_subdirectory(${PROJECT_SOURCE_DIR}/test) add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket) + add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket) endif() diff --git a/build.sh b/build.sh index eab77c7..6bfd753 100755 --- a/build.sh +++ b/build.sh @@ -2,6 +2,7 @@ BUILD_TYPE="Debug" BUILD_DOC="OFF" +BUILD_SHARED_LIBS="OFF" function usage() { echo "build.sh [release | debug | doc]" @@ -10,6 +11,7 @@ case ${1} in "release") BUILD_TYPE="Release" + BUILD_SHARED_LIBS="ON" ;; "debug") @@ -48,7 +50,7 @@ # -DBUILD_SHARED_LIBS=ON # -DCMAKE_INSTALL_PREFIX=$(pwd/../dest) # -DQCA_MAN_INSTALL_DIR:PATH=/usr/share/man -cmake -DCMAKE_INSTALL_PREFIX="$(pwd)/../dest" -DCMAKE_BUILD_TYPE=${BUILD_TYPE} -DBUILD_SHARED_LIBS=ON \ +cmake -DCMAKE_INSTALL_PREFIX="$(pwd)/../dest" -DCMAKE_BUILD_TYPE=${BUILD_TYPE} -DBUILD_SHARED_LIBS=${BUILD_SHARED_LIBS} \ -DBUILD_DOC=${BUILD_DOC} -DSUPPORT_RDMA=OFF .. cmake --build . diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index fe597d9..f3698d9 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,5 +1,3 @@ - - # should we use our own math functions option(SUPPORT_RDMA "If support rdma" OFF) @@ -16,9 +14,9 @@ ./socket/shm_mod_socket.cpp ./time_util.cpp ./psem.cpp -./svsem.cpp ./bus_error.cpp ./futex_sem.cpp +./svsem.cpp ./net/net_conn_pool.cpp ./net/net_mod_server_socket_wrapper.cpp ./net/net_mod_socket_wrapper.cpp @@ -28,7 +26,6 @@ ./shm/shm_mm_wrapper.cpp ./shm/mm.cpp ./shm/hashtable.cpp - ) @@ -62,13 +59,14 @@ ./time_util.h ./futex_sem.h ./bus_error.h -./svsem.h +./bus_def.h ./logger_factory.h ./queue/linked_lock_free_queue.h -./queue/array_lock_free_queue2.h ./queue/array_lock_free_queue.h ./queue/shm_queue.h +./queue/array_lock_free_sem_queue.h ./queue/lock_free_queue.h +./svsem.h ./net/net_conn_pool.h ./net/net_mod_socket.h ./net/net_mod_server_socket_wrapper.h @@ -82,6 +80,7 @@ ./shm/shm_allocator.h + DESTINATION include) install(FILES "${PROJECT_BINARY_DIR}/src/bus_config.h" diff --git a/src/bus_def.h b/src/bus_def.h new file mode 100644 index 0000000..78a7eb9 --- /dev/null +++ b/src/bus_def.h @@ -0,0 +1,7 @@ +#ifndef _BUS_DEF_H_ +#define _BUS_DEF_H_ + +#define BUS_TIMEOUT_FLAG 1 +#define BUS_NOWAIT_FLAG 1 << 1 + +#endif \ No newline at end of file diff --git a/src/net/net_mod_server_socket.cpp b/src/net/net_mod_server_socket.cpp index 8324d6f..432defb 100644 --- a/src/net/net_mod_server_socket.cpp +++ b/src/net/net_mod_server_socket.cpp @@ -54,7 +54,7 @@ sprintf(portstr, "%d", port); listenfd = open_listenfd(portstr); if(listenfd < 0) { - LoggerFactory::getLogger()->error(errno, "NetModServerSocket::start . errno=%d ", errno); + LoggerFactory::getLogger()->error(errno, "NetModServerSocket::start. port = %d ", port); return -1; } init_pool(listenfd); diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp index 73be131..3ef15b9 100644 --- a/src/net/net_mod_socket.cpp +++ b/src/net/net_mod_socket.cpp @@ -513,10 +513,12 @@ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ int NetModSocket::recvfrom(void **buf, int *size, int *key) { + + logger->debug(" %d NetModSocket::recvfrom before", get_key()); int rv = shmModSocket.recvfrom(buf, size, key); if(rv == 0) { - logger->debug("NetModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key); + logger->debug("NetModSocket::recvfrom: <<<< %d recvfrom %d success.\n", get_key(), *key); return 0; } @@ -533,7 +535,7 @@ struct timespec timeout = {sec, nsec}; int rv = shmModSocket.recvfrom_timeout(buf, size, key, &timeout); if(rv == 0) { - logger->debug("NetModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key); + logger->debug("NetModSocket::recvfrom_timeout: %d recvfrom %d success.\n", get_key(), *key); return 0; } @@ -549,7 +551,7 @@ int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){ int rv = shmModSocket.recvfrom_nowait(buf, size, key); if(rv == 0) { - logger->debug("NetModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key); + logger->debug("NetModSocket::recvfrom_nowait: %d recvfrom %d success.\n", get_key(), *key); return 0; } diff --git a/src/net/net_mod_socket_wrapper.cpp b/src/net/net_mod_socket_wrapper.cpp index 83161bc..e7bf302 100644 --- a/src/net/net_mod_socket_wrapper.cpp +++ b/src/net/net_mod_socket_wrapper.cpp @@ -67,8 +67,13 @@ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key){ + int rv; NetModSocket *sockt = (NetModSocket *)_socket; - return sockt->recvfrom(buf, size, key); + + logger->debug(" %d net_mod_socket_recvfrom before", net_mod_socket_get_key(_socket)); + rv = sockt->recvfrom(buf, size, key); + logger->debug(" %d net_mod_socket_recvfrom after. rv = %d", net_mod_socket_get_key(_socket), rv); + return rv; } // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){ diff --git a/src/queue/array_lock_free_queue.h b/src/queue/array_lock_free_queue.h index ae1506d..a03b33e 100644 --- a/src/queue/array_lock_free_queue.h +++ b/src/queue/array_lock_free_queue.h @@ -1,5 +1,6 @@ #ifndef __ARRAY_LOCK_FREE_QUEUE_H__ #define __ARRAY_LOCK_FREE_QUEUE_H__ + #include "atomic_ops.h" #include <assert.h> // assert() #include <sched.h> // sched_yield() @@ -17,306 +18,290 @@ #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -template <typename ELEM_T, typename Allocator = SHM_Allocator> -class ArrayLockFreeQueue -{ - // ArrayLockFreeQueue will be using this' private members - template < - typename ELEM_T_, - typename Allocator_, - template <typename T, typename AT> class Q_TYPE - > - friend class LockFreeQueue; +template<typename ELEM_T, typename Allocator = SHM_Allocator> +class ArrayLockFreeQueue { + // ArrayLockFreeQueue will be using this' private members + template< + typename ELEM_T_, + typename Allocator_, + template<typename T, typename AT> class Q_TYPE + > + friend + class LockFreeQueue; private: - /// @brief constructor of the class - ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - - virtual ~ArrayLockFreeQueue(); - - inline uint32_t size(); - - inline bool full(); + /// @brief constructor of the class + ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - inline bool empty(); - - bool push(const ELEM_T &a_data); - - bool pop(ELEM_T &a_data); - - /// @brief calculate the index in the circular array that corresponds - /// to a particular "count" value - inline uint32_t countToIndex(uint32_t a_count); + virtual ~ArrayLockFreeQueue(); - ELEM_T& operator[](unsigned i); - -private: - size_t Q_SIZE; - /// @brief array to keep the elements - ELEM_T *m_theQueue; + inline uint32_t size(); - /// @brief where a new element will be inserted - uint32_t m_writeIndex; + inline bool full(); - /// @brief where the next element where be extracted from - uint32_t m_readIndex; - - /// @brief maximum read index for multiple producer queues - /// If it's not the same as m_writeIndex it means - /// there are writes pending to be "committed" to the queue, that means, - /// the place for the data was reserved (the index in the array) but - /// data is still not in the queue, so the thread trying to read will have - /// to wait for those other threads to save the data into the queue - /// - /// note this is only used for multiple producers - uint32_t m_maximumReadIndex; + inline bool empty(); + + bool push(const ELEM_T &a_data); + + bool pop(ELEM_T &a_data); + + /// @brief calculate the index in the circular array that corresponds + /// to a particular "count" value + inline uint32_t countToIndex(uint32_t a_count); + + ELEM_T &operator[](unsigned i); + +private: + size_t Q_SIZE; + /// @brief array to keep the elements + ELEM_T *m_theQueue; + + /// @brief where a new element will be inserted + uint32_t m_writeIndex; + + /// @brief where the next element where be extracted from + uint32_t m_readIndex; + + /// @brief maximum read index for multiple producer queues + /// If it's not the same as m_writeIndex it means + /// there are writes pending to be "committed" to the queue, that means, + /// the place for the data was reserved (the index in the array) but + /// data is still not in the queue, so the thread trying to read will have + /// to wait for those other threads to save the data into the queue + /// + /// note this is only used for multiple producers + uint32_t m_maximumReadIndex; #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - /// @brief number of elements in the queue - uint32_t m_count; + /// @brief number of elements in the queue + uint32_t m_count; #endif - - + + private: - /// @brief disable copy constructor declaring it private - ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src); + /// @brief disable copy constructor declaring it private + ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src); }; -template <typename ELEM_T, typename Allocator> +template<typename ELEM_T, typename Allocator> ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize): - Q_SIZE(qsize), - m_writeIndex(0), // initialisation is not atomic - m_readIndex(0), // - m_maximumReadIndex(0) // + Q_SIZE(qsize), + m_writeIndex(0), // initialisation is not atomic + m_readIndex(0), // + m_maximumReadIndex(0) // #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - ,m_count(0) // + , m_count(0) // #endif { - m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T)); + m_theQueue = (ELEM_T *) Allocator::allocate(Q_SIZE * sizeof(ELEM_T)); } -template <typename ELEM_T, typename Allocator> -ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() -{ - // std::cout << "destroy ArrayLockFreeQueue\n"; - Allocator::deallocate(m_theQueue); - +template<typename ELEM_T, typename Allocator> +ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() { + // std::cout << "destroy ArrayLockFreeQueue\n"; + Allocator::deallocate(m_theQueue); + } -template <typename ELEM_T, typename Allocator> -inline -uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) -{ - // if Q_SIZE is a power of 2 this statement could be also written as - // return (a_count & (Q_SIZE - 1)); - return (a_count % Q_SIZE); +template<typename ELEM_T, typename Allocator> +inline +uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) { + // if Q_SIZE is a power of 2 this statement could be also written as + // return (a_count & (Q_SIZE - 1)); + return (a_count % Q_SIZE); } -template <typename ELEM_T, typename Allocator> -inline -uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() -{ +template<typename ELEM_T, typename Allocator> +inline +uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - return m_count; + return m_count; #else - uint32_t currentWriteIndex = m_maximumReadIndex; - uint32_t currentReadIndex = m_readIndex; + uint32_t currentWriteIndex = m_maximumReadIndex; + uint32_t currentReadIndex = m_readIndex; - // let's think of a scenario where this function returns bogus data - // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run - // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1 - // 2. afterwards this thread is preemted. While this thread is inactive 2 - // elements are inserted and removed from the queue, so m_maximumReadIndex - // is 5 and m_readIndex 4. Real size is still 1 - // 3. Now the current thread comes back from preemption and reads m_readIndex. - // currentReadIndex is 4 - // 4. currentReadIndex is bigger than currentWriteIndex, so - // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is, - // it returns that the queue is almost full, when it is almost empty - // - if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex)) - { - return (currentWriteIndex - currentReadIndex); - } - else - { - return (Q_SIZE + currentWriteIndex - currentReadIndex); - } + // let's think of a scenario where this function returns bogus data + // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run + // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1 + // 2. afterwards this thread is preemted. While this thread is inactive 2 + // elements are inserted and removed from the queue, so m_maximumReadIndex + // is 5 and m_readIndex 4. Real size is still 1 + // 3. Now the current thread comes back from preemption and reads m_readIndex. + // currentReadIndex is 4 + // 4. currentReadIndex is bigger than currentWriteIndex, so + // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is, + // it returns that the queue is almost full, when it is almost empty + // + if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex)) + { + return (currentWriteIndex - currentReadIndex); + } + else + { + return (Q_SIZE + currentWriteIndex - currentReadIndex); + } #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE } -template <typename ELEM_T, typename Allocator> -inline -bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() -{ +template<typename ELEM_T, typename Allocator> +inline +bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - return (m_count == (Q_SIZE)); + return (m_count == (Q_SIZE)); #else - uint32_t currentWriteIndex = m_writeIndex; - uint32_t currentReadIndex = m_readIndex; - + uint32_t currentWriteIndex = m_writeIndex; + uint32_t currentReadIndex = m_readIndex; + + if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) + { + // the queue is full + return true; + } + else + { + // not full! + return false; + } +#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE +} + +template<typename ELEM_T, typename Allocator> +inline +bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() { +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + return (m_count == 0); +#else + + if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex)) + { + // the queue is full + return true; + } + else + { + // not full! + return false; + } +#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE +} + + +template<typename ELEM_T, typename Allocator> +bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) { + uint32_t currentReadIndex; + uint32_t currentWriteIndex; + + do { + + currentWriteIndex = m_writeIndex; + currentReadIndex = m_readIndex; +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + if (m_count == Q_SIZE) { + return false; + } +#else if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) { // the queue is full - return true; - } - else - { - // not full! return false; } -#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -} - -template <typename ELEM_T, typename Allocator> -inline -bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() -{ -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - return (m_count == 0); -#else - - if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex)) - { - // the queue is full - return true; - } - else - { - // not full! - return false; - } -#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -} - - - - - - -template <typename ELEM_T, typename Allocator> -bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) -{ - uint32_t currentReadIndex; - uint32_t currentWriteIndex; - - do - { - - currentWriteIndex = m_writeIndex; - currentReadIndex = m_readIndex; - #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - if (m_count == Q_SIZE) { - return false; - } - #else - if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) - { - // the queue is full - return false; - } - #endif - - } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); - - // We know now that this index is reserved for us. Use it to save the data - m_theQueue[countToIndex(currentWriteIndex)] = a_data; - - // update the maximum read index after saving the data. It wouldn't fail if there is only one thread - // inserting in the queue. It might fail if there are more than 1 producer threads because this - // operation has to be done in the same order as the previous CAS - - while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) - { - // this is a good place to yield the thread in case there are more - // software threads than hardware processors and you have more - // than 1 producer thread - // have a look at sched_yield (POSIX.1b) - sched_yield(); - } - -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - AtomicAdd(&m_count, 1); #endif - return true; + + } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); + + // We know now that this index is reserved for us. Use it to save the data + m_theQueue[countToIndex(currentWriteIndex)] = a_data; + + // update the maximum read index after saving the data. It wouldn't fail if there is only one thread + // inserting in the queue. It might fail if there are more than 1 producer threads because this + // operation has to be done in the same order as the previous CAS + + while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) { + // this is a good place to yield the thread in case there are more + // software threads than hardware processors and you have more + // than 1 producer thread + // have a look at sched_yield (POSIX.1b) + sched_yield(); + } + +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + AtomicAdd(&m_count, 1); +#endif + return true; } -template <typename ELEM_T, typename Allocator> -bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) -{ - uint32_t currentMaximumReadIndex; - uint32_t currentReadIndex; +template<typename ELEM_T, typename Allocator> +bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) { + uint32_t currentMaximumReadIndex; + uint32_t currentReadIndex; - do - { - // to ensure thread-safety when there is more than 1 producer thread - // a second index is defined (m_maximumReadIndex) - currentReadIndex = m_readIndex; - currentMaximumReadIndex = m_maximumReadIndex; - #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + do { + // to ensure thread-safety when there is more than 1 producer thread + // a second index is defined (m_maximumReadIndex) + currentReadIndex = m_readIndex; + currentMaximumReadIndex = m_maximumReadIndex; +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - if (m_count == 0) { - return false; - } - #else - if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) - { - // the queue is empty or - // a producer thread has allocate space in the queue but is - // waiting to commit the data into it - return false; - } - #endif - - // retrieve the data from the queue - a_data = m_theQueue[countToIndex(currentReadIndex)]; - - // try to perfrom now the CAS operation on the read index. If we succeed - // a_data already contains what m_readIndex pointed to before we - // increased it - if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) - { - #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - // m_count.fetch_sub(1); - AtomicSub(&m_count, 1); - #endif - return true; - } - - // it failed retrieving the element off the queue. Someone else must - // have read the element stored at countToIndex(currentReadIndex) - // before we could perform the CAS operation - - } while(1); // keep looping to try again! - - // Something went wrong. it shouldn't be possible to reach here - assert(0); - - // Add this return statement to avoid compiler warnings - return false; -} - -template <typename ELEM_T, typename Allocator> -ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) -{ - int currentCount = m_count; - uint32_t currentReadIndex = m_readIndex; - if (i >= currentCount) - { - std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; - std::exit(EXIT_FAILURE); + if (m_count == 0) { + return false; } - return m_theQueue[countToIndex(currentReadIndex+i)]; +#else + if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) + { + // the queue is empty or + // a producer thread has allocate space in the queue but is + // waiting to commit the data into it + return false; + } +#endif + + // retrieve the data from the queue + a_data = m_theQueue[countToIndex(currentReadIndex)]; + + // try to perfrom now the CAS operation on the read index. If we succeed + // a_data already contains what m_readIndex pointed to before we + // increased it + if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) { +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + // m_count.fetch_sub(1); + AtomicSub(&m_count, 1); +#endif + return true; + } + + // it failed retrieving the element off the queue. Someone else must + // have read the element stored at countToIndex(currentReadIndex) + // before we could perform the CAS operation + + } while (1); // keep looping to try again! + + // Something went wrong. it shouldn't be possible to reach here + assert(0); + + // Add this return statement to avoid compiler warnings + return false; +} + +template<typename ELEM_T, typename Allocator> +ELEM_T &ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) { + int currentCount = m_count; + uint32_t currentReadIndex = m_readIndex; + if (i >= currentCount) { + std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i + << " is out of range\n"; + std::exit(EXIT_FAILURE); + } + return m_theQueue[countToIndex(currentReadIndex + i)]; } #endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ diff --git a/src/queue/array_lock_free_sem_queue.h b/src/queue/array_lock_free_sem_queue.h index 69630d9..a3b677c 100644 --- a/src/queue/array_lock_free_sem_queue.h +++ b/src/queue/array_lock_free_sem_queue.h @@ -8,7 +8,7 @@ #include "shm_allocator.h" #include "futex_sem.h" #include "time_util.h" - +#include "bus_def.h" /// @brief implementation of an array based lock free queue with support for /// multiple producers @@ -17,8 +17,7 @@ /// you must use the ArrayLockFreeSemQueue fachade: /// ArrayLockFreeSemQueue<int, 100, ArrayLockFreeSemQueue> q; -#define LOCK_FREE_QUEUE_TIMEOUT 1 -#define LOCK_FREE_QUEUE_NOWAIT 1 << 1 + #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE @@ -207,15 +206,17 @@ uint32_t currentWriteIndex; int s; + // sigset_t mask_all, pre; + // sigfillset(&mask_all); do { currentWriteIndex = m_writeIndex; currentReadIndex = m_readIndex; #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE if (m_count == Q_SIZE) { - if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) + if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) return -1; - else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { + else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { const struct timespec ts = TimeUtil::trim_time(timeout); s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { @@ -235,9 +236,9 @@ if (currentReadIndex == currentWriteIndex - Q_SIZE + 1 ) { // the queue is full - if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) + if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) return -1; - else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { + else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { const struct timespec ts = TimeUtil::trim_time(timeout); s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, &ts, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { @@ -260,10 +261,11 @@ // We know now that this index is reserved for us. Use it to save the data m_theQueue[countToIndex(currentWriteIndex)] = a_data; + // sigprocmask(SIG_BLOCK, &mask_all, &pre); + // update the maximum read index after saving the data. It wouldn't fail if there is only one thread // inserting in the queue. It might fail if there are more than 1 producer threads because this // operation has to be done in the same order as the previous CAS - while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) { // this is a good place to yield the thread in case there are more @@ -283,6 +285,7 @@ err_exit(errno, "futex-FUTEX_WAKE"); #endif + // sigprocmask(SIG_SETMASK, &pre, NULL); return 0; } @@ -293,6 +296,11 @@ uint32_t currentMaximumReadIndex; uint32_t currentReadIndex; int s; + + // sigset_t mask_all, pre; + // sigfillset(&mask_all); + + // sigprocmask(SIG_BLOCK, &mask_all, &pre); do { @@ -305,19 +313,23 @@ if (m_count == 0) { - if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) + if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { + // sigprocmask(SIG_SETMASK, &pre, NULL); return -1; - else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { + } + else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { const struct timespec ts = TimeUtil::trim_time(timeout); s = futex((int *)&m_count, FUTEX_WAIT, 0, &ts, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); + // sigprocmask(SIG_SETMASK, &pre, NULL); return -1; } } else { s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { + // sigprocmask(SIG_SETMASK, &pre, NULL); return -1; } } @@ -330,19 +342,23 @@ // the queue is empty or // a producer thread has allocate space in the queue but is // waiting to commit the data into it - if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) + if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { + // sigprocmask(SIG_SETMASK, &pre, NULL); return -1; - else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { + } + else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { const struct timespec ts = TimeUtil::trim_time(timeout); s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, &ts, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); + // sigprocmask(SIG_SETMASK, &pre, NULL); return -1; } } else { s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { + // sigprocmask(SIG_SETMASK, &pre, NULL); return -1; } } @@ -367,6 +383,7 @@ err_exit(errno, "futex-FUTEX_WAKE"); #endif + // sigprocmask(SIG_SETMASK, &pre, NULL); return 0; } diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h index 01e597c..9245d3e 100644 --- a/src/queue/lock_free_queue.h +++ b/src/queue/lock_free_queue.h @@ -12,20 +12,20 @@ #include "shm_allocator.h" #include "psem.h" #include "bus_error.h" - +#include "bus_def.h" // default Queue size #define LOCK_FREE_Q_DEFAULT_SIZE 16 // static Logger *logger = LoggerFactory::getLogger(); -// define this macro if calls to "size" must return the real size of the -// queue. If it is undefined that function will try to take a snapshot of +// define this macro if calls to "size" must return the real size of the +// queue. If it is undefined that function will try to take a snapshot of // the queue, but returned value might be bogus // forward declarations for default template values // -template <typename ELEM_T, typename Allocator> +template<typename ELEM_T, typename Allocator> class ArrayLockFreeQueue; // template <typename ELEM_T> @@ -33,9 +33,9 @@ /// @brief Lock-free queue based on a circular array -/// No allocation of extra memory for the nodes handling is needed, but it has -/// to add extra overhead (extra CAS operation) when inserting to ensure the -/// thread-safety of the queue when the queue type is not +/// No allocation of extra memory for the nodes handling is needed, but it has +/// to add extra overhead (extra CAS operation) when inserting to ensure the +/// thread-safety of the queue when the queue type is not /// ArrayLockFreeQueueSingleProducer. /// /// examples of instantiation: @@ -50,113 +50,109 @@ /// /// ELEM_T represents the type of elementes pushed and popped from the queue /// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1) -/// This number should be a power of 2 to ensure -/// indexes in the circular queue keep stable when the uint32_t +/// This number should be a power of 2 to ensure +/// indexes in the circular queue keep stable when the uint32_t /// variable that holds the current position rolls over from FFFFFFFF /// to 0. For instance -/// 2 -> 0x02 +/// 2 -> 0x02 /// 4 -> 0x04 /// 8 -> 0x08 /// 16 -> 0x10 -/// (...) +/// (...) /// 1024 -> 0x400 /// 2048 -> 0x800 /// /// if queue size is not defined as requested, let's say, for /// instance 100, when current position is FFFFFFFF (4,294,967,295) -/// index in the circular array is 4,294,967,295 % 100 = 95. -/// When that value is incremented it will be set to 0, that is the +/// index in the circular array is 4,294,967,295 % 100 = 95. +/// When that value is incremented it will be set to 0, that is the /// last 4 elements of the queue are not used when the counter rolls /// over to 0 -/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and +/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and /// ArrayLockFreeQueue are supported (single producer /// by default) -template < - typename ELEM_T, - typename Allocator = SHM_Allocator, - template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue - > -class LockFreeQueue -{ +template< + typename ELEM_T, + typename Allocator = SHM_Allocator, + template<typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue +> +class LockFreeQueue { private: - sem_t slots; - sem_t items; + sem_t slots; + sem_t items; - public: - sem_t mutex; - LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - - /// @brief destructor of the class. - /// Note it is not virtual since it is not expected to inherit from this - /// template - ~LockFreeQueue(); - std::atomic_uint reference; - /// @brief constructor of the class - + sem_t mutex; - /// @brief returns the current number of items in the queue - /// It tries to take a snapshot of the size of the queue, but in busy environments - /// this function might return bogus values. - /// - /// If a reliable queue size must be kept you might want to have a look at - /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE' - /// it enables a reliable size though it hits overall performance of the queue - /// (when the reliable size variable is on it's got an impact of about 20% in time) - inline uint32_t size(); - - /// @brief return true if the queue is full. False otherwise - /// It tries to take a snapshot of the size of the queue, but in busy - /// environments this function might return bogus values. See help in method - /// LockFreeQueue::size - inline bool full(); + LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - inline bool empty(); + /// @brief destructor of the class. + /// Note it is not virtual since it is not expected to inherit from this + /// template + ~LockFreeQueue(); - inline ELEM_T& operator[](unsigned i); - - /// @brief push an element at the tail of the queue - /// @param the element to insert in the queue - /// Note that the element is not a pointer or a reference, so if you are using large data - /// structures to be inserted in the queue you should think of instantiate the template - /// of the queue as a pointer to that large structure - /// @return true if the element was inserted in the queue. False if the queue was full - int push(const ELEM_T &a_data); - int push_nowait(const ELEM_T &a_data); - int push_timeout(const ELEM_T &a_data, const struct timespec * timeout); - - /// @brief pop the element at the head of the queue - /// @param a reference where the element in the head of the queue will be saved to - /// Note that the a_data parameter might contain rubbish if the function returns false - /// @return true if the element was successfully extracted from the queue. False if the queue was empty - int pop(ELEM_T &a_data); - int pop_nowait(ELEM_T &a_data); - int pop_timeout(ELEM_T &a_data, struct timespec * timeout); + std::atomic_uint reference; + /// @brief constructor of the class - void *operator new(size_t size); - void operator delete(void *p); + /// @brief returns the current number of items in the queue + /// It tries to take a snapshot of the size of the queue, but in busy environments + /// this function might return bogus values. + /// + /// If a reliable queue size must be kept you might want to have a look at + /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE' + /// it enables a reliable size though it hits overall performance of the queue + /// (when the reliable size variable is on it's got an impact of about 20% in time) + inline uint32_t size(); + + /// @brief return true if the queue is full. False otherwise + /// It tries to take a snapshot of the size of the queue, but in busy + /// environments this function might return bogus values. See help in method + /// LockFreeQueue::size + inline bool full(); + + inline bool empty(); + + inline ELEM_T &operator[](unsigned i); + + /// @brief push an element at the tail of the queue + /// @param the element to insert in the queue + /// Note that the element is not a pointer or a reference, so if you are using large data + /// structures to be inserted in the queue you should think of instantiate the template + /// of the queue as a pointer to that large structure + /// @return true if the element was inserted in the queue. False if the queue was full + int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); + + /// @brief pop the element at the head of the queue + /// @param a reference where the element in the head of the queue will be saved to + /// Note that the a_data parameter might contain rubbish if the function returns false + /// @return true if the element was successfully extracted from the queue. False if the queue was empty + int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); + + + void *operator new(size_t size); + + void operator delete(void *p); protected: - /// @brief the actual queue. methods are forwarded into the real - /// implementation - Q_TYPE<ELEM_T, Allocator> m_qImpl; + /// @brief the actual queue. methods are forwarded into the real + /// implementation + Q_TYPE<ELEM_T, Allocator> m_qImpl; private: - /// @brief disable copy constructor declaring it private - LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src); + /// @brief disable copy constructor declaring it private + LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src); }; -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) -{ -// std::cout << "LockFreeQueue init reference=" << reference << std::endl; +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) { + // std::cout << "LockFreeQueue init reference=" << reference << std::endl; if (sem_init(&slots, 1, qsize) == -1) err_exit(errno, "LockFreeQueue sem_init"); if (sem_init(&items, 1, 0) == -1) @@ -164,194 +160,130 @@ if (sem_init(&mutex, 1, 1) == -1) err_exit(errno, "LockFreeQueue sem_init"); - + } -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() -{ + +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() { // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy"); - if(sem_destroy(&slots) == -1) { + if (sem_destroy(&slots) == -1) { err_exit(errno, "LockFreeQueue sem_destroy"); } - if(sem_destroy(&items) == -1) { + if (sem_destroy(&items) == -1) { err_exit(errno, "LockFreeQueue sem_destroy"); } - if(sem_destroy(&mutex) == -1) { + if (sem_destroy(&mutex) == -1) { err_exit(errno, "LockFreeQueue sem_destroy"); } } -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() -{ +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() { return m_qImpl.size(); -} +} -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() -{ +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() { return m_qImpl.full(); } -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() -{ +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() { return m_qImpl.empty(); -} - - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data) -{ - LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n"); - if (psem_wait(&slots) == -1) { - return -1; - } - - if ( m_qImpl.push(a_data) ) { - psem_post(&items); -LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n"); - return 0; - } - return -1; - -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data) -{ - if (psem_trywait(&slots) == -1) { - return -1; - } - - if ( m_qImpl.push(a_data)) { - psem_post(&items); - return 0; - } - return -1; - -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts) -{ -LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n"); - if ( psem_timedwait(&slots, ts) == -1) { - return -1; - } - - if (m_qImpl.push(a_data)){ - psem_post(&items); -LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n"); - return 0; - } - return -1; - } - - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data) -{ - - LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n"); - if (psem_wait(&items) == -1) { - return -1; +template<typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) { + LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n"); + if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { + if (psem_trywait(&slots) == -1) { + return -1; + } + } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { + if (psem_timedwait(&slots, timeout) == -1) { + return -1; + } + } else { + if (psem_wait(&slots) == -1) { + return -1; + } } + + + if (m_qImpl.push(a_data)) { + psem_post(&items); + LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n"); + return 0; + } + return -1; + +} + +template<typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) { + + LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n"); + + + if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { + if (psem_trywait(&items) == -1) { + return -1; + } + } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { + if (psem_timedwait(&items, timeout) == -1) { + return -1; + } + } else { + if (psem_wait(&items) == -1) { + return -1; + } + } + if (m_qImpl.pop(a_data)) { psem_post(&slots); - LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n"); + LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n"); return 0; } return -1; } -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data) -{ - if (psem_trywait(&items) == -1) { - return -1; - } - - if (m_qImpl.pop(a_data)) { - psem_post(&slots); - return 0; - } - return -1; -} - - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts) -{ - if (psem_timedwait(&items, ts) == -1) { - return -1; - } - - if (m_qImpl.pop(a_data)) { - psem_post(&slots); -// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n"); - return 0; - } - return -1; - -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) { +template<typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +ELEM_T &LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) { return m_qImpl.operator[](i); } -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){ - return Allocator::allocate(size); +template<typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +void *LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size) { + return Allocator::allocate(size); } -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> +template<typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) { return Allocator::deallocate(p); } diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h index 5d2d9b6..0be124b 100644 --- a/src/queue/shm_queue.h +++ b/src/queue/shm_queue.h @@ -12,6 +12,7 @@ #include "shm_allocator.h" #include "usg_common.h" #include "array_lock_free_sem_queue.h" +#include "lock_free_queue.h" #include "bus_error.h" template <typename ELEM_T> class SHMQueue { @@ -51,7 +52,7 @@ /// @brief the actual queue-> methods are forwarded into the real /// implementation - ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *queue; + LockFreeQueue<ELEM_T, SHM_Allocator> *queue; private: /// @brief disable copy constructor declaring it private @@ -64,7 +65,7 @@ hashtable_t *hashtable = mm_get_hashtable(); std::set<int> *keyset = hashtable_keyset(hashtable); std::set<int>::iterator keyItr; - ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue; + LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue; bool found; size_t count = 0; for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { @@ -77,7 +78,7 @@ } if (!found) { // 閿�姣佸叡浜唴瀛樼殑queue - mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr); + mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr); delete mqueue; hashtable_remove(hashtable, *keyItr); count++; @@ -91,11 +92,11 @@ template <typename ELEM_T> size_t SHMQueue<ELEM_T>::remove_queues(int keys[], size_t length) { hashtable_t *hashtable = mm_get_hashtable(); - ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue; + LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue; size_t count = 0; for(int i = 0; i< length; i++) { // 閿�姣佸叡浜唴瀛樼殑queue - mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]); + mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]); delete mqueue; hashtable_remove(hashtable, keys[i]); count++; @@ -113,9 +114,9 @@ SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize) : KEY(key) { hashtable_t *hashtable = mm_get_hashtable(); - queue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key); + queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key); if (queue == NULL || (void *)queue == (void *)1) { - queue = new ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator>(qsize); + queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize); hashtable_put(hashtable, key, (void *)queue); } // queue->reference++; @@ -155,7 +156,7 @@ template <typename ELEM_T> inline int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) { - int rv = queue->push(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT); + int rv = queue->push(a_data, NULL, BUS_NOWAIT_FLAG); if(rv == -1) { if (errno == EAGAIN) return EAGAIN; @@ -170,7 +171,7 @@ template <typename ELEM_T> inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec *timeout) { - int rv = queue->push(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT); + int rv = queue->push(a_data, timeout, BUS_TIMEOUT_FLAG); if(rv == -1) { if(errno == ETIMEDOUT) return EBUS_TIMEOUT; @@ -195,7 +196,7 @@ template <typename ELEM_T> inline int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) { - int rv = queue->pop(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT); + int rv = queue->pop(a_data, NULL, BUS_NOWAIT_FLAG); if(rv == -1) { if (errno == EAGAIN) @@ -213,7 +214,7 @@ inline int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) { int rv; - rv = queue->pop(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT); + rv = queue->pop(a_data, timeout, BUS_TIMEOUT_FLAG); if(rv == -1) { if (errno == ETIMEDOUT) { return EBUS_TIMEOUT; diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp index 4340bc4..54effbe 100644 --- a/src/socket/shm_mod_socket.cpp +++ b/src/socket/shm_mod_socket.cpp @@ -51,7 +51,7 @@ } // 鍙戦�佷俊鎭珛鍒昏繑鍥炪�� int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){ - return shm_sendto(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT); + return shm_sendto(shm_socket, buf, size, key, NULL, (int)BUS_NOWAIT_FLAG); } @@ -74,7 +74,7 @@ } int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){ - int rv = shm_recvfrom(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT); + int rv = shm_recvfrom(shm_socket, buf, size, key, NULL, (int)BUS_NOWAIT_FLAG); // logger->error(rv, "ShmModSocket::recvfrom_nowait failed!"); return rv; } @@ -92,7 +92,7 @@ return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0); } int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ - return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); + return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)BUS_NOWAIT_FLAG); } int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ @@ -103,7 +103,7 @@ return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0); } int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ - return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); + return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)BUS_NOWAIT_FLAG); } @@ -123,7 +123,7 @@ return _sub_(topic, size, key, timeout, 0); } int ShmModSocket::sub_nowait(char *topic, int size, int key) { - return _sub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT); + return _sub_(topic, size, key, NULL, (int)BUS_NOWAIT_FLAG); } @@ -142,7 +142,7 @@ return _desub_(topic, size, key, timeout, 0); } int ShmModSocket::desub_nowait(char *topic, int size, int key) { - return _desub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT); + return _desub_(topic, size, key, NULL, (int)BUS_NOWAIT_FLAG); } @@ -161,7 +161,7 @@ return _pub_( topic, topic_size, content, content_size, key, timeout, 0); } int ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){ - return _pub_(topic, topic_size, content, content_size, key, NULL, (int)SHM_MSG_NOWAIT); + return _pub_(topic, topic_size, content, content_size, key, NULL, (int)BUS_NOWAIT_FLAG); } diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp index 76e906f..e370c72 100644 --- a/src/socket/shm_socket.cpp +++ b/src/socket/shm_socket.cpp @@ -373,9 +373,9 @@ memcpy(dest.buf, buf, size); - if(flags & SHM_MSG_NOWAIT != 0) { + if( (flags & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { rv = remoteQueue->push_nowait(dest); - } else if(timeout != NULL) { + } else if((flags & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { rv = remoteQueue->push_timeout(dest, timeout); } else { rv = remoteQueue->push(dest); @@ -393,8 +393,6 @@ logger->error(rv, "sendto key %d failed", key); } return rv; - - } } @@ -433,9 +431,9 @@ shm_msg_t src; - if(flags & SHM_MSG_NOWAIT != 0) { + if((flags & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { rv = socket->queue->pop_nowait(src); - } else if(timeout != NULL) { + } else if((flags & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { rv = socket->queue->pop_timeout(src, timeout); // printf("0 shm_recvfrom====%d\n", rv); } else { @@ -649,7 +647,7 @@ switch (src.type) { case SHM_SOCKET_OPEN: - socket->acceptQueue->push_timeout(src, &timeout); + socket->acceptQueue->push(src, &timeout, BUS_TIMEOUT_FLAG); break; case SHM_SOCKET_CLOSE: _server_close_conn_to_client(socket, src.key); @@ -660,7 +658,7 @@ if (iter != socket->clientSocketMap->end()) { client_socket = iter->second; // print_msg("_server_run_msg_rev push before", src); - client_socket->messageQueue->push_timeout(src, &timeout); + client_socket->messageQueue->push(src, &timeout, BUS_TIMEOUT_FLAG); // print_msg("_server_run_msg_rev push after", src); } @@ -695,7 +693,7 @@ _client_close_conn_to_server(socket); break; case SHM_COMMON_MSG: - socket->messageQueue->push_timeout(src, &timeout); + socket->messageQueue->push(src, &timeout, BUS_TIMEOUT_FLAG); break; default: logger->error( "shm_socket._client_run_msg_rev: undefined message type."); diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h index abc8e20..b1665a0 100644 --- a/src/socket/shm_socket.h +++ b/src/socket/shm_socket.h @@ -6,11 +6,7 @@ #include "shm_queue.h" #include "lock_free_queue.h" -enum shm_socket_flag_t -{ - SHM_MSG_TIMEOUT = 1, - SHM_MSG_NOWAIT = 2 -}; + enum shm_connection_status_t { SHM_CONN_CLOSED=1, @@ -88,7 +84,7 @@ int shm_recv(shm_socket_t * socket, void **buf, int *size) ; /** - * @flags : SHM_MSG_NOWAIT + * @flags : BUS_NOWAIT_FLAG */ int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0); diff --git a/test_net_socket/net_mod_socket.sh b/test_net_socket/net_mod_socket.sh index cd4c809..fc639ca 100755 --- a/test_net_socket/net_mod_socket.sh +++ b/test_net_socket/net_mod_socket.sh @@ -36,7 +36,7 @@ } function close() { - ps -ef | grep -e "test_net_mod_socket" -e "net_mod_socket"| awk '{print $2}' | xargs -i kill -9 {} + ps -ef | grep -e "test_net_mod_socket" -e "heart_beat"| awk '{print $2}' | xargs -i kill -9 {} ipcrm -a } @@ -48,8 +48,6 @@ case ${1} in "server") - close - sleep 2 server ;; "client") diff --git a/test_net_socket/test_net_mod_socket.cpp b/test_net_socket/test_net_mod_socket.cpp index 0b2afd6..2fcd604 100644 --- a/test_net_socket/test_net_mod_socket.cpp +++ b/test_net_socket/test_net_mod_socket.cpp @@ -76,10 +76,13 @@ void *recvbuf; int size; int key; - while (net_mod_socket_recvfrom( sockt, &recvbuf, &size, &key) == 0) { + int rv; + while ((rv = net_mod_socket_recvfrom( sockt, &recvbuf, &size, &key) ) == 0) { printf("鏀跺埌璁㈤槄娑堟伅:%s\n", recvbuf); free(recvbuf); } + + printf("print_sub_msg return . rv = %d\n", rv); } diff --git a/test_socket/CMakeLists.txt b/test_socket/CMakeLists.txt new file mode 100644 index 0000000..a3f3fa5 --- /dev/null +++ b/test_socket/CMakeLists.txt @@ -0,0 +1,11 @@ +# add the executable +add_executable(bus_test bus_test.cpp) +target_link_libraries(bus_test PRIVATE shm_queue ${EXTRA_LIBS} ) +target_include_directories(bus_test PRIVATE + "${PROJECT_BINARY_DIR}" + ${EXTRA_INCLUDES} + ) + + + + diff --git a/test_socket/bus_test.cpp b/test_socket/bus_test.cpp new file mode 100644 index 0000000..cdd6142 --- /dev/null +++ b/test_socket/bus_test.cpp @@ -0,0 +1,116 @@ +#include "bus_server_socket.h" +#include "shm_mod_socket.h" +#include "shm_mm_wrapper.h" +#include "usg_common.h" +#include "mm.h" + +BusServerSocket * server_socket; +void sigint_handler(int sig) { + + exit(0); +} + +void server(int key) { + server_socket = new BusServerSocket(); + + server_socket->bind( key); + + server_socket->start(); +} + + +void *run_recv(void *skptr) { + pthread_detach(pthread_self()); + void *recvbuf; + int size; + int key; + ShmModSocket *sk = (ShmModSocket *)skptr; + while (sk->recvfrom( &recvbuf, &size, &key) == 0) { + printf("鏀跺埌璁㈤槄娑堟伅:%s\n", recvbuf); + free(recvbuf); + } + +} + +void client(int key) { + ShmModSocket *sk = new ShmModSocket(); + + pthread_t tid; + pthread_create(&tid, NULL, run_recv, (void *)socket); + int size; + + char action[512]; + char topic[512]; + char content[512]; + long i = 0; + while (true) { + //printf("Usage: pub <topic> [content] or sub <topic>\n"); + printf("Can I help you? sub, pub, desub or quit\n"); + scanf("%s",action); + + if(strcmp(action, "sub") == 0) { + printf("Please input topic!\n"); + scanf("%s", topic); + if (sk->sub(topic, strlen(topic), key) == 0) { + printf("%d Sub success!\n", sk->get_key()); + } else { + printf("Sub failture!\n"); + exit(0); + } + + } else if(strcmp(action, "desub") == 0) { + printf("Please input topic!\n"); + scanf("%s", topic); + if (sk->desub(topic, strlen(topic), key) == 0) { + printf("%d Desub success!\n", sk->get_key()); + } else { + printf("Desub failture!\n"); + exit(0); + } + + } else if(strcmp(action, "pub") == 0) { + // printf("%s %s %s\n", action, topic, content); + printf("Please input topic and content\n"); + scanf("%s %s", topic, content); + if(sk->pub(topic, strlen(topic)+1, content, strlen(content)+1, key) == 0){ + printf("%d Pub success!\n", sk->get_key()); + } else { + printf("Pub failture!\n"); + } + + } else if(strcmp(action, "quit") == 0) { + printf("(%d) quit\n", sk->get_key()); + delete sk; + break; + } else { + printf("error input argument\n"); + continue; + } + + } + +} + + + +int main(int argc, char *argv[]) { + shm_mm_wrapper_init(512); + int key; + if (argc < 3) { + fprintf(stderr, "Usage: %s %s|%s <key> ...\n", argv[0], "server", "client"); + return 1; + } + + key = atoi(argv[2]); + + if (strcmp("server", argv[1]) == 0) { + server(key); + + } else if (strcmp("client", argv[1]) == 0) { + client(key); + } + + + + return 0; +} \ No newline at end of file diff --git a/test_socket/dgram_mod_bus.cpp b/test_socket/dgram_mod_bus.cpp deleted file mode 100644 index 042b2fd..0000000 --- a/test_socket/dgram_mod_bus.cpp +++ /dev/null @@ -1,131 +0,0 @@ -#include "dgram_mod_socket.h" -#include "shm_mm_wraper.h" -#include "usg_common.h" -#include "mm.h" - -void * server_socket; -void sigint_handler(int sig) { - dgram_mod_close_socket(server_socket); - exit(0); -} - -void server(int key, bool restart) { - server_socket = dgram_mod_open_socket(); - - - if(restart) { - dgram_mod_force_bind(server_socket, key); - } else { - dgram_mod_bind(server_socket, key); - } - - - dgram_mod_start_bus(server_socket); -} - - -void *run_recv(void *socket) { - pthread_detach(pthread_self()); - void *recvbuf; - int size; - int key; - while (dgram_mod_recvfrom( socket, &recvbuf, &size, &key) == 0) { - printf("鏀跺埌璁㈤槄娑堟伅:%s\n", recvbuf); - free(recvbuf); - } - -} - -void client(int key) { - void *socket = dgram_mod_open_socket(); - - pthread_t tid; - pthread_create(&tid, NULL, run_recv, socket); - int size; - - char action[512]; - char topic[512]; - char content[512]; - long i = 0; - while (true) { - //printf("Usage: pub <topic> [content] or sub <topic>\n"); - printf("Can I help you? sub, pub, desub or quit\n"); - scanf("%s",action); - - if(strcmp(action, "sub") == 0) { - printf("Please input topic!\n"); - scanf("%s", topic); - if (dgram_mod_sub(socket, topic, strlen(topic), key) == 0) { - printf("%d Sub success!\n", dgram_mod_get_port(socket)); - } else { - printf("Sub failture!\n"); - exit(0); - } - - } else if(strcmp(action, "desub") == 0) { - printf("Please input topic!\n"); - scanf("%s", topic); - if (dgram_mod_desub(socket, topic, strlen(topic), key) == 0) { - printf("%d Desub success!\n", dgram_mod_get_port(socket)); - } else { - printf("Desub failture!\n"); - exit(0); - } - - } else if(strcmp(action, "pub") == 0) { - // printf("%s %s %s\n", action, topic, content); - printf("Please input topic and content\n"); - scanf("%s %s", topic, content); - if(dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1, key) == 0){ - printf("%d Pub success!\n", dgram_mod_get_port(socket)); - } else { - printf("Pub failture!\n"); - } - - } else if(strcmp(action, "quit") == 0) { - printf("(%d) quit\n", dgram_mod_get_port(socket)); - dgram_mod_close_socket(socket); - break; - } else { - printf("error input argument\n"); - continue; - } - - } - -} - - - -int main(int argc, char *argv[]) { - shm_mm_wrapper_init(512); - int key; - if (argc < 3) { - fprintf(stderr, "Usage: %s %s|%s|rmkey <key> ...\n", argv[0], "server", "client"); - return 1; - } - - key = atoi(argv[2]); - - if (strcmp("server", argv[1]) == 0) { - if(argc >= 4 && strcmp("restart", argv[3]) == 0) { - server(key, true); - } - else{ - server(key, false); - } - - } else if (strcmp("client", argv[1]) == 0) { - client(key); - } else if(strcmp("rmkey", argv[1]) == 0) { - for(int i = 2; i < argc; i++) { - key = atoi(argv[i]); - dgram_mod_remove_key(key); - // printf("%d\n", key); - } - } - - - - return 0; -} \ No newline at end of file -- Gitblit v1.8.0