wangzhengquan
2020-07-09 91f003aac4c95f4d2a2fc0782c9bea9d484b6919
update
6个文件已添加
14 文件已重命名
12个文件已修改
489 ■■■■■ 已修改文件
.gitignore 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
Makefile 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
README.md 131 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/Makefile 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/hashtable.c 补丁 | 查看 | 原始文档 | blame | 历史
queue/hashtable.h 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/array_lock_free_queue.h 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/linked_lock_free_queue.h 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/lock_free_queue.h 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/queue_factory.h 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/shm_allocator.h 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/shm_queue.h 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/shm_queue_wrapper.h 146 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/libshm_queue.a 补丁 | 查看 | 原始文档 | blame | 历史
queue/logger_factory.h 补丁 | 查看 | 原始文档 | blame | 历史
queue/mm.c 补丁 | 查看 | 原始文档 | blame | 历史
queue/mm.h 补丁 | 查看 | 原始文档 | blame | 历史
queue/sem_util.c 补丁 | 查看 | 原始文档 | blame | 历史
queue/sem_util.h 补丁 | 查看 | 原始文档 | blame | 历史
test/Makefile 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/multiple_queue_consumer 补丁 | 查看 | 原始文档 | blame | 历史
test/multiple_queue_productor 补丁 | 查看 | 原始文档 | blame | 历史
test/single_consumer 补丁 | 查看 | 原始文档 | blame | 历史
test/single_productor 补丁 | 查看 | 原始文档 | blame | 历史
test/test 补丁 | 查看 | 原始文档 | blame | 历史
test/test.c 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_queue 补丁 | 查看 | 原始文档 | blame | 历史
test/test_queue.c 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_timeout 补丁 | 查看 | 原始文档 | blame | 历史
test2/Makefile 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test2/test_queue_wrapper 补丁 | 查看 | 原始文档 | blame | 历史
test2/test_queue_wrapper.c 48 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -12,7 +12,6 @@
*.pch
# Compiled Dynamic libraries
*.so
*.dylib
*.dll
@@ -23,8 +22,6 @@
# Compiled Static libraries
*.lai
*.la
*.a
*.lib
# Executables
*.exe
Makefile
@@ -1,4 +1,4 @@
DIRS = squeue test
DIRS = queue  test
all:
    for i in $(DIRS); do \
