/** * encapsulate lock_free_queue, populate in userspace */ #ifndef __SHM_QUEUE_H__ #define __SHM_QUEUE_H__ #include "hashtable.h" #include "logger_factory.h" #include "sem_util.h" #include "shm_allocator.h" #include "usg_common.h" #include "array_lock_free_sem_queue.h" #include "lock_free_queue.h" #include "bus_error.h" template class SHMQueue { private: const int KEY; public: /// @brief constructor of the class SHMQueue(int key = 0, size_t qsize = 16); ~SHMQueue(); void force_destroy(); uint32_t size(); bool full(); bool empty(); int push(const ELEM_T &a_data, const struct timespec *timeout=NULL, int flag=0); int pop(ELEM_T &a_data, const struct timespec *timeout=NULL, int flag=0); ELEM_T &operator[](unsigned i); // @deprecate static size_t remove_queues_exclude(int keys[], size_t length); static size_t remove_queues(int keys[], size_t length); static size_t remove_queue(int key); private: protected: /// @brief the actual queue-> methods are forwarded into the real /// implementation LockFreeQueue *queue; private: /// @brief disable copy constructor declaring it private SHMQueue(const SHMQueue &a_src); }; // @deprecate template size_t SHMQueue::remove_queues_exclude(int keys[], size_t length) { hashtable_t *hashtable = mm_get_hashtable(); std::set *keyset = hashtable_keyset(hashtable); std::set::iterator keyItr; LockFreeQueue *mqueue; bool found; size_t count = 0; for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { found = false; for (size_t i = 0; i < length; i++) { if (*keyItr == keys[i]) { found = true; break; } } if (!found) { // 销毁共享内存的queue mqueue = (LockFreeQueue *)hashtable_get(hashtable, *keyItr); delete mqueue; hashtable_remove(hashtable, *keyItr); count++; } } delete keyset; return count; } template size_t SHMQueue::remove_queues(int keys[], size_t length) { hashtable_t *hashtable = mm_get_hashtable(); LockFreeQueue *mqueue; size_t count = 0; for(int i = 0; i< length; i++) { // 销毁共享内存的queue mqueue = (LockFreeQueue *)mm_get_by_key(keys[i]); delete mqueue; hashtable_remove(hashtable, keys[i]); count++; } return count; } template size_t SHMQueue::remove_queue(int key) { int keys[] = {key}; return remove_queues(keys, 1); } template SHMQueue::SHMQueue(int key, size_t qsize) : KEY(key) { hashtable_t *hashtable = mm_get_hashtable(); queue = (LockFreeQueue *)hashtable_get(hashtable, key); if (queue == NULL || (void *)queue == (void *)1) { queue = new LockFreeQueue(qsize); hashtable_put(hashtable, key, (void *)queue); } // queue->reference++; // LoggerFactory::getLogger()->debug("SHMQueue constructor reference===%d", queue->reference.load()); } template SHMQueue::~SHMQueue() { LoggerFactory::getLogger()->debug("SHMQueue destroy"); delete queue; queue = NULL; hashtable_t *hashtable = mm_get_hashtable(); hashtable_remove(hashtable, KEY); } template uint32_t SHMQueue::size() { return queue->size(); } template bool SHMQueue::full() { return queue->full(); } template bool SHMQueue::empty() { return queue->empty(); } template int SHMQueue::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) { int rv = queue->push(a_data, timeout, flag); if(rv == 0) { return 0; } if(rv == ETIMEDOUT) return EBUS_TIMEOUT; else { LoggerFactory::getLogger()->error("LockFreeQueue push_timeout: %s", bus_strerror(rv)); return rv; } } template int SHMQueue::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) { int rv = queue->pop(a_data, timeout, flag); if(rv == 0) { return 0; } if(rv == ETIMEDOUT) return EBUS_TIMEOUT; else { LoggerFactory::getLogger()->error("LockFreeQueue pop_timeout: %s", bus_strerror(rv)); return rv; } return rv; } template ELEM_T &SHMQueue::operator[](unsigned i) { return queue->operator[](i); } #endif