wangzhengquan
2020-07-07 37a7bd95042c19d7334b099d50ac6dc8e07e4b4e
update
9个文件已删除
4个文件已添加
2 文件已重命名
15个文件已修改
500459 ■■■■■ 已修改文件
Makefile 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
squeue/include/array_lock_free_queue.h 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
squeue/include/linked_lock_free_queue.h 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
squeue/include/lock_free_queue.h 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
squeue/include/queue_factory.h 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
squeue/mm.c 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
squeue/mm.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/Makefile 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/consumer 补丁 | 查看 | 原始文档 | blame | 历史
test/consumer_timeout.c 55 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/multiple_queue_consumer 补丁 | 查看 | 原始文档 | blame | 历史
test/multiple_queue_consumer.c 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/multiple_queue_productor 补丁 | 查看 | 原始文档 | blame | 历史
test/multiple_queue_productor.c 21 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/p.txt 500000 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/productor 补丁 | 查看 | 原始文档 | blame | 历史
test/productor_timeout 补丁 | 查看 | 原始文档 | blame | 历史
test/productor_timeout.c 62 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/single_consumer 补丁 | 查看 | 原始文档 | blame | 历史
test/single_consumer.c 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/single_productor 补丁 | 查看 | 原始文档 | blame | 历史
test/single_productor.c 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test.h 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_atomic 补丁 | 查看 | 原始文档 | blame | 历史
test/test_atomic.c 81 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_cas.c 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_queue 补丁 | 查看 | 原始文档 | blame | 历史
test/test_queue.c 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_timeout 补丁 | 查看 | 原始文档 | blame | 历史
test/test_timeout.c 84 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
Makefile
@@ -12,7 +12,9 @@
ipcrm:
    -ipcrm -a
    -ipcrm -M 0x1234
    -ipcrm -S 145
    -ipcrm -S 146
    -ipcrm -S 8899
    -ipcs
#     -ipcrm -M 0x1234
#     -ipcrm -S 145
#     -ipcrm -S 146
#     -ipcrm -S 8899
squeue/include/array_lock_free_queue.h
@@ -65,18 +65,14 @@
    /// @brief number of elements in the queue
    std::atomic<uint32_t> m_count;
#endif
    static int m_reference;
private:
    /// @brief disable copy constructor declaring it private
    ArrayLockFreeQueue<ELEM_T>(const ArrayLockFreeQueue<ELEM_T> &a_src);
};
template <typename ELEM_T>
int ArrayLockFreeQueue<ELEM_T>::m_reference = 0;
template <typename ELEM_T>
ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize):
@@ -89,7 +85,6 @@
#endif
{
    m_theQueue = (ELEM_T*)mm_malloc(Q_SIZE * sizeof(ELEM_T));
    m_reference++;
}
@@ -97,10 +92,7 @@
ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue()
{
    std::cout << "destroy ArrayLockFreeQueue\n";
    m_reference--;
    if(m_reference == 0) {
       mm_free(m_theQueue);
    }
    
}
squeue/include/linked_lock_free_queue.h
@@ -47,17 +47,21 @@
template <typename T>
template <typename ELEM_T>
class LinkedLockFreeQueue
{
    template <
        typename ELEM_T_,
        template <typename T> class Q_TYPE >
    friend class LockFreeQueue;
private:
// class scope definitions
    enum {Q_SIZE = 10};
    
// private class members
    std::atomic<Pointer<T> > Head;       // pointer to front of Queue
    std::atomic<Pointer<T> > Tail;        // pointer to rear of Queue
    std::atomic<Pointer<ELEM_T> > Head;       // pointer to front of Queue
    std::atomic<Pointer<ELEM_T> > Tail;        // pointer to rear of Queue
    //std::atomic_uint count;          // current number of size in Queue
    std::atomic_uint count;
    const size_t qsize;    // maximum number of size in Queue
@@ -70,15 +74,13 @@
    bool empty() const;
    bool full() const;
    unsigned int size() const;
    bool push(const T &item); // add item to end
    bool pop(T &item);
    bool push(const ELEM_T &item); // add item to end
    bool pop(ELEM_T &item);
    
