wangzhengquan
2020-07-13 e1e97f1f98baf82efcd5825d7c7a7b4b1b2f2e40
udpate
6个文件已添加
18个文件已修改
394 ■■■■ 已修改文件
Make.defines.linux 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/hashtable.c 72 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/hashtable.h 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/mem_pool.h 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/shm_queue_wrapper.h 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/libshm_queue.a 补丁 | 查看 | 原始文档 | blame | 历史
queue/mem_pool.c 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/sem_util.c 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/sem_util.h 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/shm_queue_wrapper.c 117 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/multiple_queue_consumer 补丁 | 查看 | 原始文档 | blame | 历史
test/multiple_queue_productor 补丁 | 查看 | 原始文档 | blame | 历史
test/single_consumer 补丁 | 查看 | 原始文档 | blame | 历史
test/single_productor 补丁 | 查看 | 原始文档 | blame | 历史
test/test_lostdata 补丁 | 查看 | 原始文档 | blame | 历史
test/test_queue 补丁 | 查看 | 原始文档 | blame | 历史
test/test_timeout 补丁 | 查看 | 原始文档 | blame | 历史
test2/Makefile 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test2/client 补丁 | 查看 | 原始文档 | blame | 历史
test2/client.c 46 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test2/server 补丁 | 查看 | 原始文档 | blame | 历史
test2/server.c 43 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test2/test_queue_wrapper 补丁 | 查看 | 原始文档 | blame | 历史
test2/test_queue_wrapper.c 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
Make.defines.linux
@@ -2,7 +2,7 @@
# Definitions required in all program directories to compile and link
# C programs using gcc.
DEBUG=y
CC = g++
COMPILE.c = $(CC) $(CFLAGS) $(CPPFLAGS) -c
LINK.c = $(CC) $(CFLAGS) $(CPPFLAGS) $(LDFLAGS)
queue/hashtable.c
@@ -1,7 +1,7 @@
#include "usg_common.h"
#include "hashtable.h"
#include "mm.h"
#include "sem_util.h"
typedef struct tailq_entry_t
{
@@ -14,6 +14,9 @@
  TAILQ_ENTRY(tailq_entry_t) joint;
} tailq_entry_t;
static int hashtable_mutex;
#define START_KEY 1000
typedef  TAILQ_HEAD(tailq_header_t, tailq_entry_t) tailq_header_t;
@@ -21,11 +24,16 @@
void hashtable_init(hashtable_t *hashtable )
{
  memset(hashtable, 0, sizeof(hashtable_t));
  hashtable->mutex = SemUtil::get(IPC_PRIVATE, 1);
  hashtable->wlock = SemUtil::get(IPC_PRIVATE, 1);
  hashtable->cond = SemUtil::get(IPC_PRIVATE, 1);
  hashtable->readcnt = 0;
}
void *hashtable_get(hashtable_t *hashtable, int key)
void *_hashtable_get(hashtable_t *hashtable, int key)
{
  size_t code = hashcode(key);
  tailq_entry_t *item;
@@ -47,7 +55,7 @@
}
void* hashtable_put(hashtable_t *hashtable, int key, void *value)
void* _hashtable_put(hashtable_t *hashtable, int key, void *value)
{
  size_t code = hashcode(key);
  void *oldvalue;
@@ -158,3 +166,61 @@
  return key % MAPSIZE;
  /*printf("hashfun = %ld\n", code);*/
}
int hashtable_alloc_key(hashtable_t *hashtable) {
  int key = START_KEY;
  SemUtil::dec(hashtable->wlock);
  while(_hashtable_get(hashtable, key) != NULL) {
    key++;
  }
  //_hashtable_put(hashtable, key, (void *)1);
  SemUtil::inc(hashtable->wlock);
  return key;
}
void *hashtable_get(hashtable_t *hashtable, int key) {
   SemUtil::dec(hashtable->mutex);
   hashtable->readcnt++;
   if (hashtable->readcnt == 1) {
    //获取读写锁
    SemUtil::dec(hashtable->wlock);
   }
   SemUtil::inc(hashtable->mutex);
   void * res = _hashtable_get(hashtable, key);
   SemUtil::dec(hashtable->mutex);
   hashtable->readcnt--;
   if(hashtable->readcnt == 0) {
    //释放读写锁
    SemUtil::inc(hashtable->wlock);
  //通知写
    SemUtil::set(hashtable->cond, 1);
   }
   SemUtil::inc(hashtable->mutex);
   return res;
}
void* hashtable_put(hashtable_t *hashtable, int key, void *value) {
  SemUtil::dec(hashtable->mutex);
  while (hashtable->readcnt > 0)
  {
    SemUtil::set(hashtable->cond, 0);
    SemUtil::inc(hashtable->mutex);
    //等待写通知
    SemUtil::dec(hashtable->cond);
    SemUtil::dec(hashtable->mutex);
  }
  SemUtil::inc(hashtable->mutex);
  //获取读写锁
  SemUtil::dec(hashtable->wlock);
  _hashtable_put(hashtable, key, value);
  //释放读写锁
  SemUtil::inc(hashtable->wlock);
}
queue/hashtable.h
@@ -9,6 +9,10 @@
typedef struct hashtable_t
{
 struct tailq_header_t* array[MAPSIZE];
 int mutex;
 int wlock;
 int cond;
 size_t readcnt;
} hashtable_t;
@@ -20,4 +24,6 @@
void hashtable_printall(hashtable_t *hashtable);
int hashtable_alloc_key(hashtable_t *hashtable);
#endif
queue/include/mem_pool.h
New file
@@ -0,0 +1,16 @@
#ifndef _MEM_POOL_H_
#define _MEM_POOL_H_
#include "mm.h"
extern void mem_pool_init(size_t heap_size);
extern void mem_pool_destroy(void);
extern void *mem_pool_malloc (size_t size);
extern void mem_pool_free (void *ptr);
extern void *mem_pool_realloc(void *ptr, size_t size);
extern hashtable_t * mem_pool_get_hashtable();
// extern int mm_checkheap(int verbose);
#endif
queue/include/shm_queue_wrapper.h
@@ -11,62 +11,67 @@
#endif
/**
 * 初始化
 * 创建队列
 * @ shmqueue 
 * @ key 标识共享队列的唯一key
 * @ queue_size 队列大小 , 这个值必须是2的指数即 1, 2, 4, 8, 16 等
 * @ ele_size 队列中元素大小, 这个值不能超过512,当然如果需要可以调整这个最大限制
 * @ key 标识共享队列的唯一key, 如果key为空会自动分配一个key,如果key不会空会检查是否有重复绑定的情况, 然后绑定key.
 * @ queue_size 队列大小
 * @ ele_size 队列中元素大小
 */
