CMakeLists.txt | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/CMakeLists.txt | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/futex_sem.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/futex_sem.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/psem.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/queue/array_lock_free_queue.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/queue/array_lock_free_queue2.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/queue/array_lock_free_sem_queue.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/queue/lock_free_queue.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/queue/shm_queue.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/socket/shm_socket.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/socket/shm_socket.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/time_util.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/time_util.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
test/futex_demo.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
CMakeLists.txt
@@ -16,6 +16,7 @@ option(BUILD_SHARED_LIBS "Build using shared libraries" ON) option(BUILD_DOC "Build doc" OFF) list(APPEND EXTRA_INCLUDES "${PROJECT_SOURCE_DIR}/include/usgcommon") list(APPEND EXTRA_LIBS ${PROJECT_SOURCE_DIR}/lib/libusgcommon.a pthread rt) src/CMakeLists.txt
@@ -8,24 +8,27 @@ configure_file(bus_config.h.in bus_config.h) add_library(shm_queue ./logger_factory.cpp ./socket/bus_server_socket.cpp ./socket/bus_server_socket_wrapper.cpp ./socket/shm_stream_mod_socket.cpp ./socket/shm_socket.cpp ./socket/shm_mod_socket.cpp ./psem.cpp ./svsem_util.cpp ./bus_error.cpp ./net/net_conn_pool.cpp ./net/net_mod_server_socket_wrapper.cpp ./net/net_mod_socket_wrapper.cpp ./net/net_mod_socket.cpp ./net/net_mod_socket_io.cpp ./net/net_mod_server_socket.cpp ./shm/shm_mm_wrapper.cpp ./shm/mm.cpp ./shm/hashtable.cpp ./logger_factory.cpp ./socket/bus_server_socket.cpp ./socket/bus_server_socket_wrapper.cpp ./socket/shm_stream_mod_socket.cpp ./socket/shm_socket.cpp ./socket/shm_mod_socket.cpp ./time_util.cpp ./psem.cpp ./svsem_util.cpp ./bus_error.cpp ./futex_sem.cpp ./net/net_conn_pool.cpp ./net/net_mod_server_socket_wrapper.cpp ./net/net_mod_socket_wrapper.cpp ./net/net_mod_socket.cpp ./net/net_mod_socket_io.cpp ./net/net_mod_server_socket.cpp ./shm/shm_mm_wrapper.cpp ./shm/mm.cpp ./shm/hashtable.cpp ) @@ -41,7 +44,8 @@ ${CMAKE_CURRENT_SOURCE_DIR}/socket ${CMAKE_CURRENT_SOURCE_DIR}/net ) target_link_libraries(shm_queue PUBLIC ${EXTRA_LIBS} ) # install rules @@ -55,6 +59,8 @@ ./socket/bus_server_socket_wrapper.h ./psem.h ./key_def.h ./time_util.h ./futex_sem.h ./bus_error.h ./svsem_util.h ./logger_factory.h src/futex_sem.cpp
#include <linux/futex.h>
#include <sys/syscall.h>
#include <time.h>
#include <unistd.h>