    T& operator[](unsigned i);
    ELEM_T& operator[](unsigned i);
};
// Queue methods
@@ -91,15 +93,12 @@
    Head.store(pointer, std::memory_order_relaxed);
    Tail.store(pointer, std::memory_order_relaxed);
}
template <typename T>
LinkedLockFreeQueue<T>::~LinkedLockFreeQueue()
{
    std::cerr << "LinkedLockFreeQueue destory" << std::endl;
    Node<T> * nodeptr;
    Pointer<T> tmp = Head.load(std::memory_order_relaxed);
    while((nodeptr = tmp.ptr) != NULL) {
@@ -108,7 +107,6 @@
        delete nodeptr;
    }
}
template <typename T>
squeue/include/lock_free_queue.h
@@ -22,6 +22,9 @@
template <typename ELEM_T>
class ArrayLockFreeQueue;
template <typename ELEM_T>
class LinkedLockFreeQueue;
/// @brief Lock-free queue based on a circular array
/// No allocation of extra memory for the nodes handling is needed, but it has 
@@ -67,10 +70,12 @@
    template <typename T> class Q_TYPE = ArrayLockFreeQueue >
class LockFreeQueue
{
private:
    int slots;
    int items;
public:    
    std::atomic_uint reference;
    /// @brief constructor of the class
    LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    
@@ -130,13 +135,12 @@
};
 
template <
    typename ELEM_T, 
    template <typename T> class Q_TYPE>
LockFreeQueue<ELEM_T, Q_TYPE>::LockFreeQueue(size_t qsize):
    m_qImpl(qsize)
LockFreeQueue<ELEM_T, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize)
{
// std::cout << "LockFreeQueue init reference=" << reference << std::endl;
    slots = SemUtil::get(IPC_PRIVATE, qsize);
    items = SemUtil::get(IPC_PRIVATE, 0);
}
@@ -146,6 +150,7 @@
    template <typename T> class Q_TYPE>
LockFreeQueue<ELEM_T, Q_TYPE>::~LockFreeQueue()
{
    std::cerr << "LockFreeQueue desctroy" << std::endl;
    SemUtil::remove(slots);
    SemUtil::remove(items);
}
squeue/include/queue_factory.h
@@ -22,10 +22,8 @@
    }
     
    template <typename T> static
    LockFreeQueue<T>* createArrayLockFreeQueue(int key, size_t size=16) {
    LockFreeQueue<T>* _createQueue(int key, size_t size = 16) {
        LockFreeQueue<T> *queue;
        hashtable_t *hashtable = getHashTable();
        //LockFreeQueue<int, 10000> q;
@@ -34,14 +32,18 @@
            hashtable_put(hashtable,  key, (void *)queue);
        }
        std::cout << "createQueue reference===" << queue->reference << std::endl;
        return queue;
    }
public:
    template <typename T> static
    LockFreeQueue<T>* createQueue(int key, size_t size = 16) {
        return QueueFactory::createArrayLockFreeQueue<T>(key, size);
        LockFreeQueue<T> *queue = _createQueue<T>(key, size);
        queue->reference++;
        return queue;
    }
    /**
@@ -49,11 +51,19 @@
    */
    template <typename T> static
    void dropQueue(int key) {
        LockFreeQueue<T> *queue = QueueFactory::createQueue<T> (key);
        LockFreeQueue<T> *queue = _createQueue<T> (key);
        if(queue == NULL)
            return;
        queue->reference--;
std::cout << "dropQueue reference===" << queue->reference << std::endl;
        if(queue->reference == 0) {
        delete queue;
        hashtable_t *hashtable = getHashTable();
        hashtable_remove(hashtable, key);
    }
    }
};
#endif
squeue/mm.c
@@ -77,8 +77,8 @@
static  int shmid = -1;
static  void *shmp;
//static int mutex = SemUtil::get(8899, 1);
static int mutex = SemUtil::get(IPC_PRIVATE, 1);
static int mutex = SemUtil::get(SHM_MUTEX_KEY, 1);
//static int mutex = SemUtil::get(IPC_PRIVATE, 1);
static void *mem_start_brk;  /* points to first byte of heap */
static void *mem_brk;        /* points to last byte of heap */
@@ -107,7 +107,7 @@
    SemUtil::inc(mutex);
    return aptr;
  } else {
    fprintf(stderr, "mm_malloc : out of memery\n");
    err_exit(0, "mm_malloc : out of memery\n");
    return NULL;
  }