void* shmqueue_init(int key, int queue_size, int ele_size);
void* shmqueue_create( int * key, int queue_size);
/**
 * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并返回空值
 */
void* shmqueue_attach(int key);
/**
 * 销毁
*/
void shmqueue_destroy(void *shmqueue);
void shmqueue_drop(void * _shmqueue) ;
/**
 * 队列元素的个数
 */
uint32_t shmqueue_size(void *shmqueue);
int shmqueue_size(void * _shmqueue);
/**
 * 是否已满
 */
int shmqueue_full(void *shmqueue);
int shmqueue_full(void * _shmqueue);
/**
 * 是否为空
 */
int shmqueue_empty(void *shmqueue);
int shmqueue_empty(void * _shmqueue) ;
/**
 * 入队, 队列满时等待
 */
int shmqueue_push(void *shmqueue, void *src_ele);
int shmqueue_push(void * _shmqueue, void *src_ele, int ele_size);
/**
 * 入队, 队列满时立即返回
 */
int shmqueue_push_nowait(void *shmqueue, void *src_ele);
int shmqueue_push_nowait(void * _shmqueue, void *src_ele, int ele_size) ;
/**
 * 入队, 指定时间内入队不成功就返回
 * timespec {sec秒, nsec纳秒}
 */
int shmqueue_push_timeout(void *shmqueue, void *src_ele, struct timespec * timeout);
int shmqueue_push_timeout(void * _shmqueue, void *src_ele, int ele_size, void * _timeout);
/**
 * 出队, 队列空时等待
 */
int shmqueue_pop(void *shmqueue, void *dest_ele);
int shmqueue_pop(void * _shmqueue, void *dest_ele);
/**
 * 出队, 队列空时立即返回
 */
int shmqueue_pop_nowait(void *shmqueue, void *dest_ele);
int shmqueue_pop_nowait(void * _shmqueue, void *dest_ele) ;
/**
 * 出队, 指定时间内出队不成功就返回
 */