README.md
@@ -1,4 +1,131 @@
## cppShmqueue
## 实例
Cpp版本的共享内存队列库
```
#include "shm_queue_wrapper.h"
#include "mm.h"
typedef struct message_t
{
    char method[20];
    int code;
} message_t;
void test1() {
    unsigned int i = 0;
    int key = 2;
    size_t qsize = 16;
      shmqueue_t queue;
    shmqueue_init(&queue, key, qsize, sizeof(message_t));
    message_t item;
    // LockFreeQueue<struct Item> queue(16);
    for(i = 0; i < qsize; i++) {
        sprintf(item.method, "hello");
        item.code = i ;
        if(shmqueue_push(&queue, (void *)&item)) {
              printf("push:%d %s\n", item.code, item.method );
        }
    }
    struct timespec timeout = {1, 0};
    i = 0;
    while((shmqueue_pop_timeout(&queue, (void *)&item, &timeout)) ) {
        printf("pop:%d %s\n", item.code, item.method );
       // cout <<  item.pic << endl;
        i++;
    }
    shmqueue_destroy(&queue);
}
int main () {
    test1();
    //整个进程退出时需要执行这个方法,该方法首先会检查是否还有其他进程在使用该共享内存,如果还有其他进程在使用就只是detach,如果没有其他进程在使用销毁整块内存。
    mm_destroy();
    return 0;
}
```
## 接口说明
```
/**
 * 初始化
 * @ shmqueue
 * @ key 标识共享队列的唯一key
 * @ queue_size 队列大小
 * @ ele_size 队列中元素大小
 */
inline void shmqueue_init(shmqueue_t *shmqueue, int key, int queue_size, int ele_size) {
    if(ele_size > MAX_ELE_SIZE) {
        err_exit(0, "shmqueue_init 元素大小超过设计的最大大小");
    }
    SHMQueue<ele_t> *queue = new SHMQueue<ele_t>(key, queue_size);
    shmqueue->mqueue = (void *)queue;
    shmqueue->ele_size = ele_size;
}
/**
 * 销毁
*/
inline void shmqueue_destroy(shmqueue_t *shmqueue) {
    delete (SHMQueue<ele_t> *)shmqueue->mqueue;
}
/**
 * 队列元素的个数
 */
inline uint32_t shmqueue_size(shmqueue_t *shmqueue) {
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->size();
}
/**
 * 是否已满
 */
inline int shmqueue_full(shmqueue_t *shmqueue) {
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->full();
}
/**
 * 是否为空
 */
inline int shmqueue_empty(shmqueue_t *shmqueue) {
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->empty();
}
/**
 * 入队, 队列满时等待
 */
inline int shmqueue_push(shmqueue_t *shmqueue, void *src_ele)
/**
 * 入队, 队列满时立即返回
 */
inline int shmqueue_push_nowait(shmqueue_t *shmqueue, void *src_ele)
/**
 * 入队, 指定时间内入对不成功就返回
 */
inline int shmqueue_push_timeout(shmqueue_t *shmqueue, void *src_ele, struct timespec * timeout)
/**
 * 出队, 队列空时等待
 */
inline int shmqueue_pop(shmqueue_t *shmqueue, void *dest_ele)
/**
 * 出队, 队列空时立即返回
 */
inline int shmqueue_pop_nowait(shmqueue_t *shmqueue, void *dest_ele)
/**
 * 出队, 指定时间内出对不成功就返回
 */
inline int shmqueue_pop_timeout(shmqueue_t *shmqueue, void *dest_ele, struct timespec * timeout)
```
queue/Makefile
File was renamed from squeue/Makefile
@@ -14,8 +14,8 @@
PLATFORM=$(shell $(ROOT)/systype.sh)
include $(ROOT)/Make.defines.$(PLATFORM)
LIBSQUEUE = libsqueue.a
DLIBSQUEUE = libsqueue.so
LIBSQUEUE = libshm_queue.a
DLIBSQUEUE = libshm_queue.so
SOURCES := $(wildcard *.c)
OBJS   = $(patsubst %.c, %.o, $(SOURCES)) 
queue/hashtable.c
queue/hashtable.h
queue/include/array_lock_free_queue.h
File was renamed from squeue/include/array_lock_free_queue.h
@@ -222,7 +222,9 @@
                currentWriteIndex, (currentWriteIndex + 1)));
    
    // Just made sure this index is reserved for this thread.
    m_theQueue[countToIndex(currentWriteIndex)] = a_data;
   // m_theQueue[countToIndex(currentWriteIndex)] = a_data;
    // printf("===sizeof(ELEM_T) = %d\n", sizeof(ELEM_T));
    memcpy((void *)(&m_theQueue[countToIndex(currentWriteIndex)]), (void *)(&a_data), sizeof(ELEM_T) );
    
    // update the maximum read index after saving the piece of data. It can't
    // fail if there is only one thread inserting in the queue. It might fail 
@@ -240,7 +242,7 @@
        // software threads than hardware processors and you have more
        // than 1 producer thread
        // have a look at sched_yield (POSIX.1b)
        //sched_yield();
        sched_yield();
    }
    // The value was successfully inserted into the queue
@@ -278,8 +280,8 @@
    #endif
        // retrieve the data from the queue
        a_data = m_theQueue[countToIndex(currentReadIndex)];
        //a_data = m_theQueue[countToIndex(currentReadIndex)];
        memcpy((void*) (&a_data), (void *)(&m_theQueue[countToIndex(currentReadIndex)]),sizeof(ELEM_T) );
        // try to perfrom now the CAS operation on the read index. If we succeed
        // a_data already contains what m_readIndex pointed to before we 
        // increased it