/// @brief Thin wrapper that issues the raw Linux futex system call.
///
/// All six arguments are forwarded to `syscall(SYS_futex, ...)` in the order
/// in which they are received.
///
/// @param uaddr    address of the futex word the operation acts on
/// @param futex_op operation code (FUTEX_WAIT, FUTEX_WAKE, ...)
/// @param val      operation-specific value
/// @param timeout  operation-specific timeout (or value, for some operations)
/// @param uaddr2   address of the second futex word, for operations that use one
/// @param val3     operation-specific value
/// @return the result of the system call narrowed to int; -1 with errno set
///         on failure
int futex(int *uaddr, int futex_op, int val, const struct timespec *timeout,
          int *uaddr2, int val3)
{
    // The fifth argument must be uaddr2. Passing uaddr a second time makes
    // every two-word operation (FUTEX_WAKE_OP, FUTEX_REQUEUE,
    // FUTEX_CMP_REQUEUE) act on the wrong address.
    return static_cast<int>(
        syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3));
}
New file @@ -0,0 +1,15 @@ #ifndef _FUTEXT_SEM_H_ #define _FUTEXT_SEM_H_ #include "usg_common.h" #include <sys/wait.h> #include <sys/mman.h> #include <sys/syscall.h> #include <linux/futex.h> #include <sys/time.h> #include <sys/mman.h> #include <sys/stat.h> /* For mode constants */ #include <fcntl.h> int futex(int *uaddr, int futex_op, int val, const struct timespec *timeout, int *uaddr2, int val3); #endif src/psem.cpp
@@ -1,26 +1,11 @@ #include "psem.h" #include <semaphore.h> #include "time_util.h" #define NANO 1000000000 static struct timespec psem_calc_abs_timeout(const struct timespec *ts) { struct timespec res; struct timespec timeout; if (clock_gettime(CLOCK_REALTIME, &timeout) == -1) err_exit(errno, "clock_gettime"); res.tv_sec = timeout.tv_sec + ts->tv_sec; res.tv_nsec = timeout.tv_nsec + ts->tv_nsec; res.tv_sec = res.tv_sec + floor(res.tv_nsec / NANO); res.tv_nsec = res.tv_nsec % NANO; return res; } int psem_timedwait(sem_t *sem, const struct timespec *ts) { struct timespec abs_timeout = psem_calc_abs_timeout(ts); struct timespec abs_timeout = TimeUtil::calc_abs_time(ts); int rv ; while ( (rv = sem_timedwait(sem, &abs_timeout)) == -1) { src/queue/array_lock_free_queue.h
@@ -1,5 +1,5 @@ #ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ #define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ #ifndef __ARRAY_LOCK_FREE_QUEUE_H__ #define __ARRAY_LOCK_FREE_QUEUE_H__ #include "atomic_ops.h" #include <assert.h> // assert() #include <sched.h> // sched_yield() src/queue/array_lock_free_queue2.h
File was deleted src/queue/array_lock_free_sem_queue.h
New file @@ -0,0 +1,367 @@ #ifndef __ARRAY_LOCK_FREE_SEM_QUEUE_H__ #define __ARRAY_LOCK_FREE_SEM_QUEUE_H__ #include "atomic_ops.h" #include <assert.h> // assert() #include <sched.h> // sched_yield() #include "logger_factory.h" #include "mem_pool.h" #include "shm_allocator.h" #include "futex_sem.h" #include "time_util.h" /// @brief implementation of an array based lock free queue with support for /// multiple producers /// This class is prevented from being instantiated directly (all members and /// methods are private). To instantiate a multiple producers lock free queue /// you must use the ArrayLockFreeSemQueue fachade: /// ArrayLockFreeSemQueue<int, 100, ArrayLockFreeSemQueue> q; #define LOCK_FREE_QUEUE_TIMEOUT 1 #define LOCK_FREE_QUEUE_NOWAIT 1 << 1 #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE template <typename ELEM_T, typename Allocator = SHM_Allocator> class ArrayLockFreeSemQueue { public: /// @brief constructor of the class ArrayLockFreeSemQueue(size_t qsize = 16); virtual ~ArrayLockFreeSemQueue(); inline uint32_t size(); inline bool full(); inline bool empty(); int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); /// @brief calculate the index in the circular array that corresponds /// to a particular "count" value inline uint32_t countToIndex(uint32_t a_count); ELEM_T& operator[](unsigned i); public: void *operator new(size_t size); void operator delete(void *p); private: size_t Q_SIZE; /// @brief array to keep the elements ELEM_T *m_theQueue; /// @brief where a new element will be inserted uint32_t m_writeIndex; /// @brief where the next element where be extracted from uint32_t m_readIndex; /// @brief maximum read index for multiple producer queues /// If it's not the same as m_writeIndex it means /// there are writes pending to be "committed" to the queue, that means, /// the place for the data was reserved (the index in the array) but /// data is 
still not in the queue, so the thread trying to read will have /// to wait for those other threads to save the data into the queue /// /// note this is only used for multiple producers uint32_t m_maximumReadIndex; #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE /// @brief number of elements in the queue int m_count; #endif private: /// @brief disable copy constructor declaring it private ArrayLockFreeSemQueue<ELEM_T, Allocator>(const ArrayLockFreeSemQueue<ELEM_T> &a_src); }; template <typename ELEM_T, typename Allocator> ArrayLockFreeSemQueue<ELEM_T, Allocator>::ArrayLockFreeSemQueue(size_t qsize): Q_SIZE(qsize), m_writeIndex(0), // initialisation is not atomic m_readIndex(0), // m_maximumReadIndex(0) // #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE ,m_count(0) // #endif { m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T)); } template <typename ELEM_T, typename Allocator> ArrayLockFreeSemQueue<ELEM_T, Allocator>::~ArrayLockFreeSemQueue() { // std::cout << "destroy ArrayLockFreeSemQueue\n"; Allocator::deallocate(m_theQueue); } template <typename ELEM_T, typename Allocator> inline uint32_t ArrayLockFreeSemQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) { // if Q_SIZE is a power of 2 this statement could be also written as // return (a_count & (Q_SIZE - 1)); return (a_count % Q_SIZE); } template <typename ELEM_T, typename Allocator> inline uint32_t ArrayLockFreeSemQueue<ELEM_T, Allocator>::size() { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE return m_count; #else uint32_t currentWriteIndex = m_maximumReadIndex; uint32_t currentReadIndex = m_readIndex; // let's think of a scenario where this function returns bogus data // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1 // 2. afterwards this thread is preemted. While this thread is inactive 2 // elements are inserted and removed from the queue, so m_maximumReadIndex // is 5 and m_readIndex 4. Real size is still 1 // 3. 
Now the current thread comes back from preemption and reads m_readIndex. // currentReadIndex is 4 // 4. currentReadIndex is bigger than currentWriteIndex, so // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is, // it returns that the queue is almost full, when it is almost empty // if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex)) { return (currentWriteIndex - currentReadIndex); } else { return (Q_SIZE + currentWriteIndex - currentReadIndex); } #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE } template <typename ELEM_T, typename Allocator> inline bool ArrayLockFreeSemQueue<ELEM_T, Allocator>::full() { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE return (m_count == (Q_SIZE)); #else uint32_t currentWriteIndex = m_writeIndex; uint32_t currentReadIndex = m_readIndex; if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) { // the queue is full return true; } else { // not full! return false; } #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE } template <typename ELEM_T, typename Allocator> inline bool ArrayLockFreeSemQueue<ELEM_T, Allocator>::empty() { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE return (m_count == 0); #else if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex)) { // the queue is full return true; } else { // not full! 
return false; } #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE } template <typename ELEM_T, typename Allocator> int ArrayLockFreeSemQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) { uint32_t currentReadIndex; uint32_t currentWriteIndex; int s; do { currentWriteIndex = m_writeIndex; currentReadIndex = m_readIndex; if (m_count == Q_SIZE) { if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) return -1; else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { const struct timespec ts = TimeUtil::trim_time(timeout); s = futex(&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); return -1; } } else { s = futex(&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { return -1; } } } } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); // We know now that this index is reserved for us. Use it to save the data m_theQueue[countToIndex(currentWriteIndex)] = a_data; // update the maximum read index after saving the data. It wouldn't fail if there is only one thread // inserting in the queue. 
It might fail if there are more than 1 producer threads because this // operation has to be done in the same order as the previous CAS while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) { // this is a good place to yield the thread in case there are more // software threads than hardware processors and you have more // than 1 producer thread // have a look at sched_yield (POSIX.1b) sched_yield(); } AtomicAdd(&m_count, 1); s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0); if (s == -1) err_exit(errno, "futex-FUTEX_WAKE"); return 0; } template <typename ELEM_T, typename Allocator> int ArrayLockFreeSemQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) { uint32_t currentMaximumReadIndex; uint32_t currentReadIndex; int s; do { // to ensure thread-safety when there is more than 1 producer thread // a second index is defined (m_maximumReadIndex) currentReadIndex = m_readIndex; currentMaximumReadIndex = m_maximumReadIndex; #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE if (m_count == 0) { if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) return -1; else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { const struct timespec ts = TimeUtil::trim_time(timeout); s = futex(&m_count, FUTEX_WAIT, 0, &ts, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); return -1; } } else { s = futex(&m_count, FUTEX_WAIT, 0, NULL, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { return -1; } } } #else if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) { // the queue is empty or // a producer thread has allocate space in the queue but is // waiting to commit the data into it return -1; } #endif // retrieve the data from the queue a_data = m_theQueue[countToIndex(currentReadIndex)]; // try to perfrom now the CAS operation on the read index. 
If we succeed // a_data already contains what m_readIndex pointed to before we // increased it if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE // m_count.fetch_sub(1); AtomicSub(&m_count, 1); #endif s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0); if (s == -1) err_exit(errno, "futex-FUTEX_WAKE"); return 0; } // it failed retrieving the element off the queue. Someone else must // have read the element stored at countToIndex(currentReadIndex) // before we could perform the CAS operation } while(1); // keep looping to try again! // Something went wrong. it shouldn't be possible to reach here assert(0); // Add this return statement to avoid compiler warnings return -1; } template <typename ELEM_T, typename Allocator> ELEM_T& ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[](unsigned int i) { int currentCount = m_count; uint32_t currentReadIndex = m_readIndex; if (i >= currentCount) { std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; std::exit(EXIT_FAILURE); } return m_theQueue[countToIndex(currentReadIndex+i)]; } template <typename ELEM_T, typename Allocator> void * ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator new(size_t size){ return Allocator::allocate(size); } template <typename ELEM_T, typename Allocator> void ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator delete(void *p) { return Allocator::deallocate(p); } #endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ src/queue/lock_free_queue.h
@@ -221,8 +221,7 @@ { LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n"); if (psem_wait(&slots) == -1) { err_msg(errno, "LockFreeQueue push"); return errno; return -1; } if ( m_qImpl.push(a_data) ) { @@ -241,13 +240,7 @@ int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data) { if (psem_trywait(&slots) == -1) { if (errno == EAGAIN) return EAGAIN; else { err_msg(errno, "LockFreeQueue push_nowait"); return errno; } return -1; } if ( m_qImpl.push(a_data)) { @@ -265,15 +258,8 @@ int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts) { LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n"); int rv; if ( psem_timedwait(&slots, ts) == -1) { if(errno == ETIMEDOUT) return EBUS_TIMEOUT; else { LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout"); return errno; } return -1; } if (m_qImpl.push(a_data)){ @@ -297,8 +283,7 @@ LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n"); if (psem_wait(&items) == -1) { LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop"); return errno; return -1; } if (m_qImpl.pop(a_data)) { @@ -316,12 +301,7 @@ int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data) { if (psem_trywait(&items) == -1) { if (errno == EAGAIN) return errno; else { LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_nowait"); return errno; } return -1; } if (m_qImpl.pop(a_data)) { @@ -339,14 +319,7 @@ int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts) { if (psem_timedwait(&items, ts) == -1) { if (errno == ETIMEDOUT) { return EBUS_TIMEOUT; } else { LoggerFactory::getLogger()->error(errno, "3 LockFreeQueue pop_timeout %d", errno); return errno; } return -1; } if (m_qImpl.pop(a_data)) { src/queue/shm_queue.h
@@ -6,12 +6,13 @@ #define __SHM_QUEUE_H__ #include "hashtable.h" #include "lock_free_queue.h" #include "logger_factory.h" #include "sem_util.h" #include "shm_allocator.h" #include "usg_common.h" #include "array_lock_free_sem_queue.h" #include "bus_error.h" template <typename ELEM_T> class SHMQueue { @@ -20,7 +21,7 @@ public: /// @brief constructor of the class SHMQueue(int key = 0, size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); SHMQueue(int key = 0, size_t qsize = 16); ~SHMQueue(); @@ -49,7 +50,8 @@ protected: /// @brief the actual queue-> methods are forwarded into the real /// implementation LockFreeQueue<ELEM_T, SHM_Allocator> *queue; ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *queue; private: /// @brief disable copy constructor declaring it private @@ -62,7 +64,7 @@ hashtable_t *hashtable = mm_get_hashtable(); std::set<int> *keyset = hashtable_keyset(hashtable); std::set<int>::iterator keyItr; LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue; ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue; bool found; size_t count = 0; for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { @@ -75,7 +77,7 @@ } if (!found) { // 销毁共享内存的queue mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr); mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr); delete mqueue; hashtable_remove(hashtable, *keyItr); count++; @@ -89,11 +91,11 @@ template <typename ELEM_T> size_t SHMQueue<ELEM_T>::remove_queues(int keys[], size_t length) { hashtable_t *hashtable = mm_get_hashtable(); LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue; ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue; size_t count = 0; for(int i = 0; i< length; i++) { // 销毁共享内存的queue mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]); mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]); delete mqueue; hashtable_remove(hashtable, keys[i]); count++; @@ -111,49 +113,22 @@ SHMQueue<ELEM_T>::SHMQueue(int key, 
size_t qsize) : KEY(key) { hashtable_t *hashtable = mm_get_hashtable(); queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key); queue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key); if (queue == NULL || (void *)queue == (void *)1) { queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize); queue = new ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator>(qsize); hashtable_put(hashtable, key, (void *)queue); } queue->reference++; // queue->reference++; // LoggerFactory::getLogger()->debug("SHMQueue constructor reference===%d", queue->reference.load()); } template <typename ELEM_T> SHMQueue<ELEM_T>::~SHMQueue() { if(queue == NULL) { // queue已经销毁 return; } sem_wait(&(queue->mutex)); queue->reference--; // LoggerFactory::getLogger()->debug("SHMQueue destructor reference===%d", if (queue->reference.load() == 0) { delete queue; queue = NULL; hashtable_t *hashtable = mm_get_hashtable(); hashtable_remove(hashtable, KEY); // 此时queue已经销毁,无需 sem_post(&(queue->mutex)) // printf("SHMQueue destructor delete queue\n"); } else { sem_post(&(queue->mutex)); } } template <typename ELEM_T> void SHMQueue<ELEM_T>::force_destroy() { if(queue == NULL) { // queue已经销毁 return; } SemUtil::dec(queue->mutex); LoggerFactory::getLogger()->debug("SHMQueue destroy"); delete queue; queue = NULL; hashtable_t *hashtable = mm_get_hashtable(); hashtable_remove(hashtable, KEY); // 此时queue已经销毁,无需 SemUtil::inc(queue->mutex) } template <typename ELEM_T> inline uint32_t SHMQueue<ELEM_T>::size() { @@ -170,36 +145,85 @@ template <typename ELEM_T> inline int SHMQueue<ELEM_T>::push(const ELEM_T &a_data) { return queue->push(a_data); int rv = queue->push(a_data); if(rv == -1) { return errno; } else { return 0; } } template <typename ELEM_T> inline int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) { return queue->push_nowait(a_data); int rv = queue->push(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT); if(rv == -1) { if (errno == EAGAIN) return EAGAIN; else { 
err_msg(errno, "LockFreeQueue push_nowait"); return errno; } } return 0; } template <typename ELEM_T> inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec *timeout) { inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec *timeout) { return queue->push_timeout(a_data, timeout); int rv = queue->push(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT); if(rv == -1) { if(errno == ETIMEDOUT) return EBUS_TIMEOUT; else { LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout"); return errno; } } return 0; } template <typename ELEM_T> inline int SHMQueue<ELEM_T>::pop(ELEM_T &a_data) { // printf("SHMQueue pop before\n"); int rv = queue->pop(a_data); // printf("SHMQueue after before\n"); return rv; if(rv == -1) { return errno; } else { return 0; } } template <typename ELEM_T> inline int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) { return queue->pop_nowait(a_data); int rv = queue->pop(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT); if(rv == -1) { if (errno == EAGAIN) return errno; else { LoggerFactory::getLogger()->error(errno, " SHMQueue pop_nowait"); return errno; } } return 0; } template <typename ELEM_T> inline int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) { return queue->pop_timeout(a_data, timeout); int rv; rv = queue->pop(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT); if(rv == -1) { if (errno == ETIMEDOUT) { return EBUS_TIMEOUT; } else { LoggerFactory::getLogger()->error(errno, " SHMQueue pop_timeout"); return errno; } } return 0; } template <typename ELEM_T> @@ -207,4 +231,7 @@ return queue->operator[](i); } #endif src/socket/shm_socket.cpp
@@ -383,10 +383,8 @@ if (rv == 0) { // printf("shm_sendto push after\n"); delete remoteQueue; return 0; } else { delete remoteQueue; mm_free(dest.buf); if(rv > EBUS_BASE) { // bus_errno = EBUS_TIMEOUT; @@ -725,10 +723,7 @@ socket->queue = NULL; } if (socket->remoteQueue != NULL) { delete socket->remoteQueue; socket->remoteQueue = NULL; } if (socket->messageQueue != NULL) { delete socket->messageQueue; @@ -747,7 +742,6 @@ client_socket = iter->second; client_socket->remoteQueue->push_timeout(close_msg, &timeout); delete client_socket->remoteQueue; client_socket->remoteQueue = NULL; delete client_socket->messageQueue; src/socket/shm_socket.h
@@ -4,6 +4,7 @@ #include "usg_common.h" #include "usg_typedef.h" #include "shm_queue.h" #include "lock_free_queue.h" enum shm_socket_flag_t { src/time_util.cpp
New file @@ -0,0 +1,26 @@ #include "time_util.h" #define NANO 1000000000 struct timespec TimeUtil::calc_abs_time(const struct timespec *ts) { struct timespec res; struct timespec timeout; if (clock_gettime(CLOCK_REALTIME, &timeout) == -1) err_exit(errno, "clock_gettime"); res.tv_sec = timeout.tv_sec + ts->tv_sec; res.tv_nsec = timeout.tv_nsec + ts->tv_nsec; res.tv_sec = res.tv_sec + floor(res.tv_nsec / NANO); res.tv_nsec = res.tv_nsec % NANO; return res; } struct timespec TimeUtil::trim_time(const struct timespec *ts) { struct timespec res; res.tv_sec = ts->tv_sec + floor(ts->tv_nsec / NANO); res.tv_nsec = ts->tv_nsec % NANO; return res; } src/time_util.h
New file @@ -0,0 +1,14 @@ #ifndef _TIMEUTIL_H_ #define _TIMEUTIL_H_ #include "usg_common.h" class TimeUtil { public: // 计算当前时间+ts的绝对时间 static struct timespec calc_abs_time(const struct timespec *ts); // 如果纳秒大于10e9,向秒进位 static struct timespec trim_time(const struct timespec *ts) ; }; #endif test/futex_demo.cpp
@@ -19,11 +19,12 @@ #include <sys/syscall.h> #include <linux/futex.h> #include <sys/time.h> #include "usg_common.h" #include <sys/mman.h> #include <sys/stat.h> /* For mode constants */ #include <fcntl.h> /* For O_* constants */ #include "usg_common.h" #define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); \ } while (0)