/** * encapsulate lock_free_queue, populate in userspace */ #ifndef __SHM_QUEUE_H__ #define __SHM_QUEUE_H__ #include "hashtable.h" #include "logger_factory.h" #include "sem_util.h" #include "shm_allocator.h" #include "usg_common.h" #include "array_lock_free_sem_queue.h" #include "bus_error.h" template class SHMQueue { private: const int KEY; public: /// @brief constructor of the class SHMQueue(int key = 0, size_t qsize = 16); ~SHMQueue(); void force_destroy(); inline uint32_t size(); inline bool full(); inline bool empty(); inline int push(const ELEM_T &a_data); inline int push_nowait(const ELEM_T &a_data); inline int push_timeout(const ELEM_T &a_data, const struct timespec *timeout); inline int pop(ELEM_T &a_data); inline int pop_nowait(ELEM_T &a_data); inline int pop_timeout(ELEM_T &a_data, struct timespec *timeout); inline ELEM_T &operator[](unsigned i); // @deprecate static size_t remove_queues_exclude(int keys[], size_t length); static size_t remove_queues(int keys[], size_t length); static size_t remove_queue(int key); private: protected: /// @brief the actual queue-> methods are forwarded into the real /// implementation ArrayLockFreeSemQueue *queue; private: /// @brief disable copy constructor declaring it private SHMQueue(const SHMQueue &a_src); }; // @deprecate template size_t SHMQueue::remove_queues_exclude(int keys[], size_t length) { hashtable_t *hashtable = mm_get_hashtable(); std::set *keyset = hashtable_keyset(hashtable); std::set::iterator keyItr; ArrayLockFreeSemQueue *mqueue; bool found; size_t count = 0; for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { found = false; for (size_t i = 0; i < length; i++) { if (*keyItr == keys[i]) { found = true; break; } } if (!found) { // 销毁共享内存的queue mqueue = (ArrayLockFreeSemQueue *)hashtable_get(hashtable, *keyItr); delete mqueue; hashtable_remove(hashtable, *keyItr); count++; } } delete keyset; return count; } template size_t SHMQueue::remove_queues(int keys[], size_t length) { hashtable_t *hashtable = mm_get_hashtable(); ArrayLockFreeSemQueue *mqueue; size_t count = 0; for(int i = 0; i< length; i++) { // 销毁共享内存的queue mqueue = (ArrayLockFreeSemQueue *)mm_get_by_key(keys[i]); delete mqueue; hashtable_remove(hashtable, keys[i]); count++; } return count; } template size_t SHMQueue::remove_queue(int key) { int keys[] = {key}; return remove_queues(keys, 1); } template SHMQueue::SHMQueue(int key, size_t qsize) : KEY(key) { hashtable_t *hashtable = mm_get_hashtable(); queue = (ArrayLockFreeSemQueue *)hashtable_get(hashtable, key); if (queue == NULL || (void *)queue == (void *)1) { queue = new ArrayLockFreeSemQueue(qsize); hashtable_put(hashtable, key, (void *)queue); } // queue->reference++; // LoggerFactory::getLogger()->debug("SHMQueue constructor reference===%d", queue->reference.load()); } template SHMQueue::~SHMQueue() { LoggerFactory::getLogger()->debug("SHMQueue destroy"); delete queue; queue = NULL; hashtable_t *hashtable = mm_get_hashtable(); hashtable_remove(hashtable, KEY); } template inline uint32_t SHMQueue::size() { return queue->size(); } template inline bool SHMQueue::full() { return queue->full(); } template inline bool SHMQueue::empty() { return queue->empty(); } template inline int SHMQueue::push(const ELEM_T &a_data) { int rv = queue->push(a_data); if(rv == -1) { return errno; } else { return 0; } } template inline int SHMQueue::push_nowait(const ELEM_T &a_data) { int rv = queue->push(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT); if(rv == -1) { if (errno == EAGAIN) return EAGAIN; else { err_msg(errno, "LockFreeQueue push_nowait"); return errno; } } return 0; } template inline int SHMQueue::push_timeout(const ELEM_T &a_data, const struct timespec *timeout) { int rv = queue->push(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT); if(rv == -1) { if(errno == ETIMEDOUT) return EBUS_TIMEOUT; else { LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout"); return errno; } } return 0; } template inline int SHMQueue::pop(ELEM_T &a_data) { // printf("SHMQueue pop before\n"); int rv = queue->pop(a_data); // printf("SHMQueue after before\n"); if(rv == -1) { return errno; } else { return 0; } } template inline int SHMQueue::pop_nowait(ELEM_T &a_data) { int rv = queue->pop(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT); if(rv == -1) { if (errno == EAGAIN) return errno; else { LoggerFactory::getLogger()->error(errno, " SHMQueue pop_nowait"); return errno; } } return 0; } template inline int SHMQueue::pop_timeout(ELEM_T &a_data, struct timespec *timeout) { int rv; rv = queue->pop(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT); if(rv == -1) { if (errno == ETIMEDOUT) { return EBUS_TIMEOUT; } else { LoggerFactory::getLogger()->error(errno, " SHMQueue pop_timeout"); return errno; } } return 0; } template inline ELEM_T &SHMQueue::operator[](unsigned i) { return queue->operator[](i); } #endif