queue/include/linked_lock_free_queue.h
queue/include/lock_free_queue.h
queue/include/queue_factory.h
queue/include/shm_allocator.h
queue/include/shm_queue.h
File was renamed from squeue/include/shm_queue.h
@@ -10,9 +10,6 @@
// default Queue size
// #define LOCK_FREE_Q_DEFAULT_SIZE 16
//
template < typename ELEM_T>
class SHMQueue
{
@@ -37,35 +34,18 @@
    /// @brief constructor of the class
    SHMQueue(int key=0, size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    
    /// @brief destructor of the class.
    /// Note it is not virtual since it is not expected to inherit from this
    /// template
    ~SHMQueue();
    /// @brief returns the current number of items in the queue
    /// It tries to take a snapshot of the size of the queue, but in busy environments
    /// this function might return bogus values.
    ///
    /// If a reliable queue size must be kept you might want to have a look at
    /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
    /// it enables a reliable size though it hits overall performance of the queue
    /// (when the reliable size variable is on it's got an impact of about 20% in time)
    inline uint32_t size();
    
    /// @brief return true if the queue is full. False otherwise
    /// It tries to take a snapshot of the size of the queue, but in busy
    /// environments this function might return bogus values. See help in method
    /// SHMQueue::size
    inline bool full();
    inline bool empty();
   
    inline bool push(const ELEM_T &a_data);
    inline bool push_nowait(const ELEM_T &a_data);
    inline bool push_timeout(const ELEM_T &a_data, struct timespec * timeout);
    inline bool pop(ELEM_T &a_data);
    inline bool pop_nowait(ELEM_T &a_data);
    inline bool pop_timeout(ELEM_T &a_data, struct timespec * timeout);
queue/include/shm_queue_wrapper.h
New file
@@ -0,0 +1,146 @@
#ifndef __SHM_QUEUE_WRAPPER_H__
#define __SHM_QUEUE_WRAPPER_H__
#include "usg_common.h"
#include "usg_typedef.h"
#include "shm_queue.h"
#include "shm_allocator.h"
#ifdef __cplusplus
extern "C" {
#endif
#define MAX_ELE_SIZE 512
typedef struct ele_t {
    char buf[MAX_ELE_SIZE];
} ele_t;
typedef struct shmqueue_t {
    size_t ele_size;
    void *mqueue;
} shmqueue_t;
/**
 * 初始化
 * @ shmqueue
 * @ key 标识共享队列的唯一key
 * @ queue_size 队列大小
 * @ ele_size 队列中元素大小
 */
inline void shmqueue_init(shmqueue_t *shmqueue, int key, int queue_size, int ele_size) {
    if(ele_size > MAX_ELE_SIZE) {
        err_exit(0, "shmqueue_init 元素大小超过设计的最大大小");
    }
    SHMQueue<ele_t> *queue = new SHMQueue<ele_t>(key, queue_size);
    shmqueue->mqueue = (void *)queue;
    shmqueue->ele_size = ele_size;
}
/**
 * 销毁
*/
inline void shmqueue_destroy(shmqueue_t *shmqueue) {
    delete (SHMQueue<ele_t> *)shmqueue->mqueue;
}
/**
 * 队列元素的个数
 */
inline uint32_t shmqueue_size(shmqueue_t *shmqueue) {
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->size();
}
/**
 * 是否已满
 */
inline int shmqueue_full(shmqueue_t *shmqueue) {
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->full();
}
/**
 * 是否为空
 */
inline int shmqueue_empty(shmqueue_t *shmqueue) {
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->empty();
}
/**
 * 入队, 队列满时等待
 */
inline int shmqueue_push(shmqueue_t *shmqueue, void *src_ele) {
    ele_t dest_ele;
    memcpy(&dest_ele.buf, src_ele, shmqueue->ele_size);
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push(dest_ele);
}
/**
 * 入队, 队列满时立即返回
 */
inline int shmqueue_push_nowait(shmqueue_t *shmqueue, void *src_ele) {
    ele_t dest_ele;
    memcpy(&dest_ele.buf, src_ele, shmqueue->ele_size);
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push_nowait(dest_ele);
}
/**
 * 入队, 指定时间内入对不成功就返回
 */
inline int shmqueue_push_timeout(shmqueue_t *shmqueue, void *src_ele, struct timespec * timeout) {
    ele_t dest_ele;
    memcpy(&dest_ele.buf, src_ele, shmqueue->ele_size);
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push_timeout(dest_ele, timeout);
}
/**
 * 出队, 队列空时等待
 */
inline int shmqueue_pop(shmqueue_t *shmqueue, void *dest_ele) {
    ele_t src_ele;
    bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop(src_ele);
    if (rv) {
        memcpy(dest_ele, &src_ele.buf, shmqueue->ele_size);
        return 1;
    } else {
        return 0;
    }
}
/**
 * 出队, 队列空时立即返回
 */
inline int shmqueue_pop_nowait(shmqueue_t *shmqueue, void *dest_ele) {
    ele_t src_ele;
    bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop_nowait(src_ele);
    if (rv) {
        memcpy(dest_ele, &src_ele.buf, shmqueue->ele_size);
        return 1;
    } else {
        return 0;
    }
}
/**
 * 出队, 指定时间内出对不成功就返回
 */
inline int shmqueue_pop_timeout(shmqueue_t *shmqueue, void *dest_ele, struct timespec * timeout) {
    ele_t src_ele;
    bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop_timeout(src_ele, timeout);
    if (rv) {
        memcpy(dest_ele, &src_ele.buf, shmqueue->ele_size);
        return 1;
    } else {
        return 0;
    }
}
#ifdef __cplusplus
}
#endif
#endif
queue/libshm_queue.a
Binary files differ
queue/logger_factory.h
queue/mm.c
queue/mm.h
queue/sem_util.c
queue/sem_util.h
test/Makefile
@@ -4,17 +4,17 @@
ROOT=..
#LDLIBS+=-Wl,-rpath=$(ROOT)/common:$(ROOT)/lib/jsoncpp
# 开源工具包路径
LDDIR += -L$(ROOT)/squeue
LDDIR += -L$(ROOT)/queue
# 开源工具包
LDLIBS += -lsqueue -lpthread
LDLIBS += -lshm_queue -lpthread
INCLUDE += -I$(ROOT)/squeue/ -I$(ROOT)/squeue/include
INCLUDE += -I$(ROOT)/queue/ -I$(ROOT)/queue/include
PLATFORM=$(shell $(ROOT)/systype.sh)
include $(ROOT)/Make.defines.$(PLATFORM)
 
PROGS =    test_queue single_productor single_consumer multiple_queue_productor multiple_queue_consumer test_timeout
PROGS =    test_queue single_productor single_consumer multiple_queue_productor multiple_queue_consumer test_timeout test_queue_wrapper
build: $(PROGS)
@@ -24,16 +24,16 @@
# 如果包A 引用包B, B 要放在 A 后面
 
test_queue: test.h  $(ROOT)/squeue/include/lock_free_queue.h
test_queue: test.h  $(ROOT)/queue/include/lock_free_queue.h
single_productor: test.h  $(ROOT)/squeue/include/lock_free_queue.h
single_productor: test.h  $(ROOT)/queue/include/lock_free_queue.h
single_consumer: test.h  $(ROOT)/squeue/include/lock_free_queue.h
single_consumer: test.h  $(ROOT)/queue/include/lock_free_queue.h
clean:
    rm -f $(TEMPFILES) $(PROGS)
$(LIBSQUEUE):
    (cd $(ROOT)/squeue && $(MAKE))
$(LIBQUEUE):
    (cd $(ROOT)/queue && $(MAKE))
test/multiple_queue_consumer
Binary files differ
test/multiple_queue_productor
Binary files differ
test/single_consumer
Binary files differ
test/single_productor
Binary files differ
test/test
Binary files differ
test/test.c
@@ -0,0 +1,35 @@
#include "test.h"
#define MKEY 0x2222
int testmatshm() {
  int shmid = -1;
  void *shmp;
  shmid = shmget(MKEY, 1024, IPC_CREAT | IPC_EXCL | OBJ_PERMS);
  if (shmid == -1 && errno == EEXIST) {
    printf("first create\n");
    shmid = shmget(MKEY, 0, 0);
  }
  if (shmid == -1)
    err_exit(errno, "testmatshm shmget");
  shmp = shmat(shmid, NULL, 0);
}
typedef struct buf_t { char buf[7]; } buf_t;
void test(int size) {
  char buf[size];
  printf("size = %d\n", sizeof(buf));
}
char msg[10];
int main() {
  // testmatshm();
  // testmatshm();
  // sleep(60);
  // printf("size = %d, msg = %d\n", sizeof(buf_t), sizeof(msg));
  test(12);
}
test/test_queue
Binary files differ
test/test_queue.c
@@ -84,8 +84,42 @@
}
// void testArr(unsigned size) {
//     unsigned int i = 0;
//     int key = 2;
//     char item[size];
//     size_t qsize = 16;
//       //LockFreeQueue<struct Item> *queue = QueueFactory::createQueue<struct Item> (key, qsize);
//     SHMQueue<char[size]> *queue = new SHMQueue<char[size]>(key, 16);
//     // LockFreeQueue<struct Item> queue(16);
//     for(i = 0; i < qsize; i++) {
//         sprintf(item, "%d hello", i);
//         if(queue->push(item)) {
//              cout << i << " push:" << item << endl;
//         }
//     }
//     struct timespec timeout = {1, 0};
//     i = 0;
//     while((queue->pop_timeout(item, &timeout)) ) {
//         cout << i << " pop:" << item << endl;
//        // cout <<  item.pic << endl;
//         i++;
//     }
//     delete queue;
// }
int main () {
    testString();
    // testArr(12);
    mm_destroy();
test/test_timeout
Binary files differ
test2/Makefile
New file
@@ -0,0 +1,32 @@
#
# Makefile for common library.
#
ROOT=..
#LDLIBS+=-Wl,-rpath=$(ROOT)/common:$(ROOT)/lib/jsoncpp
# 开源工具包路径
LDDIR += -L$(ROOT)/queue
# 开源工具包
LDLIBS += -lshm_queue -lpthread
INCLUDE += -I$(ROOT)/queue/ -I$(ROOT)/queue/include
PLATFORM=$(shell $(ROOT)/systype.sh)
include $(ROOT)/Make.defines.$(PLATFORM)
PROGS =    test_queue_wrapper
build: $(PROGS)
# test1: $(LIBCOMMON)
# 如果包A 引用包B, B 要放在 A 后面
clean:
    rm -f $(TEMPFILES) $(PROGS)
$(LIBQUEUE):
    (cd $(ROOT)/queue && $(MAKE))
test2/test_queue_wrapper
Binary files differ
test2/test_queue_wrapper.c
New file
@@ -0,0 +1,48 @@
#include "shm_queue_wrapper.h"
#include "mm.h"
typedef struct message_t
{
    char method[20];
    int code;
} message_t;
void test1() {
    unsigned int i = 0;
    int key = 2;
    size_t qsize = 16;
      shmqueue_t queue;
    shmqueue_init(&queue, key, qsize, sizeof(message_t));
    message_t item;
    // LockFreeQueue<struct Item> queue(16);
    for(i = 0; i < qsize; i++) {
        sprintf(item.method, "hello");
        item.code = i ;
        if(shmqueue_push(&queue, (void *)&item)) {
              printf("push:%d %s\n", item.code, item.method );
        }
    }
    struct timespec timeout = {1, 0};
    i = 0;
    while((shmqueue_pop_timeout(&queue, (void *)&item, &timeout)) ) {
        printf("pop:%d %s\n", item.code, item.method );
       // cout <<  item.pic << endl;
        i++;
    }
    //销毁队列
    shmqueue_destroy(&queue);
}
int main () {
    test1();
    //整个进程退出时需要执行这个方法,该方法首先会检查是否还有其他进程在使用该共享内存,如果还有其他进程在使用就只是detach,如果没有其他进程在使用则销毁整块内存。
    mm_destroy();
    return 0;
}