int shmqueue_pop_timeout(void *shmqueue, void *dest_ele, struct timespec * timeout);
int shmqueue_pop_timeout(void * _shmqueue, void *dest_ele, void * _timeout);
#ifdef __cplusplus
}
queue/libshm_queue.a
Binary files differ
queue/mem_pool.c
New file
@@ -0,0 +1,26 @@
#include "mem_pool.h"
void mem_pool_init(size_t heap_size) {
    mm_init(heap_size);
}
void mem_pool_destroy(void) {
    mm_destroy();
}
void *mem_pool_malloc (size_t size) {
    return mm_malloc(size);
}
void mem_pool_free (void *ptr) {
    mm_free(ptr);
}
void *mem_pool_realloc (void *ptr, size_t size) {
    return mm_realloc(ptr, size);
}
hashtable_t * mem_pool_get_hashtable() {
    return mem_get_hashtable();
}
queue/sem_util.c
@@ -136,3 +136,13 @@
}
void SemUtil::set(int semId, int val)
{
    union semun arg;
    arg.val = val;
    if (semctl(semId, 0, SETVAL, arg) == -1)
        err_exit(errno, "SemUtil::set semctl");
}
queue/sem_util.h
@@ -13,6 +13,8 @@
    int inc(int semId);
    void remove(int semid);
    void set(int semId, int val);
}
#endif
queue/shm_queue_wrapper.c
@@ -1,48 +1,75 @@
#include "shm_queue_wrapper.h"
#define MAX_ELE_SIZE 512
#include "mm.h"
#include "hashtable.h"
typedef struct ele_t {
    char buf[MAX_ELE_SIZE];
    size_t size;
    void * buf;
} ele_t;
typedef struct shmqueue_t {
    size_t ele_size;
    void *mqueue;
} shmqueue_t;
/**
 * 初始化
 * 创建队列
 * @ shmqueue 
 * @ key 标识共享队列的唯一key
 * @ key 标识共享队列的唯一标识, key是一个指针里面存储了key的值, 如果key的值为-1系统会自动分配一个key值并把该key的值赋给key指针。如果key的值不会空会检查是否有重复绑定的情况, 有重复就报错没有就创建队列并绑定key.
 * @ queue_size 队列大小
 * @ ele_size 队列中元素大小
 * @ size 队列中元素大小
 */
