wangzhengquan
2020-07-13 e1e97f1f98baf82efcd5825d7c7a7b4b1b2f2e40
queue/shm_queue_wrapper.c
@@ -1,48 +1,75 @@
#include "shm_queue_wrapper.h"
#define MAX_ELE_SIZE 512
#include "mm.h"
#include "hashtable.h"
typedef struct ele_t {
   char buf[MAX_ELE_SIZE];
   size_t size;
   void * buf;
} ele_t;
typedef struct shmqueue_t {
   size_t ele_size;
   void *mqueue;
} shmqueue_t;
/**
 * 初始化
 * 创建队列
 * @ shmqueue 
 * @ key 标识共享队列的唯一key
 * @ key 标识共享队列的唯一标识, key是一个指针里面存储了key的值, 如果key的值为-1系统会自动分配一个key值并把该key的值赋给key指针。如果key的值不会空会检查是否有重复绑定的情况, 有重复就报错没有就创建队列并绑定key.
 * @ queue_size 队列大小
 * @ ele_size 队列中元素大小
 * @ size 队列中元素大小
 */
void* shmqueue_init( int key, int queue_size, int ele_size) {
   if(ele_size > MAX_ELE_SIZE) {
      err_exit(0, "shmqueue_init : the element size is supper than max element buffer size");
void* shmqueue_create( int * key, int queue_size) {
   int  mkey;
   hashtable_t *hashtable = get_mm_hashtable();
   if(*key == -1) {
      mkey = hashtable_alloc_key(hashtable);
      *key = mkey;
   } else {
      mkey = *key;
      if(hashtable_get(hashtable, mkey)!= NULL) {
         err_msg(0, "key %d has already been in used!", mkey);
         return NULL;
      }
   }
   shmqueue_t *shmqueue = (shmqueue_t*)malloc(sizeof(shmqueue_t));
   SHMQueue<ele_t> *queue = new SHMQueue<ele_t>(key, queue_size);
   SHMQueue<ele_t> *queue = new SHMQueue<ele_t>(mkey, queue_size);
   shmqueue->mqueue = (void *)queue;
   shmqueue->ele_size = ele_size;
   return (void *)shmqueue;
}
/**
 * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并返回空值
 */
void* shmqueue_attach(int key) {
   hashtable_t *hashtable = get_mm_hashtable();
   if(hashtable_get(hashtable, key)== NULL) {
      err_msg(0, "shmqueue_attach:attach failed, The queue  binding on key %d has not been created!", key);
      return NULL;
   }
   shmqueue_t *shmqueue = (shmqueue_t*)malloc(sizeof(shmqueue_t));
   SHMQueue<ele_t> *queue = new SHMQueue<ele_t>(key, 0);
   shmqueue->mqueue = (void *)queue;
   return (void *)shmqueue;
}
/**
 * 销毁
*/
void shmqueue_destroy(void * _shmqueue) {
void shmqueue_drop(void * _shmqueue) {
   shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
   delete (SHMQueue<ele_t> *)shmqueue->mqueue;
   delete _shmqueue;
   delete shmqueue;
}
/**
 * 队列元素的个数
 */
uint32_t shmqueue_size(void * _shmqueue) {
int shmqueue_size(void * _shmqueue) {
   shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
   return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->size();
   return (int)(((SHMQueue<ele_t> *)(shmqueue->mqueue))->size()) ;
}
/**
@@ -64,42 +91,51 @@
/**
 * 入队, 队列满时等待
 */
int shmqueue_push(void * _shmqueue, void *src_ele) {
int shmqueue_push(void * _shmqueue, void *src, int size) {
   shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
   ele_t dest_ele;
   memcpy(&dest_ele.buf, src_ele, shmqueue->ele_size);
   return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push(dest_ele);
   ele_t dest;
   dest.size = size;
   dest.buf = mm_malloc(size);
   memcpy(dest.buf, src, size);
   return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push(dest);
}
/**
 * 入队, 队列满时立即返回
 */
int shmqueue_push_nowait(void * _shmqueue, void *src_ele) {
int shmqueue_push_nowait(void * _shmqueue, void *src, int size) {
   shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
   ele_t dest_ele;
   memcpy(&dest_ele.buf, src_ele, shmqueue->ele_size);
   return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push_nowait(dest_ele);
   ele_t dest;
   dest.size = size;
   dest.buf = mm_malloc(size);
   memcpy(dest.buf, src, size);
   return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push_nowait(dest);
}
/**
 * 入队, 指定时间内入队不成功就返回
 * timespec {sec秒, nsec纳秒}
 */
int shmqueue_push_timeout(void * _shmqueue, void *src_ele, struct timespec * timeout) {
int shmqueue_push_timeout(void * _shmqueue, void *src, int size,  void * _timeout) {
   struct timespec *timeout = (struct timespec *)_timeout;
   shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
   ele_t dest_ele;
   memcpy(&dest_ele.buf, src_ele, shmqueue->ele_size);
   return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push_timeout(dest_ele, timeout);
   ele_t dest;
   dest.size = size;
   dest.buf = mm_malloc(size);
   memcpy(dest.buf, src, size);
   return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push_timeout(dest, timeout);
}
/**
 * 出队, 队列空时等待
 */
int shmqueue_pop(void * _shmqueue, void *dest_ele) {
int shmqueue_pop(void * _shmqueue, void *dest) {
   shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
   ele_t src_ele;
   bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop(src_ele);
   ele_t src;
   bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop(src);
   if (rv) {
      memcpy(dest_ele, &src_ele.buf, shmqueue->ele_size);
      memcpy(dest, src.buf, src.size);
      mm_free(src.buf);
      return 1;
   } else {
      return 0;
@@ -110,12 +146,13 @@
/**
 * 出队, 队列空时立即返回
 */
int shmqueue_pop_nowait(void * _shmqueue, void *dest_ele) {
int shmqueue_pop_nowait(void * _shmqueue, void *dest) {
   shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
   ele_t src_ele;
   bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop_nowait(src_ele);
   ele_t src;
   bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop_nowait(src);
   if (rv) {
      memcpy(dest_ele, &src_ele.buf, shmqueue->ele_size);
      memcpy(dest, src.buf, src.size);
      mm_free(src.buf);
      return 1;
   } else {
      return 0;
@@ -125,12 +162,14 @@
/**
 * 出队, 指定时间内出队不成功就返回
 */
int shmqueue_pop_timeout(void * _shmqueue, void *dest_ele, struct timespec * timeout) {
int shmqueue_pop_timeout(void * _shmqueue, void *dest, void * _timeout) {
   struct timespec *timeout = (struct timespec *)_timeout;
   shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
   ele_t src_ele;
   bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop_timeout(src_ele, timeout);
   ele_t src;
   bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop_timeout(src, timeout);
   if (rv) {
      memcpy(dest_ele, &src_ele.buf, shmqueue->ele_size);
      memcpy(dest, src.buf, src.size);
      mm_free(src.buf);
      return 1;
   } else {
      return 0;