squeue/mm.h
@@ -10,6 +10,7 @@
/* Hard-coded keys for IPC objects */
#define SHM_KEY 0x1234          /* Key for shared memory segment */
#define SHM_MUTEX_KEY 0x8800
#define OBJ_PERMS (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP)
test/Makefile
@@ -63,7 +63,7 @@
include $(ROOT)/Make.defines.$(PLATFORM)
 
PROGS =    test_queue
PROGS =    test_queue single_productor single_consumer multiple_queue_productor multiple_queue_consumer test_timeout
build: $(PROGS)
@@ -77,16 +77,9 @@
test_queue: test.h  $(ROOT)/squeue/include/lock_free_queue.h
single_productor: test.h  $(ROOT)/squeue/include/lock_free_queue.h
productor: test.h  $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h
consumer: test.h  $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h
single_productor: test.h  $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h
single_consumer: test.h  $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h
single_consumer: test.h  $(ROOT)/squeue/include/lock_free_queue.h
clean:
    rm -f $(TEMPFILES) $(PROGS)
test/consumer
Binary files differ
test/consumer_timeout.c
File was deleted
test/multiple_queue_consumer
Binary files differ
test/multiple_queue_consumer.c
File was renamed from test/consumer.c
@@ -2,9 +2,12 @@
using namespace std;
#define NTHREADS 4
struct Targ targs[NTHREADS];
size_t qsize = 16;
 
