| | |
| | | <<<<<<< HEAD |
| | | ## 实例 |
| | | |
| | | ``` |
| | | #include "shm_queue_wrapper.h" |
| | | #include "mm.h" |
| | | |
| | | typedef struct message_t |
| | | { |
| | | char method[20]; |
| | | int code; |
| | | |
| | | } message_t; |
| | | |
| | | void test1() { |
| | | unsigned int i = 0; |
| | | int key = 2; |
| | | // |
| | | size_t qsize = 16; |
| | | void * queue = shmqueue_init( key, qsize, sizeof(message_t)); |
| | | message_t item; |
| | | // LockFreeQueue<struct Item> queue(16); |
| | | for(i = 0; i < qsize; i++) { |
| | | sprintf(item.method, "hello"); |
| | | item.code = i ; |
| | | if(shmqueue_push(queue, (void *)&item)) { |
| | | printf("push:%d %s\n", item.code, item.method ); |
| | | } |
| | | } |
| | | |
| | | struct timespec timeout = {1, 0}; |
| | | |
| | | i = 0; |
| | | while((shmqueue_pop_timeout(queue, (void *)&item, &timeout)) ) { |
| | | printf("pop:%d %s\n", item.code, item.method ); |
| | | // cout << item.pic << endl; |
| | | i++; |
| | | } |
| | | |
| | | //销毁队列 |
| | | shmqueue_destroy(queue); |
| | | } |
| | | |
| | | |
| | | int main () { |
| | | test1(); |
| | | |
| | | //整个进程退出时需要执行这个方法,该方法首先会检查是否还有其他进程在使用该共享内存,如果还有其他进程在使用就只是detach,如果没有其他进程在使用则销毁整块内存。 |
| | | mm_destroy(); |
| | | return 0; |
| | | } |
| | | ``` |
| | | ## 实例 |
| | | 请求应答 `./test2/server.c ./test2/client.c` |
| | | |
| | | ## 接口说明 |
| | | |
| | | ``` |
| | | |
| | | |
| | | /** |
| | | * 初始化 |
| | | * @ shmqueue |
| | | * @ key 标识共享队列的唯一key |
| | | * @ queue_size 队列大小 , 这个值必须是2的指数即 1, 2, 4, 8, 16 等 |
| | | * @ ele_size 队列中元素大小, 这个值不能超过512,当然如果需要可以调整这个最大限制 |
| | | */ |
| | | void* shmqueue_init(int key, int queue_size, int ele_size); |
| | | /** |
| | | * 销毁 |
| | | */ |
| | | void shmqueue_destroy(void *shmqueue); |
| | | /** |
| | | * 队列元素的个数 |
| | | */ |
| | | uint32_t shmqueue_size(void *shmqueue); |
| | | /** |
| | | * 是否已满 |
| | | */ |
| | | int shmqueue_full(void *shmqueue); |
| | | |
| | | /** |
| | | * 是否为空 |
| | | */ |
| | | int shmqueue_empty(void *shmqueue); |
| | | |
| | | /** |
| | | * 入队, 队列满时等待 |
| | | */ |
| | | int shmqueue_push(void *shmqueue, void *src_ele); |
| | | |
| | | /** |
| | | * 入队, 队列满时立即返回 |
| | | */ |
| | | int shmqueue_push_nowait(void *shmqueue, void *src_ele); |
| | | |
| | | /** |
| | | * 入队, 指定时间内入队不成功就返回 |
| | | */ |
| | | int shmqueue_push_timeout(void *shmqueue, void *src_ele, struct timespec * timeout); |
| | | |
| | | /** |
| | | * 出队, 队列空时等待 |
| | | */ |
| | | int shmqueue_pop(void *shmqueue, void *dest_ele); |
| | | |
| | | /** |
| | | * 出队, 队列空时立即返回 |
| | | */ |
| | | int shmqueue_pop_nowait(void *shmqueue, void *dest_ele); |
| | | |
| | | /** |
| | | * 出队, 指定时间内出队不成功就返回 |
| | | */ |
| | | int shmqueue_pop_timeout(void *shmqueue, void *dest_ele, struct timespec * timeout); |
| | | |
| | | ``` |
| | | ======= |
| | | ## softbus |
| | | |
| | | shm的通讯库 |
| | | >>>>>>> dd08a8134dea74ac30213c1b8580bff34ee7095b |
| | | |
| | | |
| | |
| | | #include <algorithm> |
| | | #include <iomanip> |
| | | #include <limits> |
| | | #include <map> |
| | | #include <initializer_list> |
| | | #include <vector> |
| | | #include <map> |
| | | #include <set> |
| | | #include <thread> |
| | | |
| | | #endif |
| | |
| | | #include "hashtable.h" |
| | | #include "mm.h" |
| | | #include "sem_util.h" |
| | | #include <set> |
| | | |
| | | typedef struct tailq_entry_t |
| | | { |
| | |
| | | |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | void hashtable_removeall(hashtable_t *hashtable) |
| | | { |
| | | tailq_entry_t *item; |
| | |
| | | //释放读写锁 |
| | | SemUtil::inc(hashtable->wlock); |
| | | } |
| | | |
| | | |
| | | |
| | | void hashtable_foreach(hashtable_t *hashtable, hashtable_foreach_cb cb) { |
| | | tailq_entry_t *item; |
| | | for (int i = 0; i < MAPSIZE; i++) { |
| | | tailq_header_t *my_tailq_head = hashtable->array[i] ; |
| | | |
| | | if (my_tailq_head == NULL ) |
| | | continue; |
| | | |
| | | TAILQ_FOREACH(item, my_tailq_head, joint) |
| | | { |
| | | cb(item->key, item -> value); |
| | | } |
| | | } |
| | | } |
| | | |
| | | std::set<int> * hashtable_keyset(hashtable_t *hashtable) { |
| | | std::set<int> *keyset = new std::set<int>; |
| | | tailq_entry_t *item; |
| | | for (int i = 0; i < MAPSIZE; i++) { |
| | | tailq_header_t *my_tailq_head = hashtable->array[i] ; |
| | | |
| | | if (my_tailq_head == NULL ) |
| | | continue; |
| | | |
| | | TAILQ_FOREACH(item, my_tailq_head, joint) |
| | | { |
| | | keyset->insert(item->key); |
| | | } |
| | | } |
| | | return keyset; |
| | | } |
| | |
| | | #define __HASHTABLE_H__ |
| | | |
| | | #include <sys/queue.h> |
| | | //#include "queue.h" |
| | | #include <set> |
| | | |
| | | #define MAPSIZE 100 |
| | | |
| | |
| | | size_t readcnt; |
| | | |
| | | } hashtable_t; |
| | | typedef void (*hashtable_foreach_cb)(int key, void *value); |
| | | |
| | | void hashtable_init(hashtable_t *hashtable); |
| | | void *hashtable_get(hashtable_t *hashtable, int key); |
| | |
| | | void hashtable_removeall(hashtable_t *hashtable); |
| | | |
| | | |
| | | void hashtable_foreach(hashtable_t *hashtable, hashtable_foreach_cb cb); |
| | | |
| | | void hashtable_printall(hashtable_t *hashtable); |
| | | |
| | | int hashtable_alloc_key(hashtable_t *hashtable); |
| | | |
| | | std::set<int> * hashtable_keyset(hashtable_t *hashtable) ; |
| | | #endif |
| | |
| | | |
| | | #include <usg_common.h> |
| | | #include <assert.h> // assert() |
| | | #include "mm.h" |
| | | #include "mem_pool.h" |
| | | #include "sem_util.h" |
| | | #include "logger_factory.h" |
| | | |
| | |
| | | bool LockFreeQueue<ELEM_T, Q_TYPE>::pop(ELEM_T &a_data) |
| | | { |
| | | if (SemUtil::dec(items) == -1) { |
| | | err_exit(errno, "remove"); |
| | | err_msg(errno, "LockFreeQueue pop"); |
| | | return false; |
| | | } |
| | | |
| | | if (m_qImpl.pop(a_data)) { |
| | |
| | | if (SemUtil::dec_nowait(items) == -1) { |
| | | if (errno == EAGAIN) |
| | | return false; |
| | | else |
| | | err_exit(errno, "remove_nowait"); |
| | | else { |
| | | err_msg(errno, "LockFreeQueue pop_nowait"); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | if (m_qImpl.pop(a_data)) { |
| | |
| | | if (SemUtil::dec_timeout(items, timeout) == -1) { |
| | | if (errno == EAGAIN) |
| | | return false; |
| | | else |
| | | err_exit(errno, "remove_timeout"); |
| | | else { |
| | | err_msg(errno, "LockFreeQueue pop_timeout"); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | if (m_qImpl.pop(a_data)) { |
| | |
| | | typename ELEM_T, |
| | | template <typename T> class Q_TYPE> |
| | | void * LockFreeQueue<ELEM_T, Q_TYPE>::operator new(size_t size){ |
| | | return mm_malloc(size); |
| | | return mem_pool_malloc(size); |
| | | } |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | template <typename T> class Q_TYPE> |
| | | void LockFreeQueue<ELEM_T, Q_TYPE>::operator delete(void *p) { |
| | | return mm_free(p); |
| | | return mem_pool_free(p); |
| | | } |
| | | |
| | | // include implementation files |
| | |
| | | |
| | | inline ELEM_T& operator[](unsigned i); |
| | | |
| | | static void remove_queues_exclue(int *keys, size_t length); |
| | | private: |
| | | |
| | | |
| | | protected: |
| | | /// @brief the actual queue-> methods are forwarded into the real |
| | | /// implementation |
| | |
| | | |
| | | |
| | | template < typename ELEM_T > |
| | | void SHMQueue<ELEM_T>::remove_queues_exclue(int *keys, size_t length) |
| | | { |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | std::set<int>* keyset = hashtable_keyset(hashtable); |
| | | std::set<int>::iterator keyItr; |
| | | LockFreeQueue<ELEM_T>* mqueue; |
| | | bool found; |
| | | for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { |
| | | found = false; |
| | | for(int i = 0; i < length; i++) { |
| | | if(*keyItr == keys[i]) { |
| | | found = true; |
| | | break; |
| | | } |
| | | } |
| | | if(!found) { |
| | | mqueue = (LockFreeQueue<ELEM_T> *)hashtable_get(hashtable, *keyItr); |
| | | delete mqueue; |
| | | } |
| | | } |
| | | delete keyset; |
| | | |
| | | } |
| | | |
| | | template < typename ELEM_T > |
| | | SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize): KEY(key) |
| | | { |
| | | |
| | |
| | | */ |
| | | void shm_destroy(); |
| | | |
| | | |
| | | |
| | | //移除不包含在keys中的队列 |
| | | void shm_remove_queues_exclue(void *keys, int length); |
| | | /** |
| | | * 创建队列 |
| | | * @ shmqueue |
| | |
| | | void* shmqueue_create( int * key, int queue_size); |
| | | |
| | | /** |
| | | * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并返回空值 |
| | | * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并退出 |
| | | */ |
| | | void* shmqueue_attach(int key); |
| | | void* shmqueue_attach(int key) ; |
| | | |
| | | /** |
| | | * 销毁 |
| | | */ |
| | | void shmqueue_drop(void * _shmqueue) ; |
| | | void shmqueue_drop(void * _shmqueue); |
| | | |
| | | /** |
| | | * 队列元素的个数 |
| | | */ |
| | | int shmqueue_size(void * _shmqueue); |
| | | int shmqueue_size(void * _shmqueue) ; |
| | | |
| | | /** |
| | | * 是否已满 |
| | | * @return 1是, 0否 |
| | | */ |
| | | int shmqueue_full(void * _shmqueue); |
| | | |
| | | /** |
| | | * 是否为空 |
| | | * @return 1是, 0否 |
| | | */ |
| | | int shmqueue_empty(void * _shmqueue) ; |
| | | |
| | | /** |
| | | * 入队, 队列满时等待 |
| | | * 入队, 队列满时等待. |
| | | * @return 1 入队成功, 0 入队失败 |
| | | */ |
| | | int shmqueue_push(void * _shmqueue, void *src_ele, int ele_size); |
| | | int shmqueue_push(void * _shmqueue, void *src, int size); |
| | | |
| | | /** |
| | | * 入队, 队列满时立即返回 |
| | | * 入队, 队列满时立即返回. |
| | | * @return 1 入队成功, 0 入队失败 |
| | | */ |
| | | int shmqueue_push_nowait(void * _shmqueue, void *src_ele, int ele_size) ; |
| | | int shmqueue_push_nowait(void * _shmqueue, void *src, int size) ; |
| | | |
| | | /** |
| | | * 入队, 指定时间内入队不成功就返回 |
| | | * timespec {sec秒, nsec纳秒} |
| | | * @sec 秒 |
| | | * @nsec 纳秒 |
| | | * @return 1 入队成功, 0 入队失败 |
| | | */ |
| | | int shmqueue_push_timeout(void * _shmqueue, void *src_ele, int ele_size, void * _timeout); |
| | | int shmqueue_push_timeout(void * _shmqueue, void *src, int size, int sec, int nsec) ; |
| | | |
| | | /** |
| | | * 出队, 队列空时等待 |
| | | * @return 1 出队成功, 0出队失败 |
| | | */ |
| | | int shmqueue_pop(void * _shmqueue, void *dest_ele); |
| | | int shmqueue_pop(void * _shmqueue, void **dest, int *size); |
| | | |
| | | /** |
| | | * 出队, 队列空时立即返回 |
| | | * @return 1 出队成功, 0出队失败 |
| | | */ |
| | | int shmqueue_pop_nowait(void * _shmqueue, void *dest_ele) ; |
| | | int shmqueue_pop_nowait(void * _shmqueue, void **dest, int *size) ; |
| | | |
| | | /** |
| | | * 出队, 指定时间内出队不成功就返回 |
| | | * @sec秒 |
| | | * @nsec纳秒 |
| | | * @return 1 出队成功, 0出队失败 |
| | | */ |
| | | int shmqueue_pop_timeout(void * _shmqueue, void *dest_ele, void * _timeout); |
| | | int shmqueue_pop_timeout(void * _shmqueue, void **dest, int *size, int sec, int nsec); |
| | | |
| | | /** |
| | | * 释放出队分配的内存 |
| | | */ |
| | | void shmqueue_free(void *ptr); |
| | | |
| | | #ifdef __cplusplus |
| | | } |
| | |
| | | void SemUtil::remove(int semid) { |
| | | union semun dummy; |
| | | if (semctl(semid, 0, IPC_RMID, dummy) == -1) |
| | | err_exit(errno, "semctl"); |
| | | err_msg(errno, "SemUtil::remove"); |
| | | |
| | | } |
| | | |
| | |
| | | union semun arg; |
| | | arg.val = val; |
| | | if (semctl(semId, 0, SETVAL, arg) == -1) |
| | | err_exit(errno, "SemUtil::set semctl"); |
| | | err_msg(errno, "SemUtil::set"); |
| | | } |
| | | |
| | | |
| | |
| | | mem_pool_destroy(); |
| | | } |
| | | |
| | | //移除不包含在keys中的队列 |
| | | void shm_remove_queues_exclue(void *keys, int length) { |
| | | SHMQueue<ele_t>::remove_queues_exclue((int*)keys, (size_t)length); |
| | | } |
| | | |
| | | /** |
| | | * 创建队列 |
| | | * @ shmqueue |
| | |
| | | |
| | | /** |
| | | * 入队, 指定时间内入队不成功就返回 |
| | | * timespec {sec秒, nsec纳秒} |
| | | * @sec秒 |
| | | * @nsec纳秒 |
| | | */ |
| | | int shmqueue_push_timeout(void * _shmqueue, void *src, int size, void * _timeout) { |
| | | struct timespec *timeout = (struct timespec *)_timeout; |
| | | int shmqueue_push_timeout(void * _shmqueue, void *src, int size, int sec, int nsec) { |
| | | |
| | | struct timespec timeout = {sec, nsec}; |
| | | shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue; |
| | | ele_t dest; |
| | | dest.size = size; |
| | | dest.buf = mm_malloc(size); |
| | | memcpy(dest.buf, src, size); |
| | | return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push_timeout(dest, timeout); |
| | | return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push_timeout(dest, &timeout); |
| | | } |
| | | |
| | | /** |
| | | * 出队, 队列空时等待 |
| | | */ |
| | | int shmqueue_pop(void * _shmqueue, void *dest) { |
| | | int shmqueue_pop(void * _shmqueue, void **dest, int *size) { |
| | | shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue; |
| | | ele_t src; |
| | | |
| | | bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop(src); |
| | | if (rv) { |
| | | memcpy(dest, src.buf, src.size); |
| | | void * _dest = malloc(src.size); |
| | | memcpy(_dest, src.buf, src.size); |
| | | *dest = _dest; |
| | | *size = src.size; |
| | | mm_free(src.buf); |
| | | return 1; |
| | | } else { |
| | |
| | | /** |
| | | * 出队, 队列空时立即返回 |
| | | */ |
| | | int shmqueue_pop_nowait(void * _shmqueue, void *dest) { |
| | | int shmqueue_pop_nowait(void * _shmqueue, void **dest, int *size) { |
| | | shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue; |
| | | ele_t src; |
| | | |
| | | bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop_nowait(src); |
| | | if (rv) { |
| | | void * _dest = malloc(src.size); |
| | | memcpy(dest, src.buf, src.size); |
| | | *dest = _dest; |
| | | *size = src.size; |
| | | mm_free(src.buf); |
| | | return 1; |
| | | } else { |
| | |
| | | |
| | | /** |
| | | * 出队, 指定时间内出队不成功就返回 |
| | | * @sec秒 |
| | | * @nsec纳秒 |
| | | */ |
| | | int shmqueue_pop_timeout(void * _shmqueue, void *dest, void * _timeout) { |
| | | struct timespec *timeout = (struct timespec *)_timeout; |
| | | int shmqueue_pop_timeout(void * _shmqueue, void **dest, int *size, int sec, int nsec) { |
| | | struct timespec timeout = {sec, nsec}; |
| | | shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue; |
| | | ele_t src; |
| | | bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop_timeout(src, timeout); |
| | | |
| | | bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop_timeout(src, &timeout); |
| | | if (rv) { |
| | | memcpy(dest, src.buf, src.size); |
| | | void * _dest = malloc(src.size); |
| | | memcpy(_dest, src.buf, src.size); |
| | | *dest = _dest; |
| | | *size = src.size; |
| | | mm_free(src.buf); |
| | | return 1; |
| | | } else { |
| | |
| | | } |
| | | } |
| | | |
| | | void shmqueue_free(void *ptr) { |
| | | free(ptr); |
| | | } |
| | | |
| | | |
| | |
| | | include $(ROOT)/Make.defines.$(PLATFORM) |
| | | |
| | | |
| | | PROGS = test_queue_wrapper server client sem_test |
| | | PROGS = test_queue_wrapper server client |
| | | |
| | | |
| | | build: $(PROGS) |
| | |
| | | msg_t msg; |
| | | msg.key = key; |
| | | |
| | | msg_t rec_msg; |
| | | void * rec_msg; |
| | | int rec_msg_size; |
| | | //入队 |
| | | while(true) { |
| | | printf("=====>say some thing:\n"); |
| | | scanf("%s", msg.buf); |
| | | shmqueue_push(remote_queue, (void *)&msg, sizeof(msg)); |
| | | //printf("send: %s\n", msg.buf); |
| | | shmqueue_pop(local_queue, (void *)&rec_msg ); |
| | | printf("=====>peer : %s\n", rec_msg.buf); |
| | | shmqueue_pop(local_queue, &rec_msg, &rec_msg_size); |
| | | printf("=====>peer : %s\n", ((msg_t*)rec_msg)->buf); |
| | | free(rec_msg); |
| | | |
| | | |
| | | } |
| | |
| | | } msg_t; |
| | | |
| | | void server() { |
| | | msg_t msg; |
| | | void * msg; |
| | | int msg_size; |
| | | msg_t send_msg; |
| | | int key = 1; |
| | | size_t qsize = 16; |
| | |
| | | |
| | | struct timespec timeout = {1, 0}; |
| | | |
| | | while(shmqueue_pop(local_queue, (void *)&msg) ) { |
| | | void * remote_queue = shmqueue_attach(msg.key); |
| | | printf("received: %s\n", msg.buf); |
| | | while(shmqueue_pop(local_queue, &msg, &msg_size) ) { |
| | | void * remote_queue = shmqueue_attach(((msg_t *)msg)->key); |
| | | printf("received: %s\n", ((msg_t *)msg)->buf); |
| | | // send_msg.key = 1; |
| | | sprintf(send_msg.buf, "hello, I have received: %s!", msg.buf); |
| | | sprintf(send_msg.buf, "hello, I have received: %s!", ((msg_t *)msg)->buf); |
| | | shmqueue_push(remote_queue, (void *)&send_msg, sizeof(send_msg)); |
| | | shmqueue_drop(remote_queue); |
| | | // cout << item.pic << endl; |
| | |
| | | #include "shm_queue_wrapper.h" |
| | | #include "mm.h" |
| | | |
| | | typedef struct message_t |
| | | { |
| | | char method[20]; |
| | | int code; |
| | | // typedef struct message_t |
| | | // { |
| | | // char method[20]; |
| | | // int code; |
| | | |
| | | } message_t; |
| | | // } message_t; |
| | | |
| | | void test1() { |
| | | unsigned int i = 0; |
| | | int key = -1; |
| | | int key = 1; |
| | | |
| | | size_t qsize = 16; |
| | | void * queue = shmqueue_create( &key, qsize); |
| | | message_t item; |
| | | //message_t item; |
| | | char msg[100]; |
| | | void *rtmsg; |
| | | int size; |
| | | |
| | | for(i = 0; i < qsize; i++) { |
| | | sprintf(item.method, "hello"); |
| | | item.code = i ; |
| | | sprintf(msg, "%d hello", i); |
| | | //入队 |
| | | if(shmqueue_push(queue, (void *)&item, sizeof(message_t))) { |
| | | printf("push:%d %s\n", item.code, item.method ); |
| | | if(shmqueue_push(queue, (void *)msg, sizeof(msg))) { |
| | | printf("push: %s\n", msg ); |
| | | } |
| | | } |
| | | printf("%d\n", key); |
| | | struct timespec timeout = {1, 0}; |
| | | |
| | | printf("key == %d\n", key); |
| | | // struct timespec timeout = {1, 0}; |
| | | // int keys[] = {1,2}; |
| | | // shm_remove_queues_exclue((void *)keys, 1); |
| | | i = 0; |
| | | // 出队 |
| | | while((shmqueue_pop_timeout(queue, (void *)&item, &timeout)) ) { |
| | | printf("pop:%d %s\n", item.code, item.method ); |
| | | while((shmqueue_pop_timeout(queue, &rtmsg, &size, 1, 0)) ) { |
| | | printf("pop: %s\n", (char *)rtmsg ); |
| | | free(rtmsg); |
| | | // cout << item.pic << endl; |
| | | i++; |
| | | } |
| | |
| | | |
| | | |
| | | int main () { |
| | | mm_init(512); |
| | | shm_init(512); |
| | | test1(); |
| | | |
| | | //整个进程退出时需要执行这个方法,该方法首先会检查是否还有其他进程在使用该共享内存,如果还有其他进程在使用就只是detach,如果没有其他进程在使用则销毁整块内存。 |
| | | mm_destroy(); |
| | | shm_destroy(); |
| | | return 0; |
| | | } |