wangzhengquan
2020-07-20 f85c9b875b060681b51f57b15074ba1c7c9f5636
queue/shm_queue_wrapper.c
@@ -1,6 +1,6 @@
#include "shm_queue_wrapper.h"
#include "mm.h"
#include "mem_pool.h"
#include "hashtable.h"
typedef struct ele_t {
@@ -12,12 +12,18 @@
   void *mqueue;
} shmqueue_t;
//移除不包含在keys中的队列
void shm_remove_queues_exclue(void *keys, int length) {
   SHMQueue<ele_t>::remove_queues_exclue((int*)keys, (size_t)length);
}
/**
 * 创建队列
 * @ shmqueue 
 * @ key 标识共享队列的唯一标识, key是一个指针里面存储了key的值, 如果key的值为-1系统会自动分配一个key值并把该key的值赋给key指针。如果key的值不会空会检查是否有重复绑定的情况, 有重复就报错没有就创建队列并绑定key.
 * @ queue_size 队列大小
 * @ size 队列中元素大小
 */
void* shmqueue_create( int * key, int queue_size) {
   int  mkey;
@@ -28,7 +34,7 @@
   } else {
      mkey = *key;
      if(hashtable_get(hashtable, mkey)!= NULL) {
         err_msg(0, "key %d has already been in used!", mkey);
         err_exit(0, "key %d has already been in used!", mkey);
         return NULL;
      }
   }
@@ -40,12 +46,12 @@
}
/**
 * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并返回空值
 * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并退出
 */
void* shmqueue_attach(int key) {
   hashtable_t *hashtable = mm_get_hashtable();
   if(hashtable_get(hashtable, key)== NULL) {
      err_msg(0, "shmqueue_attach:attach failed, The queue  binding on key %d has not been created!", key);
      err_exit(0, "shmqueue_attach:attach queue at key %d  failed!", key);
      return NULL;
   }
@@ -114,27 +120,33 @@
/**
 * 入队, 指定时间内入队不成功就返回
 * timespec {sec秒, nsec纳秒}
 * @sec秒
 * @nsec纳秒
 */
int shmqueue_push_timeout(void * _shmqueue, void *src, int size,  void * _timeout) {
   struct timespec *timeout = (struct timespec *)_timeout;
int shmqueue_push_timeout(void * _shmqueue, void *src, int size,  int sec, int nsec) {
   struct timespec timeout = {sec, nsec};
   shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
   ele_t dest;
   dest.size = size;
   dest.buf = mm_malloc(size);
   memcpy(dest.buf, src, size);
   return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push_timeout(dest, timeout);
   return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push_timeout(dest, &timeout);
}
/**
 * 出队, 队列空时等待
 */
int shmqueue_pop(void * _shmqueue, void *dest) {
int shmqueue_pop(void * _shmqueue, void **dest, int *size) {
   shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
   ele_t src;
   bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop(src);
   if (rv) {
      memcpy(dest, src.buf, src.size);
      void * _dest = malloc(src.size);
      memcpy(_dest, src.buf, src.size);
      *dest = _dest;
      *size = src.size;
      mm_free(src.buf);
      return 1;
   } else {
@@ -146,12 +158,16 @@
/**
 * 出队, 队列空时立即返回
 */
int shmqueue_pop_nowait(void * _shmqueue, void *dest) {
int shmqueue_pop_nowait(void * _shmqueue, void **dest, int *size) {
   shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
   ele_t src;
   bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop_nowait(src);
   if (rv) {
      void * _dest = malloc(src.size);
      memcpy(dest, src.buf, src.size);
      *dest = _dest;
      *size = src.size;
      mm_free(src.buf);
      return 1;
   } else {
@@ -161,17 +177,29 @@
/**
 * 出队, 指定时间内出队不成功就返回
 * @sec秒
 * @nsec纳秒
 */
int shmqueue_pop_timeout(void * _shmqueue, void *dest, void * _timeout) {
   struct timespec *timeout = (struct timespec *)_timeout;
int shmqueue_pop_timeout(void * _shmqueue, void **dest, int *size, int sec, int nsec) {
   struct timespec timeout = {sec, nsec};
   shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
   ele_t src;
   bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop_timeout(src, timeout);
   bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop_timeout(src, &timeout);
   if (rv) {
      memcpy(dest, src.buf, src.size);
      void * _dest = malloc(src.size);
      memcpy(_dest, src.buf, src.size);
      *dest = _dest;
      *size = src.size;
      mm_free(src.buf);
      return 1;
   } else {
      return 0;
   }
}
void shmqueue_free(void *ptr) {
   free(ptr);
}