wangzhengquan
2020-07-14 8b4ddf10e71e1c8fabd33c72b282f7da65ff682f
commit
24个文件已修改
378 ■■■■ 已修改文件
README.md 119 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
include/usgcommon/usg_common.h 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/hashtable.c 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/hashtable.h 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/lock_free_queue.h 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/shm_queue.h 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/shm_queue_wrapper.h 48 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/libshm_queue.a 补丁 | 查看 | 原始文档 | blame | 历史
queue/sem_util.c 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/shm_queue_wrapper.c 47 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/multiple_queue_consumer 补丁 | 查看 | 原始文档 | blame | 历史
test/multiple_queue_productor 补丁 | 查看 | 原始文档 | blame | 历史
test/single_consumer 补丁 | 查看 | 原始文档 | blame | 历史
test/single_productor 补丁 | 查看 | 原始文档 | blame | 历史
test/test_lostdata 补丁 | 查看 | 原始文档 | blame | 历史
test/test_queue 补丁 | 查看 | 原始文档 | blame | 历史
test/test_timeout 补丁 | 查看 | 原始文档 | blame | 历史
test2/Makefile 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test2/client 补丁 | 查看 | 原始文档 | blame | 历史
test2/client.c 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test2/server 补丁 | 查看 | 原始文档 | blame | 历史
test2/server.c 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test2/test_queue_wrapper 补丁 | 查看 | 原始文档 | blame | 历史
test2/test_queue_wrapper.c 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
README.md
@@ -1,121 +1,8 @@
<<<<<<< HEAD
## 实例
```
#include "shm_queue_wrapper.h"
#include "mm.h"
typedef struct message_t
{
    char method[20];
    int code;
} message_t;
void test1() {
    unsigned int i = 0;
    int key = 2;
     //
    size_t qsize = 16;
    void * queue = shmqueue_init( key, qsize, sizeof(message_t));
    message_t item;
    // LockFreeQueue<struct Item> queue(16);
    for(i = 0; i < qsize; i++) {
        sprintf(item.method, "hello");
        item.code = i ;
        if(shmqueue_push(queue, (void *)&item)) {
              printf("push:%d %s\n", item.code, item.method );
        }
    }
    struct timespec timeout = {1, 0};
    i = 0;
    while((shmqueue_pop_timeout(queue, (void *)&item, &timeout)) ) {
        printf("pop:%d %s\n", item.code, item.method );
       // cout <<  item.pic << endl;
        i++;
    }
    //销毁队列
    shmqueue_destroy(queue);
}
 
int main () {
    test1();
    //整个进程退出时需要执行这个方法,该方法首先会检查是否还有其他进程在使用该共享内存,如果还有其他进程在使用就只是detach,如果没有其他进程在使用则销毁整块内存。
    mm_destroy();
    return 0;
}
```
## 实例
 请求应答 `./test2/server.c ./test2/client.c`
## 接口说明
```
/**
 * 初始化
 * @ shmqueue
 * @ key 标识共享队列的唯一key
 * @ queue_size 队列大小 , 这个值必须是2的指数即 1, 2, 4, 8, 16 等
 * @ ele_size 队列中元素大小, 这个值不能超过512,当然如果需要可以调整这个最大限制
 */
void* shmqueue_init(int key, int queue_size, int ele_size);
/**
 * 销毁
*/
void shmqueue_destroy(void *shmqueue);
/**
 * 队列元素的个数
 */
uint32_t shmqueue_size(void *shmqueue);
/**
 * 是否已满
 */
int shmqueue_full(void *shmqueue);
/**
 * 是否为空
 */
int shmqueue_empty(void *shmqueue);
/**
 * 入队, 队列满时等待
 */
int shmqueue_push(void *shmqueue, void *src_ele);
/**
 * 入队, 队列满时立即返回
 */
int shmqueue_push_nowait(void *shmqueue, void *src_ele);
/**
 * 入队, 指定时间内入队不成功就返回
 */
int shmqueue_push_timeout(void *shmqueue, void *src_ele, struct timespec * timeout);
/**
 * 出队, 队列空时等待
 */
int shmqueue_pop(void *shmqueue, void *dest_ele);
/**
 * 出队, 队列空时立即返回
 */
int shmqueue_pop_nowait(void *shmqueue, void *dest_ele);
/**
 * 出队, 指定时间内出队不成功就返回
 */
int shmqueue_pop_timeout(void *shmqueue, void *dest_ele, struct timespec * timeout);
```
=======
## softbus
shm的通讯库
>>>>>>> dd08a8134dea74ac30213c1b8580bff34ee7095b
include/usgcommon/usg_common.h
@@ -64,9 +64,10 @@
#include <algorithm>
#include <iomanip>
#include <limits>
#include <map>
#include <initializer_list>
#include <vector>
#include <map>
#include <set>
#include <thread>
 
