#include "shm_queue_wrapper.h" #include "mem_pool.h" #include "hashtable.h" #include "shm_queue.h" #include "shm_allocator.h" typedef struct ele_t { size_t size; void * buf; } ele_t; typedef struct shmqueue_t { void *mqueue; } shmqueue_t; //移除不包含在keys中的队列 void shm_remove_queues_exclude(void *keys, int length) { SHMQueue::remove_queues_exclude((int*)keys, (size_t)length); } /** * 创建队列 * @ shmqueue * @ key 标识共享队列的唯一标识, key是一个指针里面存储了key的值, 如果key的值为-1系统会自动分配一个key值并把该key的值赋给key指针。如果key的值不会空会检查是否有重复绑定的情况, 有重复就报错没有就创建队列并绑定key. * @ queue_size 队列大小 */ void* shmqueue_create( int * key, int queue_size) { int mkey; hashtable_t *hashtable = mm_get_hashtable(); if(*key == -1) { mkey = hashtable_alloc_key(hashtable); *key = mkey; } else { mkey = *key; if(hashtable_get(hashtable, mkey)!= NULL) { err_exit(0, "key %d has already been in used!", mkey); return NULL; } } shmqueue_t *shmqueue = (shmqueue_t*)malloc(sizeof(shmqueue_t)); SHMQueue *queue = new SHMQueue(mkey, queue_size); shmqueue->mqueue = (void *)queue; return (void *)shmqueue; } /** * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并退出 */ void* shmqueue_attach(int key) { hashtable_t *hashtable = mm_get_hashtable(); if(hashtable_get(hashtable, key)== NULL) { err_exit(0, "shmqueue_attach:attach queue at key %d failed!", key); return NULL; } shmqueue_t *shmqueue = (shmqueue_t*)malloc(sizeof(shmqueue_t)); SHMQueue *queue = new SHMQueue(key, 0); shmqueue->mqueue = (void *)queue; return (void *)shmqueue; } /** * 销毁 */ void shmqueue_drop(void * _shmqueue) { shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue; delete (SHMQueue *)shmqueue->mqueue; delete shmqueue; } /** * 队列元素的个数 */ int shmqueue_size(void * _shmqueue) { shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue; return (int)(((SHMQueue *)(shmqueue->mqueue))->size()) ; } /** * 是否已满 */ int shmqueue_full(void * _shmqueue) { shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue; return ((SHMQueue *)(shmqueue->mqueue))->full(); } /** * 是否为空 */ int shmqueue_empty(void * _shmqueue) { shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue; return ((SHMQueue *)(shmqueue->mqueue))->empty(); } /** * 入队, 队列满时等待 */ int shmqueue_push(void * _shmqueue, void *src, int size) { shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue; ele_t dest; dest.size = size; dest.buf = mm_malloc(size); memcpy(dest.buf, src, size); return ((SHMQueue *)(shmqueue->mqueue))->push(dest); } /** * 入队, 队列满时立即返回 */ int shmqueue_push_nowait(void * _shmqueue, void *src, int size) { shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue; ele_t dest; dest.size = size; dest.buf = mm_malloc(size); memcpy(dest.buf, src, size); return ((SHMQueue *)(shmqueue->mqueue))->push_nowait(dest); } /** * 入队, 指定时间内入队不成功就返回 * @sec秒 * @nsec纳秒 */ int shmqueue_push_timeout(void * _shmqueue, void *src, int size, int sec, int nsec) { struct timespec timeout = {sec, nsec}; shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue; ele_t dest; dest.size = size; dest.buf = mm_malloc(size); memcpy(dest.buf, src, size); return ((SHMQueue *)(shmqueue->mqueue))->push_timeout(dest, &timeout); } /** * 出队, 队列空时等待 */ int shmqueue_pop(void * _shmqueue, void **dest, int *size) { shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue; ele_t src; bool rv = ((SHMQueue *)(shmqueue->mqueue))->pop(src); if (rv) { void * _dest = malloc(src.size); memcpy(_dest, src.buf, src.size); *dest = _dest; *size = src.size; mm_free(src.buf); return 1; } else { return 0; } } /** * 出队, 队列空时立即返回 */ int shmqueue_pop_nowait(void * _shmqueue, void **dest, int *size) { shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue; ele_t src; bool rv = ((SHMQueue *)(shmqueue->mqueue))->pop_nowait(src); if (rv) { void * _dest = malloc(src.size); memcpy(dest, src.buf, src.size); *dest = _dest; *size = src.size; mm_free(src.buf); return 1; } else { return 0; } } /** * 出队, 指定时间内出队不成功就返回 * @sec秒 * @nsec纳秒 */ int shmqueue_pop_timeout(void * _shmqueue, void **dest, int *size, int sec, int nsec) { struct timespec timeout = {sec, nsec}; shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue; ele_t src; bool rv = ((SHMQueue *)(shmqueue->mqueue))->pop_timeout(src, &timeout); if (rv) { void * _dest = malloc(src.size); memcpy(_dest, src.buf, src.size); *dest = _dest; *size = src.size; mm_free(src.buf); return 1; } else { return 0; } } void shmqueue_free(void *ptr) { free(ptr); }