void* shmqueue_init( int key, int queue_size, int ele_size) {
    if(ele_size > MAX_ELE_SIZE) {
        err_exit(0, "shmqueue_init : the element size is supper than max element buffer size");
void* shmqueue_create( int * key, int queue_size) {
    int  mkey;
    hashtable_t *hashtable = get_mm_hashtable();
    if(*key == -1) {
        mkey = hashtable_alloc_key(hashtable);
        *key = mkey;
    } else {
        mkey = *key;
        if(hashtable_get(hashtable, mkey)!= NULL) {
            err_msg(0, "key %d has already been in used!", mkey);
            return NULL;
        }
    }
    shmqueue_t *shmqueue = (shmqueue_t*)malloc(sizeof(shmqueue_t));
    SHMQueue<ele_t> *queue = new SHMQueue<ele_t>(key, queue_size);
    SHMQueue<ele_t> *queue = new SHMQueue<ele_t>(mkey, queue_size);
    shmqueue->mqueue = (void *)queue;
    shmqueue->ele_size = ele_size;
    return (void *)shmqueue;
}
/**
 * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并返回空值
 */
void* shmqueue_attach(int key) {
    hashtable_t *hashtable = get_mm_hashtable();
    if(hashtable_get(hashtable, key)== NULL) {
        err_msg(0, "shmqueue_attach:attach failed, The queue  binding on key %d has not been created!", key);
        return NULL;
    }
    shmqueue_t *shmqueue = (shmqueue_t*)malloc(sizeof(shmqueue_t));
    SHMQueue<ele_t> *queue = new SHMQueue<ele_t>(key, 0);
    shmqueue->mqueue = (void *)queue;
    return (void *)shmqueue;
}
/**
 * 销毁
*/
void shmqueue_destroy(void * _shmqueue) {
void shmqueue_drop(void * _shmqueue) {
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    delete (SHMQueue<ele_t> *)shmqueue->mqueue;
    delete _shmqueue;
    delete shmqueue;
}
/**
 * 队列元素的个数
 */
uint32_t shmqueue_size(void * _shmqueue) {
int shmqueue_size(void * _shmqueue) {
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->size();
    return (int)(((SHMQueue<ele_t> *)(shmqueue->mqueue))->size()) ;
}
/**
@@ -64,42 +91,51 @@
/**
 * 入队, 队列满时等待
 */
int shmqueue_push(void * _shmqueue, void *src_ele) {
int shmqueue_push(void * _shmqueue, void *src, int size) {
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    ele_t dest_ele;
    memcpy(&dest_ele.buf, src_ele, shmqueue->ele_size);
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push(dest_ele);
    ele_t dest;
    dest.size = size;
    dest.buf = mm_malloc(size);
    memcpy(dest.buf, src, size);
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push(dest);
}
/**
 * 入队, 队列满时立即返回
 */
int shmqueue_push_nowait(void * _shmqueue, void *src_ele) {
int shmqueue_push_nowait(void * _shmqueue, void *src, int size) {
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    ele_t dest_ele;
    memcpy(&dest_ele.buf, src_ele, shmqueue->ele_size);
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push_nowait(dest_ele);
    ele_t dest;
    dest.size = size;
    dest.buf = mm_malloc(size);
    memcpy(dest.buf, src, size);
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push_nowait(dest);
}
/**
 * 入队, 指定时间内入队不成功就返回
 * timespec {sec秒, nsec纳秒}
 */
int shmqueue_push_timeout(void * _shmqueue, void *src_ele, struct timespec * timeout) {
int shmqueue_push_timeout(void * _shmqueue, void *src, int size,  void * _timeout) {
    struct timespec *timeout = (struct timespec *)_timeout;
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    ele_t dest_ele;
    memcpy(&dest_ele.buf, src_ele, shmqueue->ele_size);
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push_timeout(dest_ele, timeout);
    ele_t dest;
    dest.size = size;
    dest.buf = mm_malloc(size);
    memcpy(dest.buf, src, size);
    return ((SHMQueue<ele_t> *)(shmqueue->mqueue))->push_timeout(dest, timeout);
}
/**
 * 出队, 队列空时等待
 */
int shmqueue_pop(void * _shmqueue, void *dest_ele) {
int shmqueue_pop(void * _shmqueue, void *dest) {
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    ele_t src_ele;
    bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop(src_ele);
    ele_t src;
    bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop(src);
    if (rv) {
        memcpy(dest_ele, &src_ele.buf, shmqueue->ele_size);
        memcpy(dest, src.buf, src.size);
        mm_free(src.buf);
        return 1;
    } else {
        return 0;
@@ -110,12 +146,13 @@
/**
 * 出队, 队列空时立即返回
 */
int shmqueue_pop_nowait(void * _shmqueue, void *dest_ele) {
int shmqueue_pop_nowait(void * _shmqueue, void *dest) {
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    ele_t src_ele;
    bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop_nowait(src_ele);
    ele_t src;
    bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop_nowait(src);
    if (rv) {
        memcpy(dest_ele, &src_ele.buf, shmqueue->ele_size);
        memcpy(dest, src.buf, src.size);
        mm_free(src.buf);
        return 1;
    } else {
        return 0;
@@ -125,12 +162,14 @@
/**
 * 出队, 指定时间内出队不成功就返回
 */
int shmqueue_pop_timeout(void * _shmqueue, void *dest_ele, struct timespec * timeout) {
int shmqueue_pop_timeout(void * _shmqueue, void *dest, void * _timeout) {
    struct timespec *timeout = (struct timespec *)_timeout;
    shmqueue_t * shmqueue = (shmqueue_t *)_shmqueue;
    ele_t src_ele;
    bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop_timeout(src_ele, timeout);
    ele_t src;
    bool rv = ((SHMQueue<ele_t> *)(shmqueue->mqueue))->pop_timeout(src, timeout);
    if (rv) {
        memcpy(dest_ele, &src_ele.buf, shmqueue->ele_size);
        memcpy(dest, src.buf, src.size);
        mm_free(src.buf);
        return 1;
    } else {
        return 0;
test/multiple_queue_consumer
Binary files differ
test/multiple_queue_productor
Binary files differ
test/single_consumer
Binary files differ
test/single_productor
Binary files differ
test/test_lostdata
Binary files differ
test/test_queue
Binary files differ
test/test_timeout
Binary files differ
test2/Makefile
@@ -13,8 +13,8 @@
PLATFORM=$(shell $(ROOT)/systype.sh)
include $(ROOT)/Make.defines.$(PLATFORM)
PROGS =    test_queue_wrapper
PROGS =    test_queue_wrapper server client
build: $(PROGS)
test2/client
Binary files differ
test2/client.c
New file
@@ -0,0 +1,46 @@
#include "shm_queue_wrapper.h"
#include "mm.h"
typedef struct msg_t
{
    int key;
    char buf[100];
} msg_t;
void client() {
    int key = -1;
    size_t qsize = 16;
    void * remote_queue = shmqueue_attach( 1);
    void * local_queue = shmqueue_create( &key, qsize);
    // message_t item;
    msg_t msg;
    msg.key = key;
    msg_t rec_msg;
    //入队
    while(true) {
      printf("=====>say some thing:\n");
      scanf("%s", msg.buf);
      shmqueue_push(remote_queue, (void *)&msg, sizeof(msg));
      //printf("send: %s\n",  msg.buf);
      shmqueue_pop(local_queue, (void *)&rec_msg );
      printf("=====>peer : %s\n",  rec_msg.buf);
    }
    //销毁队列
    shmqueue_drop(local_queue);
    shmqueue_drop(remote_queue);
}
int main () {
    mm_init(512);
    client();
    //整个进程退出时需要执行这个方法,该方法首先会检查是否还有其他进程在使用该共享内存,如果还有其他进程在使用就只是detach,如果没有其他进程在使用则销毁整块内存。
    mm_destroy();
    return 0;
}
test2/server
Binary files differ
test2/server.c
New file
@@ -0,0 +1,43 @@
#include "shm_queue_wrapper.h"
#include "mm.h"
typedef struct msg_t
{
    int key;
    char buf[100];
} msg_t;
void server() {
     msg_t msg;
     msg_t send_msg;
     int key = 1;
    size_t qsize = 16;
    void * local_queue = shmqueue_create( &key, qsize);
    struct timespec timeout = {1, 0};
    while(shmqueue_pop(local_queue, (void *)&msg) ) {
        void * remote_queue = shmqueue_attach(msg.key);
        printf("received: %s\n", msg.buf);
        // send_msg.key = 1;
        sprintf(send_msg.buf, "hello, I have received: %s!", msg.buf);
        shmqueue_push(remote_queue, (void *)&send_msg, sizeof(send_msg));
        shmqueue_drop(remote_queue);
       // cout <<  item.pic << endl;
        // i++;
    }
    //销毁队列
    shmqueue_drop(local_queue);
}
int main () {
    mm_init(512);
    server();
    //整个进程退出时需要执行这个方法,该方法首先会检查是否还有其他进程在使用该共享内存,如果还有其他进程在使用就只是detach,如果没有其他进程在使用则销毁整块内存。
    mm_destroy();
    return 0;
}
test2/test_queue_wrapper
Binary files differ
test2/test_queue_wrapper.c
@@ -10,21 +10,21 @@
void test1() {
    unsigned int i = 0;
    int key = 2;
    int key = -1;
 
    size_t qsize = 16;
    void * queue = shmqueue_init( key, qsize, sizeof(message_t));
    void * queue = shmqueue_create( &key, qsize);
    message_t item;
    
    for(i = 0; i < qsize; i++) {
        sprintf(item.method, "hello"); 
        item.code = i ; 
        //入队
        if(shmqueue_push(queue, (void *)&item)) {
        if(shmqueue_push(queue, (void *)&item, sizeof(message_t))) {
              printf("push:%d %s\n", item.code, item.method );
        }
    }
    printf("%d\n", key);
    struct timespec timeout = {1, 0};
    i = 0;
@@ -36,11 +36,12 @@
    }
    //销毁队列
    shmqueue_destroy(queue);
    shmqueue_drop(queue);
}
 
int main () {
    mm_init(512);
    test1();
    //整个进程退出时需要执行这个方法,该方法首先会检查是否还有其他进程在使用该共享内存,如果还有其他进程在使用就只是detach,如果没有其他进程在使用则销毁整块内存。