#endif
queue/hashtable.c
@@ -2,6 +2,7 @@
#include "hashtable.h"
#include "mm.h"
#include "sem_util.h"
#include <set>
typedef struct tailq_entry_t
{
@@ -118,6 +119,10 @@
}
void hashtable_removeall(hashtable_t *hashtable)
{
  tailq_entry_t *item;
@@ -224,3 +229,37 @@
  //释放读写锁
  SemUtil::inc(hashtable->wlock);
}
void hashtable_foreach(hashtable_t *hashtable, hashtable_foreach_cb cb) {
  tailq_entry_t *item;
  for (int i = 0; i < MAPSIZE; i++) {
    tailq_header_t *my_tailq_head = hashtable->array[i] ;
    if (my_tailq_head == NULL )
      continue;
    TAILQ_FOREACH(item, my_tailq_head, joint)
    {
      cb(item->key,  item -> value);
    }
  }
}
std::set<int> * hashtable_keyset(hashtable_t *hashtable) {
  std::set<int> *keyset = new std::set<int>;
  tailq_entry_t *item;
  for (int i = 0; i < MAPSIZE; i++) {
    tailq_header_t *my_tailq_head = hashtable->array[i] ;
    if (my_tailq_head == NULL )
      continue;
    TAILQ_FOREACH(item, my_tailq_head, joint)
    {
      keyset->insert(item->key);
    }
  }
  return keyset;
}
queue/hashtable.h
@@ -2,7 +2,7 @@
#define __HASHTABLE_H__
#include <sys/queue.h>
//#include "queue.h"
#include <set>
#define MAPSIZE 100
@@ -15,6 +15,7 @@
 size_t readcnt;
} hashtable_t;
typedef void (*hashtable_foreach_cb)(int key, void *value);
void hashtable_init(hashtable_t *hashtable);
void *hashtable_get(hashtable_t *hashtable, int key);
@@ -23,7 +24,11 @@
void hashtable_removeall(hashtable_t *hashtable);
void hashtable_foreach(hashtable_t *hashtable, hashtable_foreach_cb cb);
void hashtable_printall(hashtable_t *hashtable);
int hashtable_alloc_key(hashtable_t *hashtable);
std::set<int> * hashtable_keyset(hashtable_t *hashtable) ;
#endif
queue/include/lock_free_queue.h
@@ -3,7 +3,7 @@
#include <usg_common.h>
#include <assert.h> // assert()
#include "mm.h"
#include "mem_pool.h"
#include "sem_util.h"
#include "logger_factory.h"
@@ -253,7 +253,8 @@
bool LockFreeQueue<ELEM_T, Q_TYPE>::pop(ELEM_T &a_data)
{
    if (SemUtil::dec(items) == -1) {
        err_exit(errno, "remove");
        err_msg(errno, "LockFreeQueue pop");
        return false;
    }
    if (m_qImpl.pop(a_data)) {
@@ -272,8 +273,10 @@
    if (SemUtil::dec_nowait(items) == -1) {
        if (errno == EAGAIN)
            return false;
        else
            err_exit(errno, "remove_nowait");
        else {
            err_msg(errno, "LockFreeQueue pop_nowait");
            return false;
        }
    }
    if (m_qImpl.pop(a_data)) {
@@ -293,8 +296,10 @@
    if (SemUtil::dec_timeout(items, timeout) == -1) {
        if (errno == EAGAIN)
            return false;
        else
            err_exit(errno, "remove_timeout");
        else {
            err_msg(errno, "LockFreeQueue pop_timeout");
            return false;
        }
    }
    if (m_qImpl.pop(a_data)) {
@@ -316,14 +321,14 @@
    typename ELEM_T, 
    template <typename T> class Q_TYPE>
void * LockFreeQueue<ELEM_T, Q_TYPE>::operator new(size_t size){
        return mm_malloc(size);
        return mem_pool_malloc(size);
}
template <
    typename ELEM_T, 
    template <typename T> class Q_TYPE>
void LockFreeQueue<ELEM_T, Q_TYPE>::operator delete(void *p) {
    return mm_free(p);
    return mem_pool_free(p);
}
// include implementation files
queue/include/shm_queue.h
@@ -39,6 +39,10 @@
    inline ELEM_T& operator[](unsigned i);
    static void remove_queues_exclue(int *keys, size_t length);
private:
protected:
    /// @brief the actual queue-> methods are forwarded into the real 
    ///        implementation
@@ -51,6 +55,31 @@
template < typename ELEM_T >
void SHMQueue<ELEM_T>::remove_queues_exclue(int *keys, size_t length)
{
    hashtable_t *hashtable = mm_get_hashtable();
    std::set<int>* keyset = hashtable_keyset(hashtable);
    std::set<int>::iterator keyItr;
     LockFreeQueue<ELEM_T>* mqueue;
    bool found;
    for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
        found = false;
        for(int i = 0; i < length; i++) {
            if(*keyItr == keys[i]) {
                found = true;
                break;
            }
        }
        if(!found) {
           mqueue = (LockFreeQueue<ELEM_T> *)hashtable_get(hashtable, *keyItr);
           delete mqueue;
        }
    }
    delete keyset;
}
template < typename ELEM_T >
SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize): KEY(key)
{
queue/include/shm_queue_wrapper.h
@@ -22,6 +22,10 @@
 */
void shm_destroy();
//移除不包含在keys中的队列
void shm_remove_queues_exclue(void *keys, int length);
/**
 * 创建队列
 * @ shmqueue 
@@ -31,58 +35,76 @@
void* shmqueue_create( int * key, int queue_size);
/**
 * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并返回空值
 * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并退出
 */
void* shmqueue_attach(int key);
void* shmqueue_attach(int key) ;
/**
 * 销毁
*/
void shmqueue_drop(void * _shmqueue) ;
void shmqueue_drop(void * _shmqueue);
/**
 * 队列元素的个数
 */
int shmqueue_size(void * _shmqueue);
int shmqueue_size(void * _shmqueue) ;
/**
 * 是否已满
 * @return 1是, 0否
 */
int shmqueue_full(void * _shmqueue);
/**
 * 是否为空
 * @return 1是, 0否
 */
int shmqueue_empty(void * _shmqueue) ;
/**
 * 入队, 队列满时等待
 * 入队, 队列满时等待.
 * @return 1 入队成功, 0 入队失败
 */
int shmqueue_push(void * _shmqueue, void *src_ele, int ele_size);
int shmqueue_push(void * _shmqueue, void *src, int size);
/**
 * 入队, 队列满时立即返回
 * 入队, 队列满时立即返回.
 * @return 1 入队成功, 0 入队失败
 */
int shmqueue_push_nowait(void * _shmqueue, void *src_ele, int ele_size) ;
int shmqueue_push_nowait(void * _shmqueue, void *src, int size) ;
/**
 * 入队, 指定时间内入队不成功就返回
 * timespec {sec秒, nsec纳秒}
 * @sec 秒
 * @nsec 纳秒
 * @return 1 入队成功, 0 入队失败
 */
int shmqueue_push_timeout(void * _shmqueue, void *src_ele, int ele_size, void * _timeout);
int shmqueue_push_timeout(void * _shmqueue, void *src, int size,  int sec, int nsec) ;
/**
 * 出队, 队列空时等待
 * @return 1 出队成功, 0出队失败
 */
int shmqueue_pop(void * _shmqueue, void *dest_ele);
int shmqueue_pop(void * _shmqueue, void **dest, int *size);
/**
 * 出队, 队列空时立即返回
 * @return 1 出队成功, 0出队失败
 */
int shmqueue_pop_nowait(void * _shmqueue, void *dest_ele) ;
int shmqueue_pop_nowait(void * _shmqueue, void **dest, int *size) ;
/**
 * 出队, 指定时间内出队不成功就返回
 * @sec秒
 * @nsec纳秒
 * @return 1 出队成功, 0出队失败
 */
int shmqueue_pop_timeout(void * _shmqueue, void *dest_ele, void * _timeout);
int shmqueue_pop_timeout(void * _shmqueue, void **dest, int *size, int sec, int nsec);
/**
 * 释放出队分配的内存
 */
void shmqueue_free(void *ptr);
#ifdef __cplusplus
}
queue/libshm_queue.a
Binary files differ
queue/sem_util.c
@@ -132,7 +132,7 @@
void SemUtil::remove(int semid) {
    union semun dummy;
    if (semctl(semid, 0, IPC_RMID, dummy) == -1)
        err_exit(errno, "semctl");
        err_msg(errno, "SemUtil::remove");
}
@@ -142,7 +142,7 @@
    union semun arg;
    arg.val = val;
    if (semctl(semId, 0, SETVAL, arg) == -1)
        err_exit(errno, "SemUtil::set semctl");
        err_msg(errno, "SemUtil::set");
}
queue/shm_queue_wrapper.c
@@ -21,6 +21,11 @@
    mem_pool_destroy();
}
//移除不包含在keys中的队列
void shm_remove_queues_exclue(void *keys, int length) {
    SHMQueue<ele_t>::remove_queues_exclue((int*)keys, (size_t)length);
}
/**
 * 创建队列
 * @ shmqueue 
@@ -122,27 +127,33 @@
/**
 * 入队, 指定时间内入队不成功就返回
 * timespec {sec秒, nsec纳秒}
 * @sec秒
 * @nsec纳秒
 */
