| | |
| | | set(CMAKE_LIBRARY_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}") |
| | | # set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}") |
| | | |
| | | # Build shared libraries by default; override with -DBUILD_SHARED_LIBS=OFF. |
| | | option(BUILD_SHARED_LIBS "Build using shared libraries" ON) |
| | | # STREQUAL compares strings; EQUAL is a numeric comparison and never matches "Release". |
| | | if ("${CMAKE_BUILD_TYPE}" STREQUAL "Release") |
| | | #option(BUILD_SHARED_LIBS "Build using shared libraries" ON) |
| | | endif() |
| | | |
| | | option(BUILD_DOC "Build doc" OFF) |
| | | |
| | | |
| | |
| | | add_subdirectory(${PROJECT_SOURCE_DIR}/src) |
| | | add_subdirectory(${PROJECT_SOURCE_DIR}/test) |
| | | add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket) |
| | | add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket) |
| | | endif() |
| | |
| | | |
| | | BUILD_TYPE="Debug" |
| | | BUILD_DOC="OFF" |
| | | BUILD_SHARED_LIBS="OFF" |
| | | |
| | | function usage() { |
| | | echo "build.sh [release | debug | doc]" |
| | |
| | | case ${1} in |
| | | "release") |
| | | BUILD_TYPE="Release" |
| | | BUILD_SHARED_LIBS="ON" |
| | | ;; |
| | | |
| | | "debug") |
| | |
| | | # -DBUILD_SHARED_LIBS=ON |
| | | # -DCMAKE_INSTALL_PREFIX=$(pwd)/../dest |
| | | # -DQCA_MAN_INSTALL_DIR:PATH=/usr/share/man |
| | | # Configure the build tree. BUILD_TYPE, BUILD_SHARED_LIBS and BUILD_DOC are the script variables set above. |
| | | # A single invocation only: a second 'cmake ...' joined by a trailing backslash would be passed as arguments to the first. |
| | | cmake -DCMAKE_INSTALL_PREFIX="$(pwd)/../dest" -DCMAKE_BUILD_TYPE="${BUILD_TYPE}" -DBUILD_SHARED_LIBS="${BUILD_SHARED_LIBS}" \ |
| | | -DBUILD_DOC="${BUILD_DOC}" -DSUPPORT_RDMA=OFF .. |
| | | |
| | | cmake --build . |
| | |
| | | |
| | | |
| | | # whether to build with RDMA support |
| | | option(SUPPORT_RDMA "If support rdma" OFF) |
| | | |
| | |
| | | ./socket/shm_mod_socket.cpp |
| | | ./time_util.cpp |
| | | ./psem.cpp |
| | | ./svsem.cpp |
| | | ./bus_error.cpp |
| | | ./futex_sem.cpp |
| | | ./svsem.cpp |
| | | ./net/net_conn_pool.cpp |
| | | ./net/net_mod_server_socket_wrapper.cpp |
| | | ./net/net_mod_socket_wrapper.cpp |
| | |
| | | ./shm/shm_mm_wrapper.cpp |
| | | ./shm/mm.cpp |
| | | ./shm/hashtable.cpp |
| | | |
| | | |
| | | ) |
| | | |
| | |
| | | ./time_util.h |
| | | ./futex_sem.h |
| | | ./bus_error.h |
| | | ./svsem.h |
| | | ./bus_def.h |
| | | ./logger_factory.h |
| | | ./queue/linked_lock_free_queue.h |
| | | ./queue/array_lock_free_queue2.h |
| | | ./queue/array_lock_free_queue.h |
| | | ./queue/shm_queue.h |
| | | ./queue/array_lock_free_sem_queue.h |
| | | ./queue/lock_free_queue.h |
| | | ./svsem.h |
| | | ./net/net_conn_pool.h |
| | | ./net/net_mod_socket.h |
| | | ./net/net_mod_server_socket_wrapper.h |
| | |
| | | ./shm/shm_allocator.h |
| | | |
| | | |
| | | |
| | | DESTINATION include) |
| | | |
| | | install(FILES "${PROJECT_BINARY_DIR}/src/bus_config.h" |
New file |
| | |
| | | #ifndef _BUS_DEF_H_ |
| | | #define _BUS_DEF_H_ |
| | | |
| | | // Bit flags tested against the 'flag' argument of the queue operations. |
| | | // BUS_TIMEOUT_FLAG: wait, but only for the supplied timeout. |
| | | // BUS_NOWAIT_FLAG: return immediately instead of waiting. |
| | | // The shift is parenthesized so the macro expands safely inside larger expressions |
| | | // (e.g. BUS_NOWAIT_FLAG + 1 would otherwise parse as 1 << (1 + 1)). |
| | | #define BUS_TIMEOUT_FLAG 1 |
| | | #define BUS_NOWAIT_FLAG (1 << 1) |
| | | |
| | | #endif |
| | |
| | | sprintf(portstr, "%d", port); |
| | | listenfd = open_listenfd(portstr); |
| | | if(listenfd < 0) { |
| | | LoggerFactory::getLogger()->error(errno, "NetModServerSocket::start . errno=%d ", errno); |
| | | LoggerFactory::getLogger()->error(errno, "NetModServerSocket::start. port = %d ", port); |
| | | return -1; |
| | | } |
| | | init_pool(listenfd); |
| | |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int NetModSocket::recvfrom(void **buf, int *size, int *key) { |
| | | |
| | | logger->debug(" %d NetModSocket::recvfrom before", get_key()); |
| | | int rv = shmModSocket.recvfrom(buf, size, key); |
| | | |
| | | if(rv == 0) { |
| | | logger->debug("NetModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key); |
| | | logger->debug("NetModSocket::recvfrom: <<<< %d recvfrom %d success.\n", get_key(), *key); |
| | | return 0; |
| | | } |
| | | |
| | |
| | | struct timespec timeout = {sec, nsec}; |
| | | int rv = shmModSocket.recvfrom_timeout(buf, size, key, &timeout); |
| | | if(rv == 0) { |
| | | logger->debug("NetModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key); |
| | | logger->debug("NetModSocket::recvfrom_timeout: %d recvfrom %d success.\n", get_key(), *key); |
| | | return 0; |
| | | } |
| | | |
| | |
| | | int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){ |
| | | int rv = shmModSocket.recvfrom_nowait(buf, size, key); |
| | | if(rv == 0) { |
| | | logger->debug("NetModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key); |
| | | logger->debug("NetModSocket::recvfrom_nowait: %d recvfrom %d success.\n", get_key(), *key); |
| | | return 0; |
| | | } |
| | | |
| | |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key){ |
| | | int rv; |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | |
| | | // Log before and after the receive call. The earlier unconditional |
| | | // 'return sockt->recvfrom(...)' made everything below unreachable, so it is removed. |
| | | logger->debug(" %d net_mod_socket_recvfrom before", net_mod_socket_get_key(_socket)); |
| | | rv = sockt->recvfrom(buf, size, key); |
| | | logger->debug(" %d net_mod_socket_recvfrom after. rv = %d", net_mod_socket_get_key(_socket), rv); |
| | | // Propagate the result of NetModSocket::recvfrom unchanged. |
| | | return rv; |
| | | } |
| | | // 接受信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){ |
| | |
| | | #ifndef __ARRAY_LOCK_FREE_QUEUE_H__ |
| | | #define __ARRAY_LOCK_FREE_QUEUE_H__ |
| | | |
| | | #include "atomic_ops.h" |
| | | #include <assert.h> // assert() |
| | | #include <sched.h> // sched_yield() |
| | |
| | | |
| | | #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | template <typename ELEM_T, typename Allocator = SHM_Allocator> |
| | | class ArrayLockFreeQueue |
| | | { |
| | | // ArrayLockFreeQueue will be using this' private members |
| | | template < |
| | | typename ELEM_T_, |
| | | typename Allocator_, |
| | | template <typename T, typename AT> class Q_TYPE |
| | | > |
| | | friend class LockFreeQueue; |
| | | template<typename ELEM_T, typename Allocator = SHM_Allocator> |
| | | class ArrayLockFreeQueue { |
| | | // ArrayLockFreeQueue will be using this' private members |
| | | template< |
| | | typename ELEM_T_, |
| | | typename Allocator_, |
| | | template<typename T, typename AT> class Q_TYPE |
| | | > |
| | | friend |
| | | class LockFreeQueue; |
| | | |
| | | private: |
| | | /// @brief constructor of the class |
| | | ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); |
| | | |
| | | virtual ~ArrayLockFreeQueue(); |
| | | |
| | | inline uint32_t size(); |
| | | |
| | | inline bool full(); |
| | | /// @brief constructor of the class |
| | | ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); |
| | | |
| | | inline bool empty(); |
| | | |
| | | bool push(const ELEM_T &a_data); |
| | | |
| | | bool pop(ELEM_T &a_data); |
| | | |
| | | /// @brief calculate the index in the circular array that corresponds |
| | | /// to a particular "count" value |
| | | inline uint32_t countToIndex(uint32_t a_count); |
| | | virtual ~ArrayLockFreeQueue(); |
| | | |
| | | ELEM_T& operator[](unsigned i); |
| | | |
| | | private: |
| | | size_t Q_SIZE; |
| | | /// @brief array to keep the elements |
| | | ELEM_T *m_theQueue; |
| | | inline uint32_t size(); |
| | | |
| | | /// @brief where a new element will be inserted |
| | | uint32_t m_writeIndex; |
| | | inline bool full(); |
| | | |
| | | /// @brief where the next element will be extracted from |
| | | uint32_t m_readIndex; |
| | | |
| | | /// @brief maximum read index for multiple producer queues |
| | | /// If it's not the same as m_writeIndex it means |
| | | /// there are writes pending to be "committed" to the queue, that means, |
| | | /// the place for the data was reserved (the index in the array) but |
| | | /// data is still not in the queue, so the thread trying to read will have |
| | | /// to wait for those other threads to save the data into the queue |
| | | /// |
| | | /// note this is only used for multiple producers |
| | | uint32_t m_maximumReadIndex; |
| | | inline bool empty(); |
| | | |
| | | bool push(const ELEM_T &a_data); |
| | | |
| | | bool pop(ELEM_T &a_data); |
| | | |
| | | /// @brief calculate the index in the circular array that corresponds |
| | | /// to a particular "count" value |
| | | inline uint32_t countToIndex(uint32_t a_count); |
| | | |
| | | ELEM_T &operator[](unsigned i); |
| | | |
| | | private: |
| | | size_t Q_SIZE; |
| | | /// @brief array to keep the elements |
| | | ELEM_T *m_theQueue; |
| | | |
| | | /// @brief where a new element will be inserted |
| | | uint32_t m_writeIndex; |
| | | |
| | | /// @brief where the next element will be extracted from |
| | | uint32_t m_readIndex; |
| | | |
| | | /// @brief maximum read index for multiple producer queues |
| | | /// If it's not the same as m_writeIndex it means |
| | | /// there are writes pending to be "committed" to the queue, that means, |
| | | /// the place for the data was reserved (the index in the array) but |
| | | /// data is still not in the queue, so the thread trying to read will have |
| | | /// to wait for those other threads to save the data into the queue |
| | | /// |
| | | /// note this is only used for multiple producers |
| | | uint32_t m_maximumReadIndex; |
| | | |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | /// @brief number of elements in the queue |
| | | uint32_t m_count; |
| | | /// @brief number of elements in the queue |
| | | uint32_t m_count; |
| | | #endif |
| | | |
| | | |
| | | |
| | | |
| | | private: |
| | | /// @brief disable copy constructor declaring it private |
| | | ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src); |
| | | /// @brief disable copy constructor declaring it private |
| | | ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src); |
| | | |
| | | }; |
| | | |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | template<typename ELEM_T, typename Allocator> |
| | | ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize): |
| | | Q_SIZE(qsize), |
| | | m_writeIndex(0), // initialisation is not atomic |
| | | m_readIndex(0), // |
| | | m_maximumReadIndex(0) // |
| | | Q_SIZE(qsize), |
| | | m_writeIndex(0), // initialisation is not atomic |
| | | m_readIndex(0), // |
| | | m_maximumReadIndex(0) // |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | ,m_count(0) // |
| | | , m_count(0) // |
| | | #endif |
| | | { |
| | | m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T)); |
| | | m_theQueue = (ELEM_T *) Allocator::allocate(Q_SIZE * sizeof(ELEM_T)); |
| | | |
| | | } |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() |
| | | { |
| | | // std::cout << "destroy ArrayLockFreeQueue\n"; |
| | | Allocator::deallocate(m_theQueue); |
| | | |
| | | template<typename ELEM_T, typename Allocator> |
| | | ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() { |
| | | // std::cout << "destroy ArrayLockFreeQueue\n"; |
| | | Allocator::deallocate(m_theQueue); |
| | | |
| | | } |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) |
| | | { |
| | | // if Q_SIZE is a power of 2 this statement could be also written as |
| | | // return (a_count & (Q_SIZE - 1)); |
| | | return (a_count % Q_SIZE); |
| | | template<typename ELEM_T, typename Allocator> |
| | | inline |
| | | uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) { |
| | | // if Q_SIZE is a power of 2 this statement could be also written as |
| | | // return (a_count & (Q_SIZE - 1)); |
| | | return (a_count % Q_SIZE); |
| | | } |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() |
| | | { |
| | | template<typename ELEM_T, typename Allocator> |
| | | inline |
| | | uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | return m_count; |
| | | return m_count; |
| | | #else |
| | | |
| | | uint32_t currentWriteIndex = m_maximumReadIndex; |
| | | uint32_t currentReadIndex = m_readIndex; |
| | | uint32_t currentWriteIndex = m_maximumReadIndex; |
| | | uint32_t currentReadIndex = m_readIndex; |
| | | |
| | | // let's think of a scenario where this function returns bogus data |
| | | // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run |
| | | // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1 |
| | | // 2. afterwards this thread is preempted. While this thread is inactive 2 |
| | | // elements are inserted and removed from the queue, so m_maximumReadIndex |
| | | // is 5 and m_readIndex 4. Real size is still 1 |
| | | // 3. Now the current thread comes back from preemption and reads m_readIndex. |
| | | // currentReadIndex is 4 |
| | | // 4. currentReadIndex is bigger than currentWriteIndex, so |
| | | // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is, |
| | | // it returns that the queue is almost full, when it is almost empty |
| | | // |
| | | if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex)) |
| | | { |
| | | return (currentWriteIndex - currentReadIndex); |
| | | } |
| | | else |
| | | { |
| | | return (Q_SIZE + currentWriteIndex - currentReadIndex); |
| | | } |
| | | // let's think of a scenario where this function returns bogus data |
| | | // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run |
| | | // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1 |
| | | // 2. afterwards this thread is preempted. While this thread is inactive 2 |
| | | // elements are inserted and removed from the queue, so m_maximumReadIndex |
| | | // is 5 and m_readIndex 4. Real size is still 1 |
| | | // 3. Now the current thread comes back from preemption and reads m_readIndex. |
| | | // currentReadIndex is 4 |
| | | // 4. currentReadIndex is bigger than currentWriteIndex, so |
| | | // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is, |
| | | // it returns that the queue is almost full, when it is almost empty |
| | | // |
| | | if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex)) |
| | | { |
| | | return (currentWriteIndex - currentReadIndex); |
| | | } |
| | | else |
| | | { |
| | | return (Q_SIZE + currentWriteIndex - currentReadIndex); |
| | | } |
| | | #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | } |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() |
| | | { |
| | | template<typename ELEM_T, typename Allocator> |
| | | inline |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | return (m_count == (Q_SIZE)); |
| | | return (m_count == (Q_SIZE)); |
| | | #else |
| | | |
| | | uint32_t currentWriteIndex = m_writeIndex; |
| | | uint32_t currentReadIndex = m_readIndex; |
| | | |
| | | uint32_t currentWriteIndex = m_writeIndex; |
| | | uint32_t currentReadIndex = m_readIndex; |
| | | |
| | | if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) |
| | | { |
| | | // the queue is full |
| | | return true; |
| | | } |
| | | else |
| | | { |
| | | // not full! |
| | | return false; |
| | | } |
| | | #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | } |
| | | |
| | | template<typename ELEM_T, typename Allocator> |
| | | inline |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | return (m_count == 0); |
| | | #else |
| | | |
| | | if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex)) |
| | | { |
| | | // the queue is empty: the read index has caught up with the maximum read index |
| | | return true; |
| | | } |
| | | else |
| | | { |
| | | // not empty! |
| | | return false; |
| | | } |
| | | #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | } |
| | | |
| | | |
| | | template<typename ELEM_T, typename Allocator> |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) { |
| | | uint32_t currentReadIndex; |
| | | uint32_t currentWriteIndex; |
| | | |
| | | do { |
| | | |
| | | currentWriteIndex = m_writeIndex; |
| | | currentReadIndex = m_readIndex; |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | if (m_count == Q_SIZE) { |
| | | return false; |
| | | } |
| | | #else |
| | | if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) |
| | | { |
| | | // the queue is full |
| | | return true; |
| | | } |
| | | else |
| | | { |
| | | // not full! |
| | | return false; |
| | | } |
| | | #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | } |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | inline |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() |
| | | { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | return (m_count == 0); |
| | | #else |
| | | |
| | | if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex)) |
| | | { |
| | | // the queue is empty: the read index has caught up with the maximum read index |
| | | return true; |
| | | } |
| | | else |
| | | { |
| | | // not empty! |
| | | return false; |
| | | } |
| | | #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) |
| | | { |
| | | uint32_t currentReadIndex; |
| | | uint32_t currentWriteIndex; |
| | | |
| | | do |
| | | { |
| | | |
| | | currentWriteIndex = m_writeIndex; |
| | | currentReadIndex = m_readIndex; |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | if (m_count == Q_SIZE) { |
| | | return false; |
| | | } |
| | | #else |
| | | if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) |
| | | { |
| | | // the queue is full |
| | | return false; |
| | | } |
| | | #endif |
| | | |
| | | } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); |
| | | |
| | | // We know now that this index is reserved for us. Use it to save the data |
| | | m_theQueue[countToIndex(currentWriteIndex)] = a_data; |
| | | |
| | | // update the maximum read index after saving the data. It wouldn't fail if there is only one thread |
| | | // inserting in the queue. It might fail if there are more than 1 producer threads because this |
| | | // operation has to be done in the same order as the previous CAS |
| | | |
| | | while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) |
| | | { |
| | | // this is a good place to yield the thread in case there are more |
| | | // software threads than hardware processors and you have more |
| | | // than 1 producer thread |
| | | // have a look at sched_yield (POSIX.1b) |
| | | sched_yield(); |
| | | } |
| | | |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | AtomicAdd(&m_count, 1); |
| | | #endif |
| | | return true; |
| | | |
| | | } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); |
| | | |
| | | // We know now that this index is reserved for us. Use it to save the data |
| | | m_theQueue[countToIndex(currentWriteIndex)] = a_data; |
| | | |
| | | // update the maximum read index after saving the data. It wouldn't fail if there is only one thread |
| | | // inserting in the queue. It might fail if there are more than 1 producer threads because this |
| | | // operation has to be done in the same order as the previous CAS |
| | | |
| | | while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) { |
| | | // this is a good place to yield the thread in case there are more |
| | | // software threads than hardware processors and you have more |
| | | // than 1 producer thread |
| | | // have a look at sched_yield (POSIX.1b) |
| | | sched_yield(); |
| | | } |
| | | |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | AtomicAdd(&m_count, 1); |
| | | #endif |
| | | return true; |
| | | } |
| | | |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) |
| | | { |
| | | uint32_t currentMaximumReadIndex; |
| | | uint32_t currentReadIndex; |
| | | template<typename ELEM_T, typename Allocator> |
| | | bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) { |
| | | uint32_t currentMaximumReadIndex; |
| | | uint32_t currentReadIndex; |
| | | |
| | | do |
| | | { |
| | | // to ensure thread-safety when there is more than 1 producer thread |
| | | // a second index is defined (m_maximumReadIndex) |
| | | currentReadIndex = m_readIndex; |
| | | currentMaximumReadIndex = m_maximumReadIndex; |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | do { |
| | | // to ensure thread-safety when there is more than 1 producer thread |
| | | // a second index is defined (m_maximumReadIndex) |
| | | currentReadIndex = m_readIndex; |
| | | currentMaximumReadIndex = m_maximumReadIndex; |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | if (m_count == 0) { |
| | | return false; |
| | | } |
| | | #else |
| | | if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) |
| | | { |
| | | // the queue is empty or |
| | | // a producer thread has allocate space in the queue but is |
| | | // waiting to commit the data into it |
| | | return false; |
| | | } |
| | | #endif |
| | | |
| | | // retrieve the data from the queue |
| | | a_data = m_theQueue[countToIndex(currentReadIndex)]; |
| | | |
| | | // try to perform now the CAS operation on the read index. If we succeed |
| | | // a_data already contains what m_readIndex pointed to before we |
| | | // increased it |
| | | if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) |
| | | { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | // m_count.fetch_sub(1); |
| | | AtomicSub(&m_count, 1); |
| | | #endif |
| | | return true; |
| | | } |
| | | |
| | | // it failed retrieving the element off the queue. Someone else must |
| | | // have read the element stored at countToIndex(currentReadIndex) |
| | | // before we could perform the CAS operation |
| | | |
| | | } while(1); // keep looping to try again! |
| | | |
| | | // Something went wrong. it shouldn't be possible to reach here |
| | | assert(0); |
| | | |
| | | // Add this return statement to avoid compiler warnings |
| | | return false; |
| | | } |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) |
| | | { |
| | | int currentCount = m_count; |
| | | uint32_t currentReadIndex = m_readIndex; |
| | | if (i >= currentCount) |
| | | { |
| | | std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; |
| | | std::exit(EXIT_FAILURE); |
| | | if (m_count == 0) { |
| | | return false; |
| | | } |
| | | return m_theQueue[countToIndex(currentReadIndex+i)]; |
| | | #else |
| | | if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) |
| | | { |
| | | // the queue is empty or |
| | | // a producer thread has allocate space in the queue but is |
| | | // waiting to commit the data into it |
| | | return false; |
| | | } |
| | | #endif |
| | | |
| | | // retrieve the data from the queue |
| | | a_data = m_theQueue[countToIndex(currentReadIndex)]; |
| | | |
| | | // try to perform now the CAS operation on the read index. If we succeed |
| | | // a_data already contains what m_readIndex pointed to before we |
| | | // increased it |
| | | if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) { |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | // m_count.fetch_sub(1); |
| | | AtomicSub(&m_count, 1); |
| | | #endif |
| | | return true; |
| | | } |
| | | |
| | | // it failed retrieving the element off the queue. Someone else must |
| | | // have read the element stored at countToIndex(currentReadIndex) |
| | | // before we could perform the CAS operation |
| | | |
| | | } while (1); // keep looping to try again! |
| | | |
| | | // Something went wrong. it shouldn't be possible to reach here |
| | | assert(0); |
| | | |
| | | // Add this return statement to avoid compiler warnings |
| | | return false; |
| | | } |
| | | |
| | | /// @brief access the i-th element counted from the current read index |
| | | /// Prints an error and exits the process with EXIT_FAILURE when i is not |
| | | /// smaller than the element count read at the start of the call. |
| | | template<typename ELEM_T, typename Allocator> |
| | | ELEM_T &ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) { |
| | | // Keep the snapshots unsigned: m_count is uint32_t and i is unsigned, so the |
| | | // bounds check involves no signed/unsigned conversion. |
| | | const uint32_t currentCount = m_count; |
| | | const uint32_t currentReadIndex = m_readIndex; |
| | | if (i >= currentCount) { |
| | | std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i |
| | | << " is out of range\n"; |
| | | std::exit(EXIT_FAILURE); |
| | | } |
| | | // Map the logical position onto the circular array. |
| | | return m_theQueue[countToIndex(currentReadIndex + i)]; |
| | | } |
| | | |
| | | #endif // __ARRAY_LOCK_FREE_QUEUE_H__ |
| | |
| | | #include "shm_allocator.h" |
| | | #include "futex_sem.h" |
| | | #include "time_util.h" |
| | | |
| | | #include "bus_def.h" |
| | | |
| | | /// @brief implementation of an array based lock free queue with support for |
| | | /// multiple producers |
| | |
| | | /// you must use the ArrayLockFreeSemQueue facade: |
| | | /// ArrayLockFreeSemQueue<int, 100, ArrayLockFreeSemQueue> q; |
| | | |
| | | // Bit flags for the 'flag' argument; the shift is parenthesized so the macro |
| | | // expands safely inside larger expressions. |
| | | #define LOCK_FREE_QUEUE_TIMEOUT 1 |
| | | #define LOCK_FREE_QUEUE_NOWAIT (1 << 1) |
| | | |
| | | |
| | | #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | |
| | | uint32_t currentWriteIndex; |
| | | int s; |
| | | |
| | | // sigset_t mask_all, pre; |
| | | // sigfillset(&mask_all); |
| | | do |
| | | { |
| | | currentWriteIndex = m_writeIndex; |
| | | currentReadIndex = m_readIndex; |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | if (m_count == Q_SIZE) { |
| | | if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) |
| | | if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) |
| | | return -1; |
| | | else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { |
| | | else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { |
| | | const struct timespec ts = TimeUtil::trim_time(timeout); |
| | | s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | |
| | | if (currentReadIndex == currentWriteIndex - Q_SIZE + 1 ) |
| | | { |
| | | // the queue is full |
| | | if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) |
| | | if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) |
| | | return -1; |
| | | else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { |
| | | else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { |
| | | const struct timespec ts = TimeUtil::trim_time(timeout); |
| | | s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, &ts, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | |
| | | // We know now that this index is reserved for us. Use it to save the data |
| | | m_theQueue[countToIndex(currentWriteIndex)] = a_data; |
| | | |
| | | // sigprocmask(SIG_BLOCK, &mask_all, &pre); |
| | | |
| | | // update the maximum read index after saving the data. It wouldn't fail if there is only one thread |
| | | // inserting in the queue. It might fail if there are more than 1 producer threads because this |
| | | // operation has to be done in the same order as the previous CAS |
| | | |
| | | while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) |
| | | { |
| | | // this is a good place to yield the thread in case there are more |
| | |
| | | err_exit(errno, "futex-FUTEX_WAKE"); |
| | | #endif |
| | | |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return 0; |
| | | } |
| | | |
| | |
| | | uint32_t currentMaximumReadIndex; |
| | | uint32_t currentReadIndex; |
| | | int s; |
| | | |
| | | // sigset_t mask_all, pre; |
| | | // sigfillset(&mask_all); |
| | | |
| | | // sigprocmask(SIG_BLOCK, &mask_all, &pre); |
| | | |
| | | do |
| | | { |
| | |
| | | |
| | | if (m_count == 0) { |
| | | |
| | | if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) |
| | | if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return -1; |
| | | else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { |
| | | } |
| | | else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { |
| | | const struct timespec ts = TimeUtil::trim_time(timeout); |
| | | s = futex((int *)&m_count, FUTEX_WAIT, 0, &ts, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | | // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return -1; |
| | | } |
| | | |
| | | } else { |
| | | s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return -1; |
| | | } |
| | | } |
| | |
| | | // the queue is empty or |
| | | // a producer thread has allocate space in the queue but is |
| | | // waiting to commit the data into it |
| | | if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) |
| | | if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return -1; |
| | | else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { |
| | | } |
| | | else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { |
| | | const struct timespec ts = TimeUtil::trim_time(timeout); |
| | | s = futex((int *)&currentMaximumReadIndex, FUTEX_WAIT, currentReadIndex, &ts, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | | // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return -1; |
| | | } |
| | | |
| | | } else { |
| | | s = futex((int *)&currentMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0); |
| | | if (s == -1 && errno != EAGAIN && errno != EINTR) { |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return -1; |
| | | } |
| | | } |
| | |
| | | err_exit(errno, "futex-FUTEX_WAKE"); |
| | | #endif |
| | | |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return 0; |
| | | } |
| | | |
| | |
| | | #include "shm_allocator.h" |
| | | #include "psem.h" |
| | | #include "bus_error.h" |
| | | |
| | | #include "bus_def.h" |
| | | // default Queue size |
| | | #define LOCK_FREE_Q_DEFAULT_SIZE 16 |
| | | |
| | | // static Logger *logger = LoggerFactory::getLogger(); |
| | | // define this macro if calls to "size" must return the real size of the |
| | | // queue. If it is undefined that function will try to take a snapshot of |
| | | // define this macro if calls to "size" must return the real size of the |
| | | // queue. If it is undefined that function will try to take a snapshot of |
| | | // the queue, but returned value might be bogus |
| | | |
| | | |
| | | // forward declarations for default template values |
| | | // |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | template<typename ELEM_T, typename Allocator> |
| | | class ArrayLockFreeQueue; |
| | | |
| | | // template <typename ELEM_T> |
| | |
| | | |
| | | |
| | | /// @brief Lock-free queue based on a circular array |
| | | /// No allocation of extra memory for the nodes handling is needed, but it has |
| | | /// to add extra overhead (extra CAS operation) when inserting to ensure the |
| | | /// thread-safety of the queue when the queue type is not |
| | | /// No allocation of extra memory for the nodes handling is needed, but it has |
| | | /// to add extra overhead (extra CAS operation) when inserting to ensure the |
| | | /// thread-safety of the queue when the queue type is not |
| | | /// ArrayLockFreeQueueSingleProducer. |
| | | /// |
| | | /// examples of instantiation: |
| | |
| | | /// |
| | | /// ELEM_T represents the type of elementes pushed and popped from the queue |
| | | /// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1) |
| | | /// This number should be a power of 2 to ensure |
| | | /// indexes in the circular queue keep stable when the uint32_t |
| | | /// This number should be a power of 2 to ensure |
| | | /// indexes in the circular queue keep stable when the uint32_t |
| | | /// variable that holds the current position rolls over from FFFFFFFF |
| | | /// to 0. For instance |
| | | /// 2 -> 0x02 |
| | | /// 2 -> 0x02 |
| | | /// 4 -> 0x04 |
| | | /// 8 -> 0x08 |
| | | /// 16 -> 0x10 |
| | | /// (...) |
| | | /// (...) |
| | | /// 1024 -> 0x400 |
| | | /// 2048 -> 0x800 |
| | | /// |
| | | /// if queue size is not defined as requested, let's say, for |
| | | /// instance 100, when current position is FFFFFFFF (4,294,967,295) |
| | | /// index in the circular array is 4,294,967,295 % 100 = 95. |
| | | /// When that value is incremented it will be set to 0, that is the |
| | | /// index in the circular array is 4,294,967,295 % 100 = 95. |
| | | /// When that value is incremented it will be set to 0, that is the |
| | | /// last 4 elements of the queue are not used when the counter rolls |
| | | /// over to 0 |
| | | /// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and |
| | | /// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and |
| | | /// ArrayLockFreeQueue are supported (single producer |
| | | /// by default) |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator = SHM_Allocator, |
| | | template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue |
| | | > |
| | | class LockFreeQueue |
| | | { |
| | | template< |
| | | typename ELEM_T, |
| | | typename Allocator = SHM_Allocator, |
| | | template<typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue |
| | | > |
| | | class LockFreeQueue { |
| | | |
| | | private: |
| | | sem_t slots; |
| | | sem_t items; |
| | | sem_t slots; |
| | | sem_t items; |
| | | |
| | | |
| | | |
| | | public: |
| | | sem_t mutex; |
| | | LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); |
| | | |
| | | /// @brief destructor of the class. |
| | | /// Note it is not virtual since it is not expected to inherit from this |
| | | /// template |
| | | ~LockFreeQueue(); |
| | | std::atomic_uint reference; |
| | | /// @brief constructor of the class |
| | | |
| | | sem_t mutex; |
| | | |
| | | /// @brief returns the current number of items in the queue |
| | | /// It tries to take a snapshot of the size of the queue, but in busy environments |
| | | /// this function might return bogus values. |
| | | /// |
| | | /// If a reliable queue size must be kept you might want to have a look at |
| | | /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE' |
| | | /// it enables a reliable size though it hits overall performance of the queue |
| | | /// (when the reliable size variable is on it's got an impact of about 20% in time) |
| | | inline uint32_t size(); |
| | | |
| | | /// @brief return true if the queue is full. False otherwise |
| | | /// It tries to take a snapshot of the size of the queue, but in busy |
| | | /// environments this function might return bogus values. See help in method |
| | | /// LockFreeQueue::size |
| | | inline bool full(); |
| | | LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); |
| | | |
| | | inline bool empty(); |
| | | /// @brief destructor of the class. |
| | | /// Note it is not virtual since it is not expected to inherit from this |
| | | /// template |
| | | ~LockFreeQueue(); |
| | | |
| | | inline ELEM_T& operator[](unsigned i); |
| | | |
| | | /// @brief push an element at the tail of the queue |
| | | /// @param the element to insert in the queue |
| | | /// Note that the element is not a pointer or a reference, so if you are using large data |
| | | /// structures to be inserted in the queue you should think of instantiate the template |
| | | /// of the queue as a pointer to that large structure |
| | | /// @return true if the element was inserted in the queue. False if the queue was full |
| | | int push(const ELEM_T &a_data); |
| | | int push_nowait(const ELEM_T &a_data); |
| | | int push_timeout(const ELEM_T &a_data, const struct timespec * timeout); |
| | | |
| | | /// @brief pop the element at the head of the queue |
| | | /// @param a reference where the element in the head of the queue will be saved to |
| | | /// Note that the a_data parameter might contain rubbish if the function returns false |
| | | /// @return true if the element was successfully extracted from the queue. False if the queue was empty |
| | | int pop(ELEM_T &a_data); |
| | | int pop_nowait(ELEM_T &a_data); |
| | | int pop_timeout(ELEM_T &a_data, struct timespec * timeout); |
| | | std::atomic_uint reference; |
| | | /// @brief constructor of the class |
| | | |
| | | |
| | | void *operator new(size_t size); |
| | | void operator delete(void *p); |
| | | /// @brief returns the current number of items in the queue |
| | | /// It tries to take a snapshot of the size of the queue, but in busy environments |
| | | /// this function might return bogus values. |
| | | /// |
| | | /// If a reliable queue size must be kept you might want to have a look at |
| | | /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE' |
| | | /// it enables a reliable size though it hits overall performance of the queue |
| | | /// (when the reliable size variable is on it's got an impact of about 20% in time) |
| | | inline uint32_t size(); |
| | | |
| | | /// @brief return true if the queue is full. False otherwise |
| | | /// It tries to take a snapshot of the size of the queue, but in busy |
| | | /// environments this function might return bogus values. See help in method |
| | | /// LockFreeQueue::size |
| | | inline bool full(); |
| | | |
| | | inline bool empty(); |
| | | |
| | | inline ELEM_T &operator[](unsigned i); |
| | | |
| | | /// @brief push an element at the tail of the queue |
| | | /// @param the element to insert in the queue |
| | | /// Note that the element is not a pointer or a reference, so if you are using large data |
| | | /// structures to be inserted in the queue you should think of instantiate the template |
| | | /// of the queue as a pointer to that large structure |
| | | /// @return true if the element was inserted in the queue. False if the queue was full |
| | | int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); |
| | | |
| | | /// @brief pop the element at the head of the queue |
| | | /// @param a reference where the element in the head of the queue will be saved to |
| | | /// Note that the a_data parameter might contain rubbish if the function returns false |
| | | /// @return true if the element was successfully extracted from the queue. False if the queue was empty |
| | | int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); |
| | | |
| | | |
| | | void *operator new(size_t size); |
| | | |
| | | void operator delete(void *p); |
| | | |
| | | protected: |
| | | /// @brief the actual queue. methods are forwarded into the real |
| | | /// implementation |
| | | Q_TYPE<ELEM_T, Allocator> m_qImpl; |
| | | /// @brief the actual queue. methods are forwarded into the real |
| | | /// implementation |
| | | Q_TYPE<ELEM_T, Allocator> m_qImpl; |
| | | |
| | | private: |
| | | /// @brief disable copy constructor declaring it private |
| | | LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src); |
| | | /// @brief disable copy constructor declaring it private |
| | | LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src); |
| | | }; |
| | | |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) |
| | | { |
| | | // std::cout << "LockFreeQueue init reference=" << reference << std::endl; |
| | | template< |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template<typename T, typename AT> class Q_TYPE> |
| | | LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) { |
| | | // std::cout << "LockFreeQueue init reference=" << reference << std::endl; |
| | | if (sem_init(&slots, 1, qsize) == -1) |
| | | err_exit(errno, "LockFreeQueue sem_init"); |
| | | if (sem_init(&items, 1, 0) == -1) |
| | |
| | | if (sem_init(&mutex, 1, 1) == -1) |
| | | err_exit(errno, "LockFreeQueue sem_init"); |
| | | |
| | | |
| | | |
| | | } |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() |
| | | { |
| | | |
| | | template< |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template<typename T, typename AT> class Q_TYPE> |
| | | LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() { |
| | | // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy"); |
| | | if(sem_destroy(&slots) == -1) { |
| | | if (sem_destroy(&slots) == -1) { |
| | | err_exit(errno, "LockFreeQueue sem_destroy"); |
| | | } |
| | | if(sem_destroy(&items) == -1) { |
| | | if (sem_destroy(&items) == -1) { |
| | | err_exit(errno, "LockFreeQueue sem_destroy"); |
| | | } |
| | | if(sem_destroy(&mutex) == -1) { |
| | | if (sem_destroy(&mutex) == -1) { |
| | | err_exit(errno, "LockFreeQueue sem_destroy"); |
| | | } |
| | | } |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() |
| | | { |
| | | template< |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template<typename T, typename AT> class Q_TYPE> |
| | | inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() { |
| | | return m_qImpl.size(); |
| | | } |
| | | } |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() |
| | | { |
| | | template< |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template<typename T, typename AT> class Q_TYPE> |
| | | inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() { |
| | | return m_qImpl.full(); |
| | | } |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() |
| | | { |
| | | template< |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template<typename T, typename AT> class Q_TYPE> |
| | | inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() { |
| | | return m_qImpl.empty(); |
| | | } |
| | | |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data) |
| | | { |
| | | LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n"); |
| | | if (psem_wait(&slots) == -1) { |
| | | return -1; |
| | | } |
| | | |
| | | if ( m_qImpl.push(a_data) ) { |
| | | psem_post(&items); |
| | | LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n"); |
| | | return 0; |
| | | } |
| | | return -1; |
| | | |
| | | } |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data) |
| | | { |
| | | if (psem_trywait(&slots) == -1) { |
| | | return -1; |
| | | } |
| | | |
| | | if ( m_qImpl.push(a_data)) { |
| | | psem_post(&items); |
| | | return 0; |
| | | } |
| | | return -1; |
| | | |
| | | } |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts) |
| | | { |
| | | LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n"); |
| | | if ( psem_timedwait(&slots, ts) == -1) { |
| | | return -1; |
| | | } |
| | | |
| | | if (m_qImpl.push(a_data)){ |
| | | psem_post(&items); |
| | | LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n"); |
| | | return 0; |
| | | } |
| | | return -1; |
| | | |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data) |
| | | { |
| | | |
| | | LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n"); |
| | | if (psem_wait(&items) == -1) { |
| | | return -1; |
| | | template<typename ELEM_T, |
| | | typename Allocator, |
| | | template<typename T, typename AT> class Q_TYPE> |
| | | int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) { |
| | | LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n"); |
| | | if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { |
| | | if (psem_trywait(&slots) == -1) { |
| | | return -1; |
| | | } |
| | | } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { |
| | | if (psem_timedwait(&slots, timeout) == -1) { |
| | | return -1; |
| | | } |
| | | } else { |
| | | if (psem_wait(&slots) == -1) { |
| | | return -1; |
| | | } |
| | | } |
| | | |
| | | |
| | | if (m_qImpl.push(a_data)) { |
| | | psem_post(&items); |
| | | LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n"); |
| | | return 0; |
| | | } |
| | | return -1; |
| | | |
| | | } |
| | | |
| | | template<typename ELEM_T, |
| | | typename Allocator, |
| | | template<typename T, typename AT> class Q_TYPE> |
| | | int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) { |
| | | |
| | | LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n"); |
| | | |
| | | |
| | | if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { |
| | | if (psem_trywait(&items) == -1) { |
| | | return -1; |
| | | } |
| | | } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { |
| | | if (psem_timedwait(&items, timeout) == -1) { |
| | | return -1; |
| | | } |
| | | } else { |
| | | if (psem_wait(&items) == -1) { |
| | | return -1; |
| | | } |
| | | } |
| | | |
| | | |
| | | if (m_qImpl.pop(a_data)) { |
| | | psem_post(&slots); |
| | | LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n"); |
| | | LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n"); |
| | | return 0; |
| | | } |
| | | return -1; |
| | | } |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data) |
| | | { |
| | | if (psem_trywait(&items) == -1) { |
| | | return -1; |
| | | } |
| | | |
| | | if (m_qImpl.pop(a_data)) { |
| | | psem_post(&slots); |
| | | return 0; |
| | | } |
| | | return -1; |
| | | } |
| | | |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts) |
| | | { |
| | | if (psem_timedwait(&items, ts) == -1) { |
| | | return -1; |
| | | } |
| | | |
| | | if (m_qImpl.pop(a_data)) { |
| | | psem_post(&slots); |
| | | // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n"); |
| | | return 0; |
| | | } |
| | | return -1; |
| | | |
| | | } |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) { |
| | | template<typename ELEM_T, |
| | | typename Allocator, |
| | | template<typename T, typename AT> class Q_TYPE> |
| | | ELEM_T &LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) { |
| | | return m_qImpl.operator[](i); |
| | | } |
| | | |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){ |
| | | return Allocator::allocate(size); |
| | | template<typename ELEM_T, |
| | | typename Allocator, |
| | | template<typename T, typename AT> class Q_TYPE> |
| | | void *LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size) { |
| | | return Allocator::allocate(size); |
| | | } |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | template<typename ELEM_T, |
| | | typename Allocator, |
| | | template<typename T, typename AT> class Q_TYPE> |
| | | void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) { |
| | | return Allocator::deallocate(p); |
| | | } |
| | |
| | | #include "shm_allocator.h" |
| | | #include "usg_common.h" |
| | | #include "array_lock_free_sem_queue.h" |
| | | #include "lock_free_queue.h" |
| | | #include "bus_error.h" |
| | | |
| | | template <typename ELEM_T> class SHMQueue { |
| | |
| | | /// @brief the actual queue-> methods are forwarded into the real |
| | | /// implementation |
| | | |
| | | ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *queue; |
| | | LockFreeQueue<ELEM_T, SHM_Allocator> *queue; |
| | | |
| | | private: |
| | | /// @brief disable copy constructor declaring it private |
| | |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | std::set<int> *keyset = hashtable_keyset(hashtable); |
| | | std::set<int>::iterator keyItr; |
| | | ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue; |
| | | LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue; |
| | | bool found; |
| | | size_t count = 0; |
| | | for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { |
| | |
| | | } |
| | | if (!found) { |
| | | // 销毁共享内存的queue |
| | | mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr); |
| | | mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr); |
| | | delete mqueue; |
| | | hashtable_remove(hashtable, *keyItr); |
| | | count++; |
| | |
| | | template <typename ELEM_T> |
| | | size_t SHMQueue<ELEM_T>::remove_queues(int keys[], size_t length) { |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue; |
| | | LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue; |
| | | size_t count = 0; |
| | | for(int i = 0; i< length; i++) { |
| | | // 销毁共享内存的queue |
| | | mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]); |
| | | mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]); |
| | | delete mqueue; |
| | | hashtable_remove(hashtable, keys[i]); |
| | | count++; |
| | |
| | | SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize) : KEY(key) { |
| | | |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | queue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key); |
| | | queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key); |
| | | if (queue == NULL || (void *)queue == (void *)1) { |
| | | queue = new ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator>(qsize); |
| | | queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize); |
| | | hashtable_put(hashtable, key, (void *)queue); |
| | | } |
| | | // queue->reference++; |
| | |
| | | |
| | | template <typename ELEM_T> |
| | | inline int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) { |
| | | int rv = queue->push(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT); |
| | | int rv = queue->push(a_data, NULL, BUS_NOWAIT_FLAG); |
| | | if(rv == -1) { |
| | | if (errno == EAGAIN) |
| | | return EAGAIN; |
| | |
| | | template <typename ELEM_T> |
| | | inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec *timeout) { |
| | | |
| | | int rv = queue->push(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT); |
| | | int rv = queue->push(a_data, timeout, BUS_TIMEOUT_FLAG); |
| | | if(rv == -1) { |
| | | if(errno == ETIMEDOUT) |
| | | return EBUS_TIMEOUT; |
| | |
| | | |
| | | template <typename ELEM_T> |
| | | inline int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) { |
| | | int rv = queue->pop(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT); |
| | | int rv = queue->pop(a_data, NULL, BUS_NOWAIT_FLAG); |
| | | |
| | | if(rv == -1) { |
| | | if (errno == EAGAIN) |
| | |
| | | inline int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) { |
| | | |
| | | int rv; |
| | | rv = queue->pop(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT); |
| | | rv = queue->pop(a_data, timeout, BUS_TIMEOUT_FLAG); |
| | | if(rv == -1) { |
| | | if (errno == ETIMEDOUT) { |
| | | return EBUS_TIMEOUT; |
| | |
| | | } |
| | | // 发送信息立刻返回。 |
| | | int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){ |
| | | return shm_sendto(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT); |
| | | return shm_sendto(shm_socket, buf, size, key, NULL, (int)BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |
| | |
| | | } |
| | | |
| | | int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){ |
| | | int rv = shm_recvfrom(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT); |
| | | int rv = shm_recvfrom(shm_socket, buf, size, key, NULL, (int)BUS_NOWAIT_FLAG); |
| | | // logger->error(rv, "ShmModSocket::recvfrom_nowait failed!"); |
| | | return rv; |
| | | } |
| | |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0); |
| | | } |
| | | int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ |
| | |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0); |
| | | } |
| | | int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |
| | |
| | | return _sub_(topic, size, key, timeout, 0); |
| | | } |
| | | int ShmModSocket::sub_nowait(char *topic, int size, int key) { |
| | | return _sub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT); |
| | | return _sub_(topic, size, key, NULL, (int)BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |
| | |
| | | return _desub_(topic, size, key, timeout, 0); |
| | | } |
| | | int ShmModSocket::desub_nowait(char *topic, int size, int key) { |
| | | return _desub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT); |
| | | return _desub_(topic, size, key, NULL, (int)BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |
| | |
| | | return _pub_( topic, topic_size, content, content_size, key, timeout, 0); |
| | | } |
| | | int ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){ |
| | | return _pub_(topic, topic_size, content, content_size, key, NULL, (int)SHM_MSG_NOWAIT); |
| | | return _pub_(topic, topic_size, content, content_size, key, NULL, (int)BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |
| | |
| | | memcpy(dest.buf, buf, size); |
| | | |
| | | |
| | | if(flags & SHM_MSG_NOWAIT != 0) { |
| | | if( (flags & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { |
| | | rv = remoteQueue->push_nowait(dest); |
| | | } else if(timeout != NULL) { |
| | | } else if((flags & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { |
| | | rv = remoteQueue->push_timeout(dest, timeout); |
| | | } else { |
| | | rv = remoteQueue->push(dest); |
| | |
| | | logger->error(rv, "sendto key %d failed", key); |
| | | } |
| | | return rv; |
| | | |
| | | |
| | | } |
| | | } |
| | | |
| | |
| | | |
| | | shm_msg_t src; |
| | | |
| | | if(flags & SHM_MSG_NOWAIT != 0) { |
| | | if((flags & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { |
| | | rv = socket->queue->pop_nowait(src); |
| | | } else if(timeout != NULL) { |
| | | } else if((flags & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { |
| | | rv = socket->queue->pop_timeout(src, timeout); |
| | | // printf("0 shm_recvfrom====%d\n", rv); |
| | | } else { |
| | |
| | | |
| | | switch (src.type) { |
| | | case SHM_SOCKET_OPEN: |
| | | socket->acceptQueue->push_timeout(src, &timeout); |
| | | socket->acceptQueue->push(src, &timeout, BUS_TIMEOUT_FLAG); |
| | | break; |
| | | case SHM_SOCKET_CLOSE: |
| | | _server_close_conn_to_client(socket, src.key); |
| | |
| | | if (iter != socket->clientSocketMap->end()) { |
| | | client_socket = iter->second; |
| | | // print_msg("_server_run_msg_rev push before", src); |
| | | client_socket->messageQueue->push_timeout(src, &timeout); |
| | | client_socket->messageQueue->push(src, &timeout, BUS_TIMEOUT_FLAG); |
| | | // print_msg("_server_run_msg_rev push after", src); |
| | | } |
| | | |
| | |
| | | _client_close_conn_to_server(socket); |
| | | break; |
| | | case SHM_COMMON_MSG: |
| | | socket->messageQueue->push_timeout(src, &timeout); |
| | | socket->messageQueue->push(src, &timeout, BUS_TIMEOUT_FLAG); |
| | | break; |
| | | default: |
| | | logger->error( "shm_socket._client_run_msg_rev: undefined message type."); |
| | |
| | | #include "shm_queue.h" |
| | | #include "lock_free_queue.h" |
| | | |
| | | enum shm_socket_flag_t |
| | | { |
| | | SHM_MSG_TIMEOUT = 1, |
| | | SHM_MSG_NOWAIT = 2 |
| | | }; |
| | | |
| | | |
| | | enum shm_connection_status_t { |
| | | SHM_CONN_CLOSED=1, |
| | |
| | | int shm_recv(shm_socket_t * socket, void **buf, int *size) ; |
| | | |
| | | /** |
| | | * @flags : SHM_MSG_NOWAIT |
| | | * @flags : BUS_NOWAIT_FLAG |
| | | */ |
| | | int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0); |
| | | |
| | |
| | | } |
| | | |
| | | function close() { |
| | | ps -ef | grep -e "test_net_mod_socket" -e "net_mod_socket"| awk '{print $2}' | xargs -i kill -9 {} |
| | | ps -ef | grep -e "test_net_mod_socket" -e "heart_beat"| awk '{print $2}' | xargs -i kill -9 {} |
| | | ipcrm -a |
| | | } |
| | | |
| | |
| | | |
| | | case ${1} in |
| | | "server") |
| | | close |
| | | sleep 2 |
| | | server |
| | | ;; |
| | | "client") |
| | |
| | | void *recvbuf; |
| | | int size; |
| | | int key; |
| | | while (net_mod_socket_recvfrom( sockt, &recvbuf, &size, &key) == 0) { |
| | | int rv; |
| | | while ((rv = net_mod_socket_recvfrom( sockt, &recvbuf, &size, &key) ) == 0) { |
| | | printf("收到订阅消息:%s\n", recvbuf); |
| | | free(recvbuf); |
| | | } |
| | | |
| | | printf("print_sub_msg return . rv = %d\n", rv); |
| | | |
| | | } |
| | | |
New file |
| | |
| | | # bus_test: executable built from bus_test.cpp, linked against shm_queue. |
| | | add_executable(bus_test |
| | |   bus_test.cpp |
| | | ) |
| | | target_include_directories(bus_test |
| | |   PRIVATE |
| | |     "${PROJECT_BINARY_DIR}" |
| | |     ${EXTRA_INCLUDES} |
| | | ) |
| | | target_link_libraries(bus_test |
| | |   PRIVATE |
| | |     shm_queue |
| | |     ${EXTRA_LIBS} |
| | | ) |
| | | |
| | | |
| | | |
| | | |
New file |
| | |
| | | #include "bus_server_socket.h" |
| | | #include "shm_mod_socket.h" |
| | | #include "shm_mm_wrapper.h" |
| | | #include "usg_common.h" |
| | | #include "mm.h" |
| | | |
| | | BusServerSocket * server_socket; |
| | | // Signal handler that terminates the process with exit status 0. |
| | | // NOTE(review): no call that installs this handler is visible in this file. |
| | | void sigint_handler(int sig) { |
| | |   (void)sig;  // the signal number is not used |
| | |   exit(0); |
| | | } |
| | | |
| | | // Create the global bus server socket, bind it to `key` and start it. |
| | | void server(int key) { |
| | |   BusServerSocket *bus = new BusServerSocket(); |
| | |   server_socket = bus;  // publish through the file-level pointer before use |
| | | |
| | |   bus->bind(key); |
| | |   bus->start(); |
| | | } |
| | | |
| | | |
| | | /** |
| | |  * Receiver thread entry point. |
| | |  * |
| | |  * Detaches itself, then prints every message received on the socket until |
| | |  * recvfrom() returns non-zero. |
| | |  * |
| | |  * @param skptr the ShmModSocket to read from, passed as void* by pthread_create |
| | |  * @return always NULL (the thread is detached, so the value is never joined) |
| | |  */ |
| | | void *run_recv(void *skptr) { |
| | |   pthread_detach(pthread_self()); |
| | |   void *recvbuf; |
| | |   int size; |
| | |   int key; |
| | |   ShmModSocket *sk = (ShmModSocket *)skptr; |
| | |   while (sk->recvfrom(&recvbuf, &size, &key) == 0) { |
| | |     // %s requires a char*; assumes the payload is NUL-terminated -- TODO confirm |
| | |     printf("收到订阅消息:%s\n", (char *)recvbuf); |
| | |     free(recvbuf); |
| | |   } |
| | | |
| | |   // A void* function must return a value; flowing off the end is undefined. |
| | |   return NULL; |
| | | } |
| | | |
| | | /** |
| | |  * Interactive client loop. |
| | |  * |
| | |  * Creates a ShmModSocket, starts a detached receiver thread on it, then reads |
| | |  * commands from stdin: sub / desub / pub / quit. The loop ends on "quit" or |
| | |  * when stdin reaches EOF; the socket is deleted in both cases. |
| | |  * |
| | |  * @param key key of the bus server that sub/desub/pub requests are sent to |
| | |  */ |
| | | void client(int key) { |
| | |   ShmModSocket *sk = new ShmModSocket(); |
| | | |
| | |   // Hand the receiver thread the socket object itself. The original passed |
| | |   // `socket`, which is not the ShmModSocket created above. |
| | |   pthread_t tid; |
| | |   int rc = pthread_create(&tid, NULL, run_recv, (void *)sk); |
| | |   if (rc != 0) { |
| | |     fprintf(stderr, "pthread_create failed: %s\n", strerror(rc)); |
| | |     delete sk; |
| | |     return; |
| | |   } |
| | | |
| | |   // Every scanf below is limited to 511 characters so input cannot overflow |
| | |   // these 512-byte buffers. |
| | |   char action[512]; |
| | |   char topic[512]; |
| | |   char content[512]; |
| | |   while (true) { |
| | |     //printf("Usage: pub <topic> [content] or sub <topic>\n"); |
| | |     printf("Can I help you? sub, pub, desub or quit\n"); |
| | |     if (scanf("%511s", action) != 1) { |
| | |       break;  // EOF or read error: stop instead of looping forever |
| | |     } |
| | | |
| | |     if (strcmp(action, "sub") == 0) { |
| | |       printf("Please input topic!\n"); |
| | |       if (scanf("%511s", topic) != 1) { |
| | |         break; |
| | |       } |
| | |       if (sk->sub(topic, strlen(topic), key) == 0) { |
| | |         printf("%d Sub success!\n", sk->get_key()); |
| | |       } else { |
| | |         printf("Sub failture!\n"); |
| | |         exit(0); |
| | |       } |
| | | |
| | |     } else if (strcmp(action, "desub") == 0) { |
| | |       printf("Please input topic!\n"); |
| | |       if (scanf("%511s", topic) != 1) { |
| | |         break; |
| | |       } |
| | |       if (sk->desub(topic, strlen(topic), key) == 0) { |
| | |         printf("%d Desub success!\n", sk->get_key()); |
| | |       } else { |
| | |         printf("Desub failture!\n"); |
| | |         exit(0); |
| | |       } |
| | | |
| | |     } else if (strcmp(action, "pub") == 0) { |
| | |       printf("Please input topic and content\n"); |
| | |       if (scanf("%511s %511s", topic, content) != 2) { |
| | |         break; |
| | |       } |
| | |       if (sk->pub(topic, strlen(topic) + 1, content, strlen(content) + 1, key) == 0) { |
| | |         printf("%d Pub success!\n", sk->get_key()); |
| | |       } else { |
| | |         printf("Pub failture!\n"); |
| | |       } |
| | | |
| | |     } else if (strcmp(action, "quit") == 0) { |
| | |       printf("(%d) quit\n", sk->get_key()); |
| | |       break; |
| | |     } else { |
| | |       printf("error input argument\n"); |
| | |       continue; |
| | |     } |
| | | |
| | |   } |
| | | |
| | |   // Single cleanup point for both "quit" and end-of-input. |
| | |   delete sk; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | |  * Entry point: `<prog> server|client <key>`. |
| | |  * |
| | |  * Arguments are validated before shared memory is initialised, so a bad |
| | |  * invocation prints the usage line and exits with status 1 without calling |
| | |  * shm_mm_wrapper_init(). |
| | |  * |
| | |  * @return 0 after server()/client() returns, 1 on a usage error |
| | |  */ |
| | | int main(int argc, char *argv[]) { |
| | |   if (argc < 3) { |
| | |     fprintf(stderr, "Usage: %s %s|%s <key> ...\n", argv[0], "server", "client"); |
| | |     return 1; |
| | |   } |
| | | |
| | |   const bool is_server = strcmp("server", argv[1]) == 0; |
| | |   const bool is_client = strcmp("client", argv[1]) == 0; |
| | |   if (!is_server && !is_client) { |
| | |     // An unknown mode used to fall through and return 0 silently. |
| | |     fprintf(stderr, "Usage: %s %s|%s <key> ...\n", argv[0], "server", "client"); |
| | |     return 1; |
| | |   } |
| | | |
| | |   int key = atoi(argv[2]); |
| | | |
| | |   shm_mm_wrapper_init(512); |
| | | |
| | |   if (is_server) { |
| | |     server(key); |
| | |   } else { |
| | |     client(key); |
| | |   } |
| | | |
| | |   return 0; |
| | | } |