void sigint_handler(int sig) {
  destroy();
  mm_destroy();
  exit(0);
}
@@ -14,14 +17,13 @@
void* run (void *arg) {
  struct Targ * targ = (struct Targ * )arg;
  // SArrayLockFreeQueue<struct Item> *queue = QFactory::createArrayLockFreeQueue<struct Item> (targ->key, 10);
  SLinkedLockFreeQueue<struct Item> *queue = QFactory::createLinkedLockFreeQueue<struct Item> (targ->key, 10);
  LockFreeQueue<struct Item> *queue = QueueFactory::createQueue<struct Item> (targ->key, qsize);
  struct Item item;
  struct timespec timeout = {5, 0};
  int i = 0;
  while((queue->remove_timeout(item, &timeout)) ) {
    // cout << "出队:" << item.pic << ", " << item.info << endl;
  while((queue->pop_timeout(item, &timeout)) ) {
    printf("consumer(%d) 出队: {%d, %d}\n", targ->key, item.pic, item.info);
   // cout <<  item.pic << endl;
    i++;
@@ -55,12 +57,12 @@
    if(pthread_join(tids[i], &res[i])!=0) {
      perror("productor pthread_join");
    } else {
      fprintf(stderr, "cosumer %d 读取了 %ld 条数据\n", i, (long)res[i]);
      fprintf(stderr, "cosumer(%d) 读取了 %ld 条数据\n", i, (long)res[i]);
    }
  }
  
  destroy();
  mm_destroy();
  cerr << "cosumer quit" << endl;
  exit(EXIT_SUCCESS);
}
test/multiple_queue_productor
Binary files differ
test/multiple_queue_productor.c
File was renamed from test/productor.c
@@ -3,11 +3,13 @@
 
using namespace std;
#define NTHREADS 4
struct Targ targs[NTHREADS];
size_t qsize = 16;
void sigint_handler(int sig) {
  cerr << "sigint_handler" << endl;
  destroy();
  mm_destroy();
  exit(0);
}
@@ -17,9 +19,10 @@
  struct Targ * targ = (struct Targ * )arg;
 // cerr << "productor key="<<targ->key << endl;
  err_msg(0, "productor key = %d\n", targ->key );
  SLinkedLockFreeQueue<struct Item> *queue = QFactory::createLinkedLockFreeQueue<struct Item> (targ->key, 10);
  //SLinkedLockFreeQueue<struct Item> *queue = QFactory::createLinkedLockFreeQueue<struct Item> (targ->key, 10);
  //SArrayLockFreeQueue<struct Item> *queue = QFactory::createArrayLockFreeQueue<struct Item> (targ->key, 10);
  LockFreeQueue<struct Item> *queue = QueueFactory::createQueue<struct Item> (targ->key, qsize);
  /* Transfer blocks of data from stdin to shared memory */
  int end = targ->end;
  struct Item item;
@@ -29,12 +32,10 @@
  item.pic = i;
  item.info = i;
  while((end == -1 || (i < end) ) && (queue->add_timeout(item, &timeout)) ) {
  while((end == -1 || (i < end) ) && (queue->push(item)) ) {
    item.pic = i;
    item.info = i;
    // cout << "入队:" << item.pic << ", " << item.info << endl;
    //cout <<  item.pic << endl;
    printf("productor(%d) 入队:{%d, %d}\n", targ->key, item.pic,  item.info);
    i++;
    
@@ -54,7 +55,7 @@
  int i;
  pthread_t tids[NTHREADS];
  void *res[NTHREADS];
  struct Targ targs[NTHREADS];
   
 
  for (i = 0; i< NTHREADS; i++) {
@@ -64,14 +65,14 @@
    targs[i].end = 100000;
     
    pthread_create(&tids[i], NULL, run, (void *)&targs[i]);
    sleep(1);
    //sleep(1);
  }
  for (i = 0; i< NTHREADS; i++) {
    if(pthread_join(tids[i], &res[i])!=0) {
      perror("productor pthread_join");
    } else {
      fprintf(stderr, "productor %d 写入 %ld 条数据\n", i, (long)res[i]);
      fprintf(stderr, "productor(%d) 写入 %ld 条数据\n", i, (long)res[i]);
    }
  }
test/p.txt
File was deleted
test/productor
Binary files differ
test/productor_timeout
Binary files differ
test/productor_timeout.c
File was deleted
test/single_consumer
Binary files differ
test/single_consumer.c
@@ -2,33 +2,34 @@
using namespace std;
 
int key = 1;
 
void sigint_handler(int sig) {
  destroy();
  destroy(key);
  exit(0);
}
int main(int argc, char *argv[])
{
   
  int qsize = 16;
  void *shmp;
  signal(SIGINT,  sigint_handler);
  /* Get IDs for semaphore set and shared memory created by writer */
  //SAbstractQueue<struct Item> *queue = QFactory::createQueue<struct Item> (1, 10);
  SLinkedLockFreeQueue<struct Item> *queue = QFactory::createLinkedLockFreeQueue<struct Item> (1, 10);
  LockFreeQueue<struct Item> *queue = QueueFactory::createQueue<struct Item> (key, qsize);
  /* Transfer blocks of data from shared memory to stdout */
   
  struct timespec timeout = {10, 0};
  struct Item item;
  while(queue->remove(item)) {
  while(queue->pop(item)) {
    
    cout <<  item.pic  << endl;
    //sleep(1);
  }
  destroy();
  destroy(key);
  cerr << "consumer quit" << endl;
  exit(EXIT_SUCCESS);
}
test/single_productor
Binary files differ
test/single_productor.c
@@ -3,19 +3,21 @@
 
using namespace std;
int key = 1;
void sigint_handler(int sig) {
  cerr << "sigint_handler" << endl;
  destroy();
  destroy(key);
  exit(0);
}
int main(int argc, char *argv[])
  void *shmp;
  string line;
  signal(SIGINT,  sigint_handler);
  int qsize = 16;
  /* Create set containing two semaphores; initialize so that
     writer has first access to shared memory. */
  int start = 0;
@@ -26,7 +28,8 @@
    end = atoi(argv[2]);
  }
  //SAbstractQueue<struct Item> *queue = QFactory::createQueue<struct Item> (1, 10);
  SLinkedLockFreeQueue<struct Item> *queue = QFactory::createLinkedLockFreeQueue<struct Item> (1, 10);
  LockFreeQueue<struct Item> *queue = QueueFactory::createQueue<struct Item> (key, qsize);
  
  /* Transfer blocks of data from stdin to shared memory */
@@ -36,7 +39,7 @@
  item.pic = i;
  item.info = i;
  //while((end == -1 || (i < end) ) && (queue->add(item)) ) {
  while((queue->add(item)) ) {
  while((queue->push(item)) ) {
    item.pic = i;
    item.info = i;
@@ -46,7 +49,7 @@
    i++;
    
  }
  destroy();
  destroy(key);
  cerr << "productor quit" << endl;
  exit(EXIT_SUCCESS);
}
test/test.h
@@ -4,7 +4,6 @@
#include "queue_factory.h"
#include <pthread.h>
#define NTHREADS 3
struct Item
{
@@ -27,7 +26,7 @@
    // delete queue;
    QueueFactory::dropQueue<struct Item>(key);
    mm_destroy();
    //mm_destroy();
    
}
test/test_atomic
Binary files differ
test/test_atomic.c
File was deleted
test/test_cas.c
File was deleted
test/test_queue
Binary files differ
test/test_queue.c
@@ -29,10 +29,10 @@
     
    struct timespec timeout = {5, 0};
    struct timespec timeout = {1, 0};
    i = 0;
    while((queue->pop(item)) ) {
    while((queue->pop_timeout(item, &timeout)) ) {
        cout << i << " pop:" << item.pic << ", " << item.info << endl;
       // cout <<  item.pic << endl;
        i++;
test/test_timeout
Binary files differ
test/test_timeout.c
New file
@@ -0,0 +1,84 @@
#include "test.h"
using namespace std;
int key = 2;
size_t qsize = 16;
// 销毁共享内存和信号
void sigint_handler(int sig) {
  destroy(key);
  exit(0);
}
void productor() {
   LockFreeQueue<struct Item> *queue = QueueFactory::createQueue<struct Item> (key, qsize);
  /* Transfer blocks of data from stdin to shared memory */
  struct Item item;
  struct timespec timeout = {5, 0};
  int i = 0;
  while(true) {
    item.pic = i;
    item.info = i;
    if(queue->push_nowait(item)) {
       cout << "入队:" << item.pic << ", " << item.info << endl;
    } else {
      cout << "队列已经满,push_nowait 返回" << endl;
    }
    if(queue->push_timeout(item, &timeout)) {
       cout << "入队:" << item.pic << ", " << item.info << endl;
    } else {
      cout << "队列已经满,等待5s, push_timeout 返回" << endl;
      break;
    }
    // if (i == (1 << 30))
    //   i = 0;
    //sleep(1);
    i++;
  }
}
void consumer() {
  LockFreeQueue<struct Item> *queue = QueueFactory::createQueue<struct Item> (key, qsize);
  /* Transfer blocks of data from shared memory to stdout */
  while(1) {
    struct Item item;
    if (queue->pop_nowait(item)) {
       cout << "出队:" << item.pic << ", " << item.info << endl;
    } else {
      cout << "队列为空,pop_nowait返回" << endl;
    }
    struct timespec timeout = {5, 0};
    if (queue->pop_timeout(item, &timeout)) {
       cout << "出队:" << item.pic << ", " << item.info << endl;
    } else {
      cout << "队列为空,等待5s,pop_timeout返回" << endl;
       break;
    }
  }
}
int main(int argc, char *argv[])
{
  signal(SIGINT,  sigint_handler);
  productor();
  consumer();
  // 销毁共享内存和信号
  destroy(key);
  exit(EXIT_SUCCESS);
}