wangzhengquan
2020-07-21 4bf1fc0a7e9a4611fc322ebe1cb39a671aa2ed57
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
#include "shm_queue_wrapper.h"
 
#include "mem_pool.h"
#include "hashtable.h"
 
typedef struct ele_t {
    size_t size;
    void * buf;
} ele_t;
 
typedef struct shmqueue_t {
    void *mqueue;
} shmqueue_t;
 
 
 
//移除不包含在keys中的队列
void shm_remove_queues_exclude(void *keys, int length) {
    SHMQueue<ele_t>::remove_queues_exclude((int*)keys, (size_t)length);
}
 
/**
 * 创建队列
 * @ shmqueue 
 * @ key 标识共享队列的唯一标识, key是一个指针里面存储了key的值, 如果key的值为-1系统会自动分配一个key值并把该key的值赋给key指针。如果key的值不会空会检查是否有重复绑定的情况, 有重复就报错没有就创建队列并绑定key.
 * @ queue_size 队列大小
 */
void* shmqueue_create( int * key, int queue_size) {
    int  mkey;
    hashtable_t *hashtable = mm_get_hashtable();
    if(*key == -1) {
        mkey = hashtable_alloc_key(hashtable);
        *key = mkey;
    } else {
        mkey = *key;
        if(hashtable_get(hashtable, mkey)!= NULL) {
            err_exit(0, "key %d has already been in used!", mkey);
            return NULL;
        }
    }
 
    shmqueue_t *shmqueue = (shmqueue_t*)malloc(sizeof(shmqueue_t));
    SHMQueue<ele_t> *queue = new SHMQueue<ele_t>(mkey, queue_size);
    shmqueue->mqueue = (void *)queue;
    return (void *)shmqueue;
}
 
/**
 * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并退出
 */
void* shmqueue_attach(int key) {
    hashtable_t *hashtable = mm_get_hashtable();
    if(hashtable_get(hashtable, key)== NULL) {
        err_exit(0, "shmqueue_attach:attach queue at key %d  failed!", key);
        return NULL;
    }
 
    shmqueue_t *shmqueue = (shmqueue_t*)malloc(sizeof(shmqueue_t));
    SHMQueue<ele_t> *queue = new SHMQueue<ele_t>(key, 0);
    shmqueue->mqueue = (void *)queue;
    return (void *)shmqueue;
}
 
/**
 * 销毁
*/
void shmqueue_drop(void * _shmqueue) {
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    delete (SHMQueue<ele_t> *)shmqueue->mqueue;
    delete shmqueue;
}
 
/**
 * 队列元素的个数
 */
int shmqueue_size(void * _shmqueue) {
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    return (int)(((SHMQueue<ele_t> *)(shmqueue->mqueue))->size()) ;
}
 
/**
 * 是否已满
 */
int shmqueue_full(void * _shmqueue) {
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->full();
}
 
/**
 * 是否为空
 */
int shmqueue_empty(void * _shmqueue) {
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->empty();
}
 
/**
 * 入队, 队列满时等待
 */
int shmqueue_push(void * _shmqueue, void *src, int size) {
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    ele_t dest;
    dest.size = size;
    dest.buf = mm_malloc(size);
    memcpy(dest.buf, src, size);
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push(dest);
}
 
/**
 * 入队, 队列满时立即返回
 */
int shmqueue_push_nowait(void * _shmqueue, void *src, int size) {
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    ele_t dest;
    dest.size = size;
    dest.buf = mm_malloc(size);
    memcpy(dest.buf, src, size);
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push_nowait(dest);
}
 
/**
 * 入队, 指定时间内入队不成功就返回
 * @sec秒
 * @nsec纳秒
 */
int shmqueue_push_timeout(void * _shmqueue, void *src, int size,  int sec, int nsec) {
 
    struct timespec timeout = {sec, nsec};
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    ele_t dest;
    dest.size = size;
    dest.buf = mm_malloc(size);
    memcpy(dest.buf, src, size);
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push_timeout(dest, &timeout);
}
 
/**
 * 出队, 队列空时等待
 */
int shmqueue_pop(void * _shmqueue, void **dest, int *size) {
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    ele_t src;
 
    bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop(src);
    if (rv) {
        void * _dest = malloc(src.size);
        memcpy(_dest, src.buf, src.size);
        *dest = _dest;
        *size = src.size;
        mm_free(src.buf);
        return 1;
    } else {
        return 0;
    }
 
}
 
/**
 * 出队, 队列空时立即返回
 */
int shmqueue_pop_nowait(void * _shmqueue, void **dest, int *size) {
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    ele_t src;
    
    bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop_nowait(src);
    if (rv) {
        void * _dest = malloc(src.size);
        memcpy(dest, src.buf, src.size);
        *dest = _dest;
        *size = src.size;
        mm_free(src.buf);
        return 1;
    } else {
        return 0;
    }
}
 
/**
 * 出队, 指定时间内出队不成功就返回
 * @sec秒
 * @nsec纳秒
 */
int shmqueue_pop_timeout(void * _shmqueue, void **dest, int *size, int sec, int nsec) {
    struct timespec timeout = {sec, nsec};
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    ele_t src;
    
    bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop_timeout(src, &timeout);
    if (rv) {
        void * _dest = malloc(src.size);
        memcpy(_dest, src.buf, src.size);
        *dest = _dest;
        *size = src.size;
        mm_free(src.buf);
        return 1;
    } else {
        return 0;
    }
}
 
void shmqueue_free(void *ptr) {
    free(ptr);
}