int shmqueue_push_timeout(void * _shmqueue, void *src, int size,  void * _timeout) {
    struct timespec *timeout = (struct timespec *)_timeout;
int shmqueue_push_timeout(void * _shmqueue, void *src, int size,  int sec, int nsec) {
    struct timespec timeout = {sec, nsec};
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    ele_t dest;
    dest.size = size;
    dest.buf = mm_malloc(size);
    memcpy(dest.buf, src, size);
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push_timeout(dest, timeout);
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push_timeout(dest, &timeout);
}
/**
 * 出队, 队列空时等待
 */
int shmqueue_pop(void * _shmqueue, void *dest) {
int shmqueue_pop(void * _shmqueue, void **dest, int *size) {
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    ele_t src;
    bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop(src);
    if (rv) {
        memcpy(dest, src.buf, src.size);
        void * _dest = malloc(src.size);
        memcpy(_dest, src.buf, src.size);
        *dest = _dest;
        *size = src.size;
        mm_free(src.buf);
        return 1;
    } else {
@@ -154,12 +165,16 @@
/**
 * 出队, 队列空时立即返回
 */
int shmqueue_pop_nowait(void * _shmqueue, void *dest) {
int shmqueue_pop_nowait(void * _shmqueue, void **dest, int *size) {
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    ele_t src;
    bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop_nowait(src);
    if (rv) {
        void * _dest = malloc(src.size);
        memcpy(dest, src.buf, src.size);
        *dest = _dest;
        *size = src.size;
        mm_free(src.buf);
        return 1;
    } else {
@@ -169,14 +184,20 @@
/**
 * 出队, 指定时间内出队不成功就返回
 * @sec秒
 * @nsec纳秒
 */
int shmqueue_pop_timeout(void * _shmqueue, void *dest, void * _timeout) {
    struct timespec *timeout = (struct timespec *)_timeout;
int shmqueue_pop_timeout(void * _shmqueue, void **dest, int *size, int sec, int nsec) {
    struct timespec timeout = {sec, nsec};
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    ele_t src;
    bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop_timeout(src, timeout);
    bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop_timeout(src, &timeout);
    if (rv) {
        memcpy(dest, src.buf, src.size);
        void * _dest = malloc(src.size);
        memcpy(_dest, src.buf, src.size);
        *dest = _dest;
        *size = src.size;
        mm_free(src.buf);
        return 1;
    } else {
@@ -184,4 +205,8 @@
    }
}
void shmqueue_free(void *ptr) {
    free(ptr);
}
test/multiple_queue_consumer
Binary files differ
test/multiple_queue_productor
Binary files differ
test/single_consumer
Binary files differ
test/single_productor
Binary files differ
test/test_lostdata
Binary files differ
test/test_queue
Binary files differ
test/test_timeout
Binary files differ
test2/Makefile
@@ -14,7 +14,7 @@
include $(ROOT)/Make.defines.$(PLATFORM)
PROGS =    test_queue_wrapper server client sem_test
PROGS =    test_queue_wrapper server client
build: $(PROGS)
test2/client
Binary files differ
test2/client.c
@@ -17,15 +17,17 @@
    msg_t msg;
    msg.key = key;
    msg_t rec_msg;
    void * rec_msg;
    int rec_msg_size;
    //入队
    while(true) {
      printf("=====>say some thing:\n");
      scanf("%s", msg.buf);
      shmqueue_push(remote_queue, (void *)&msg, sizeof(msg));
      //printf("send: %s\n",  msg.buf);
      shmqueue_pop(local_queue, (void *)&rec_msg );
      printf("=====>peer : %s\n",  rec_msg.buf);
      shmqueue_pop(local_queue, &rec_msg, &rec_msg_size);
      printf("=====>peer : %s\n",  ((msg_t*)rec_msg)->buf);
      free(rec_msg);
      
    }
test2/server
Binary files differ
test2/server.c
@@ -9,7 +9,8 @@
} msg_t;
void server() {
     msg_t msg;
     void * msg;
     int msg_size;
     msg_t send_msg;
     int key = 1;
    size_t qsize = 16;
@@ -17,11 +18,11 @@
    struct timespec timeout = {1, 0};
    
    while(shmqueue_pop(local_queue, (void *)&msg) ) {
        void * remote_queue = shmqueue_attach(msg.key);
        printf("received: %s\n", msg.buf);
    while(shmqueue_pop(local_queue, &msg, &msg_size) ) {
        void * remote_queue = shmqueue_attach(((msg_t *)msg)->key);
        printf("received: %s\n", ((msg_t *)msg)->buf);
        // send_msg.key = 1;
        sprintf(send_msg.buf, "hello, I have received: %s!", msg.buf);
        sprintf(send_msg.buf, "hello, I have received: %s!", ((msg_t *)msg)->buf);
        shmqueue_push(remote_queue, (void *)&send_msg, sizeof(send_msg));
        shmqueue_drop(remote_queue);
       // cout <<  item.pic << endl;
test2/test_queue_wrapper
Binary files differ
test2/test_queue_wrapper.c
@@ -1,36 +1,40 @@
#include "shm_queue_wrapper.h"
#include "mm.h"
typedef struct message_t
{
    char method[20];
    int code;
// typedef struct message_t
// {
//     char method[20];
//     int code;
    
} message_t;
// } message_t;
void test1() {
    unsigned int i = 0;
    int key = -1;
    int key = 1;
 
    size_t qsize = 16;
    void * queue = shmqueue_create( &key, qsize);
    message_t item;
    //message_t item;
    char msg[100];
    void *rtmsg;
    int size;
    
    for(i = 0; i < qsize; i++) {
        sprintf(item.method, "hello");
        item.code = i ;
        sprintf(msg, "%d hello", i);
        //入队
        if(shmqueue_push(queue, (void *)&item, sizeof(message_t))) {
              printf("push:%d %s\n", item.code, item.method );
        if(shmqueue_push(queue, (void *)msg, sizeof(msg))) {
              printf("push: %s\n", msg );
        }
    }
    printf("%d\n", key);
    struct timespec timeout = {1, 0};
    printf("key == %d\n", key);
    // struct timespec timeout = {1, 0};
    // int keys[] = {1,2};
    // shm_remove_queues_exclue((void *)keys, 1);
    i = 0;
    // 出队
    while((shmqueue_pop_timeout(queue, (void *)&item, &timeout)) ) {
        printf("pop:%d %s\n", item.code, item.method );
    while((shmqueue_pop_timeout(queue, &rtmsg, &size, 1, 0)) ) {
        printf("pop: %s\n", (char *)rtmsg );
        free(rtmsg);
       // cout <<  item.pic << endl;
        i++;
    }
@@ -41,10 +45,10 @@
 
int main () {
    mm_init(512);
    shm_init(512);
    test1();
    //整个进程退出时需要执行这个方法,该方法首先会检查是否还有其他进程在使用该共享内存,如果还有其他进程在使用就只是detach,如果没有其他进程在使用则销毁整块内存。
    mm_destroy();
    shm_destroy();
    return 0;
}