9个文件已删除
4个文件已添加
2个文件已重命名
15个文件已修改
| | |
| | | |
| | | ipcrm: |
| | | -ipcrm -a |
| | | -ipcrm -M 0x1234 |
| | | -ipcrm -S 145 |
| | | -ipcrm -S 146 |
| | | -ipcrm -S 8899 |
| | | -ipcs |
| | | |
| | | # -ipcrm -M 0x1234 |
| | | # -ipcrm -S 145 |
| | | # -ipcrm -S 146 |
| | | # -ipcrm -S 8899 |
| | |
| | | /// @brief number of elements in the queue |
| | | std::atomic<uint32_t> m_count; |
| | | #endif |
| | | static int m_reference; |
| | | |
| | | |
| | | private: |
| | | /// @brief disable copy constructor declaring it private |
| | | ArrayLockFreeQueue<ELEM_T>(const ArrayLockFreeQueue<ELEM_T> &a_src); |
| | | |
| | | |
| | | }; |
| | | |
| | | |
| | | template <typename ELEM_T> |
| | | int ArrayLockFreeQueue<ELEM_T>::m_reference = 0; |
| | | |
| | | template <typename ELEM_T> |
| | | ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize): |
| | |
| | | #endif |
| | | { |
| | | m_theQueue = (ELEM_T*)mm_malloc(Q_SIZE * sizeof(ELEM_T)); |
| | | m_reference++; |
| | | |
| | | } |
| | | |
| | |
| | | ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue() |
| | | { |
| | | std::cout << "destroy ArrayLockFreeQueue\n"; |
| | | m_reference--; |
| | | if(m_reference == 0) { |
| | | mm_free(m_theQueue); |
| | | } |
| | | |
| | | } |
| | | |
| | |
| | | |
| | | |
| | | |
| | | template <typename T> |
| | | template <typename ELEM_T> |
| | | class LinkedLockFreeQueue |
| | | { |
| | | |
| | | template < |
| | | typename ELEM_T_, |
| | | template <typename T> class Q_TYPE > |
| | | friend class LockFreeQueue; |
| | | private: |
| | | // class scope definitions |
| | | enum {Q_SIZE = 10}; |
| | | |
| | | |
| | | // private class members |
| | | std::atomic<Pointer<T> > Head; // pointer to front of Queue |
| | | std::atomic<Pointer<T> > Tail; // pointer to rear of Queue |
| | | std::atomic<Pointer<ELEM_T> > Head; // pointer to front of Queue |
| | | std::atomic<Pointer<ELEM_T> > Tail; // pointer to rear of Queue |
| | | //std::atomic_uint count; // current number of size in Queue |
| | | std::atomic_uint count; |
| | | const size_t qsize; // maximum number of size in Queue |
| | |
| | | bool empty() const; |
| | | bool full() const; |
| | | unsigned int size() const; |
| | | bool push(const T &item); // add item to end |
| | | bool pop(T &item); |
| | | bool push(const ELEM_T &item); // add item to end |
| | | bool pop(ELEM_T &item); |
| | | |
| | | |
| | | T& operator[](unsigned i); |
| | | ELEM_T& operator[](unsigned i); |
| | | |
| | | }; |
| | | |
| | | |
| | | |
| | | |
| | | // Queue methods |
| | |
| | | Head.store(pointer, std::memory_order_relaxed); |
| | | Tail.store(pointer, std::memory_order_relaxed); |
| | | |
| | | |
| | | |
| | | } |
| | | |
| | | template <typename T> |
| | | LinkedLockFreeQueue<T>::~LinkedLockFreeQueue() |
| | | { |
| | | std::cerr << "LinkedLockFreeQueue destory" << std::endl; |
| | | |
| | | Node<T> * nodeptr; |
| | | Pointer<T> tmp = Head.load(std::memory_order_relaxed); |
| | | while((nodeptr = tmp.ptr) != NULL) { |
| | |
| | | delete nodeptr; |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | template <typename T> |
| | |
| | | template <typename ELEM_T> |
| | | class ArrayLockFreeQueue; |
| | | |
| | | template <typename ELEM_T> |
| | | class LinkedLockFreeQueue; |
| | | |
| | | |
| | | /// @brief Lock-free queue based on a circular array |
| | | /// No allocation of extra memory for the nodes handling is needed, but it has |
| | |
| | | template <typename T> class Q_TYPE = ArrayLockFreeQueue > |
| | | class LockFreeQueue |
| | | { |
| | | |
| | | private: |
| | | int slots; |
| | | int items; |
| | | public: |
| | | std::atomic_uint reference; |
| | | /// @brief constructor of the class |
| | | LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); |
| | | |
| | |
| | | }; |
| | | |
| | | |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | template <typename T> class Q_TYPE> |
| | | LockFreeQueue<ELEM_T, Q_TYPE>::LockFreeQueue(size_t qsize): |
| | | m_qImpl(qsize) |
| | | LockFreeQueue<ELEM_T, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) |
| | | { |
| | | // std::cout << "LockFreeQueue init reference=" << reference << std::endl; |
| | | slots = SemUtil::get(IPC_PRIVATE, qsize); |
| | | items = SemUtil::get(IPC_PRIVATE, 0); |
| | | } |
| | |
| | | template <typename T> class Q_TYPE> |
| | | LockFreeQueue<ELEM_T, Q_TYPE>::~LockFreeQueue() |
| | | { |
| | | std::cerr << "LockFreeQueue desctroy" << std::endl; |
| | | SemUtil::remove(slots); |
| | | SemUtil::remove(items); |
| | | } |
| | |
| | | } |
| | | |
| | | |
| | | |
| | | template <typename T> static |
| | | LockFreeQueue<T>* createArrayLockFreeQueue(int key, size_t size=16) { |
| | | |
| | | LockFreeQueue<T>* _createQueue(int key, size_t size = 16) { |
| | | LockFreeQueue<T> *queue; |
| | | hashtable_t *hashtable = getHashTable(); |
| | | //LockFreeQueue<int, 10000> q; |
| | |
| | | hashtable_put(hashtable, key, (void *)queue); |
| | | } |
| | | |
| | | std::cout << "createQueue reference===" << queue->reference << std::endl; |
| | | return queue; |
| | | } |
| | | |
| | | |
| | | public: |
| | | |
| | | template <typename T> static |
| | | LockFreeQueue<T>* createQueue(int key, size_t size = 16) { |
| | | return QueueFactory::createArrayLockFreeQueue<T>(key, size); |
| | | LockFreeQueue<T> *queue = _createQueue<T>(key, size); |
| | | queue->reference++; |
| | | return queue; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | template <typename T> static |
| | | void dropQueue(int key) { |
| | | LockFreeQueue<T> *queue = QueueFactory::createQueue<T> (key); |
| | | LockFreeQueue<T> *queue = _createQueue<T> (key); |
| | | if(queue == NULL) |
| | | return; |
| | | |
| | | queue->reference--; |
| | | std::cout << "dropQueue reference===" << queue->reference << std::endl; |
| | | if(queue->reference == 0) { |
| | | delete queue; |
| | | hashtable_t *hashtable = getHashTable(); |
| | | hashtable_remove(hashtable, key); |
| | | } |
| | | |
| | | } |
| | | |
| | | }; |
| | | #endif |
| | |
| | | |
| | | static int shmid = -1; |
| | | static void *shmp; |
| | | //static int mutex = SemUtil::get(8899, 1); |
| | | static int mutex = SemUtil::get(IPC_PRIVATE, 1); |
| | | static int mutex = SemUtil::get(SHM_MUTEX_KEY, 1); |
| | | //static int mutex = SemUtil::get(IPC_PRIVATE, 1); |
| | | |
| | | static void *mem_start_brk; /* points to first byte of heap */ |
| | | static void *mem_brk; /* points to last byte of heap */ |
| | |
| | | SemUtil::inc(mutex); |
| | | return aptr; |
| | | } else { |
| | | fprintf(stderr, "mm_malloc : out of memery\n"); |
| | | err_exit(0, "mm_malloc : out of memery\n"); |
| | | return NULL; |
| | | } |
| | | |
| | |
| | | /* Hard-coded keys for IPC objects */ |
| | | |
| | | #define SHM_KEY 0x1234 /* Key for shared memory segment */ |
| | | #define SHM_MUTEX_KEY 0x8800 |
| | | |
| | | #define OBJ_PERMS (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP) |
| | | |
| | |
| | | include $(ROOT)/Make.defines.$(PLATFORM) |
| | | |
| | | |
| | | PROGS = test_queue |
| | | PROGS = test_queue single_productor single_consumer multiple_queue_productor multiple_queue_consumer test_timeout |
| | | |
| | | |
| | | build: $(PROGS) |
| | |
| | | |
| | | test_queue: test.h $(ROOT)/squeue/include/lock_free_queue.h |
| | | |
| | | single_productor: test.h $(ROOT)/squeue/include/lock_free_queue.h |
| | | |
| | | productor: test.h $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h |
| | | |
| | | |
| | | consumer: test.h $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h |
| | | |
| | | |
| | | single_productor: test.h $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h |
| | | |
| | | single_consumer: test.h $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h |
| | | single_consumer: test.h $(ROOT)/squeue/include/lock_free_queue.h |
| | | |
| | | clean: |
| | | rm -f $(TEMPFILES) $(PROGS) |
File was renamed from test/consumer.c |
| | |
| | | using namespace std; |
| | | |
| | | |
| | | #define NTHREADS 4 |
| | | struct Targ targs[NTHREADS]; |
| | | size_t qsize = 16; |
| | | |
| | | void sigint_handler(int sig) { |
| | | destroy(); |
| | | mm_destroy(); |
| | | exit(0); |
| | | |
| | | } |
| | |
| | | void* run (void *arg) { |
| | | struct Targ * targ = (struct Targ * )arg; |
| | | // SArrayLockFreeQueue<struct Item> *queue = QFactory::createArrayLockFreeQueue<struct Item> (targ->key, 10); |
| | | SLinkedLockFreeQueue<struct Item> *queue = QFactory::createLinkedLockFreeQueue<struct Item> (targ->key, 10); |
| | | |
| | | LockFreeQueue<struct Item> *queue = QueueFactory::createQueue<struct Item> (targ->key, qsize); |
| | | struct Item item; |
| | | struct timespec timeout = {5, 0}; |
| | | |
| | | int i = 0; |
| | | while((queue->remove_timeout(item, &timeout)) ) { |
| | | // cout << "出队:" << item.pic << ", " << item.info << endl; |
| | | while((queue->pop_timeout(item, &timeout)) ) { |
| | | printf("consumer(%d) 出队: {%d, %d}\n", targ->key, item.pic, item.info); |
| | | // cout << item.pic << endl; |
| | | |
| | | i++; |
| | |
| | | if(pthread_join(tids[i], &res[i])!=0) { |
| | | perror("productor pthread_join"); |
| | | } else { |
| | | fprintf(stderr, "cosumer %d 读取了 %ld 条数据\n", i, (long)res[i]); |
| | | fprintf(stderr, "cosumer(%d) 读取了 %ld 条数据\n", i, (long)res[i]); |
| | | } |
| | | } |
| | | |
| | | |
| | | destroy(); |
| | | mm_destroy(); |
| | | cerr << "cosumer quit" << endl; |
| | | exit(EXIT_SUCCESS); |
| | | } |
File was renamed from test/productor.c |
| | |
| | | |
| | | using namespace std; |
| | | |
| | | |
| | | #define NTHREADS 4 |
| | | struct Targ targs[NTHREADS]; |
| | | size_t qsize = 16; |
| | | |
| | | void sigint_handler(int sig) { |
| | | cerr << "sigint_handler" << endl; |
| | | destroy(); |
| | | mm_destroy(); |
| | | exit(0); |
| | | |
| | | } |
| | |
| | | struct Targ * targ = (struct Targ * )arg; |
| | | // cerr << "productor key="<<targ->key << endl; |
| | | err_msg(0, "productor key = %d\n", targ->key ); |
| | | SLinkedLockFreeQueue<struct Item> *queue = QFactory::createLinkedLockFreeQueue<struct Item> (targ->key, 10); |
| | | //SLinkedLockFreeQueue<struct Item> *queue = QFactory::createLinkedLockFreeQueue<struct Item> (targ->key, 10); |
| | | //SArrayLockFreeQueue<struct Item> *queue = QFactory::createArrayLockFreeQueue<struct Item> (targ->key, 10); |
| | | |
| | | LockFreeQueue<struct Item> *queue = QueueFactory::createQueue<struct Item> (targ->key, qsize); |
| | | /* Transfer blocks of data from stdin to shared memory */ |
| | | int end = targ->end; |
| | | struct Item item; |
| | |
| | | |
| | | item.pic = i; |
| | | item.info = i; |
| | | while((end == -1 || (i < end) ) && (queue->add_timeout(item, &timeout)) ) { |
| | | while((end == -1 || (i < end) ) && (queue->push(item)) ) { |
| | | item.pic = i; |
| | | item.info = i; |
| | | |
| | | // cout << "入队:" << item.pic << ", " << item.info << endl; |
| | | //cout << item.pic << endl; |
| | | printf("productor(%d) 入队:{%d, %d}\n", targ->key, item.pic, item.info); |
| | | |
| | | i++; |
| | | |
| | |
| | | int i; |
| | | pthread_t tids[NTHREADS]; |
| | | void *res[NTHREADS]; |
| | | struct Targ targs[NTHREADS]; |
| | | |
| | | |
| | | |
| | | for (i = 0; i< NTHREADS; i++) { |
| | |
| | | targs[i].end = 100000; |
| | | |
| | | pthread_create(&tids[i], NULL, run, (void *)&targs[i]); |
| | | sleep(1); |
| | | //sleep(1); |
| | | } |
| | | |
| | | for (i = 0; i< NTHREADS; i++) { |
| | | if(pthread_join(tids[i], &res[i])!=0) { |
| | | perror("productor pthread_join"); |
| | | } else { |
| | | fprintf(stderr, "productor %d 写入 %ld 条数据\n", i, (long)res[i]); |
| | | fprintf(stderr, "productor(%d) 写入 %ld 条数据\n", i, (long)res[i]); |
| | | } |
| | | } |
| | | |
| | |
| | | using namespace std; |
| | | |
| | | |
| | | |
| | | int key = 1; |
| | | |
| | | void sigint_handler(int sig) { |
| | | destroy(); |
| | | destroy(key); |
| | | exit(0); |
| | | |
| | | } |
| | | int main(int argc, char *argv[]) |
| | | { |
| | | |
| | | int qsize = 16; |
| | | |
| | | void *shmp; |
| | | signal(SIGINT, sigint_handler); |
| | | /* Get IDs for semaphore set and shared memory created by writer */ |
| | | //SAbstractQueue<struct Item> *queue = QFactory::createQueue<struct Item> (1, 10); |
| | | SLinkedLockFreeQueue<struct Item> *queue = QFactory::createLinkedLockFreeQueue<struct Item> (1, 10); |
| | | |
| | | LockFreeQueue<struct Item> *queue = QueueFactory::createQueue<struct Item> (key, qsize); |
| | | /* Transfer blocks of data from shared memory to stdout */ |
| | | |
| | | struct timespec timeout = {10, 0}; |
| | | struct Item item; |
| | | while(queue->remove(item)) { |
| | | while(queue->pop(item)) { |
| | | |
| | | cout << item.pic << endl; |
| | | //sleep(1); |
| | | } |
| | | |
| | | destroy(); |
| | | destroy(key); |
| | | cerr << "consumer quit" << endl; |
| | | exit(EXIT_SUCCESS); |
| | | } |
| | |
| | | |
| | | using namespace std; |
| | | |
| | | int key = 1; |
| | | |
| | | void sigint_handler(int sig) { |
| | | cerr << "sigint_handler" << endl; |
| | | destroy(); |
| | | destroy(key); |
| | | exit(0); |
| | | |
| | | } |
| | | |
| | | int main(int argc, char *argv[]) |
| | | { |
| | | void *shmp; |
| | | string line; |
| | | signal(SIGINT, sigint_handler); |
| | | int qsize = 16; |
| | | |
| | | /* Create set containing two semaphores; initialize so that |
| | | writer has first access to shared memory. */ |
| | | int start = 0; |
| | |
| | | end = atoi(argv[2]); |
| | | } |
| | | //SAbstractQueue<struct Item> *queue = QFactory::createQueue<struct Item> (1, 10); |
| | | SLinkedLockFreeQueue<struct Item> *queue = QFactory::createLinkedLockFreeQueue<struct Item> (1, 10); |
| | | LockFreeQueue<struct Item> *queue = QueueFactory::createQueue<struct Item> (key, qsize); |
| | | |
| | | |
| | | |
| | | /* Transfer blocks of data from stdin to shared memory */ |
| | |
| | | item.pic = i; |
| | | item.info = i; |
| | | //while((end == -1 || (i < end) ) && (queue->add(item)) ) { |
| | | while((queue->add(item)) ) { |
| | | while((queue->push(item)) ) { |
| | | item.pic = i; |
| | | item.info = i; |
| | | |
| | |
| | | i++; |
| | | |
| | | } |
| | | destroy(); |
| | | destroy(key); |
| | | cerr << "productor quit" << endl; |
| | | exit(EXIT_SUCCESS); |
| | | } |
| | |
| | | #include "queue_factory.h" |
| | | #include <pthread.h> |
| | | |
| | | #define NTHREADS 3 |
| | | |
| | | struct Item |
| | | { |
| | |
| | | // delete queue; |
| | | |
| | | QueueFactory::dropQueue<struct Item>(key); |
| | | mm_destroy(); |
| | | //mm_destroy(); |
| | | |
| | | |
| | | } |
| | |
| | | |
| | | |
| | | |
| | | struct timespec timeout = {5, 0}; |
| | | struct timespec timeout = {1, 0}; |
| | | |
| | | i = 0; |
| | | while((queue->pop(item)) ) { |
| | | while((queue->pop_timeout(item, &timeout)) ) { |
| | | cout << i << " pop:" << item.pic << ", " << item.info << endl; |
| | | // cout << item.pic << endl; |
| | | i++; |
New file |
| | |
| | | #include "test.h" |
| | | |
| | | |
| | | using namespace std; |
| | | int key = 2; |
| | | size_t qsize = 16; |
| | | |
| | | // SIGINT handler: destroy the shared memory and signals (presumably semaphores) for `key`, then exit. |
| | | void sigint_handler(int sig) { /* `sig` is not used by the body */ |
| | | destroy(key); |
| | | exit(0); /* NOTE(review): exit() is not async-signal-safe per POSIX; confirm this is acceptable for a test program */ |
| | | |
| | | } |
| | | |
| | | void productor() { /* Producer side of the test: pushes generated items until push_timeout() fails. */ |
| | | LockFreeQueue<struct Item> *queue = QueueFactory::createQueue<struct Item> (key, qsize); |
| | | /* The queue is requested from QueueFactory using the file-level `key` and `qsize`. */ |
| | | /* Items are generated from the counter `i` below; nothing is read from stdin here. */ |
| | | struct Item item; |
| | | struct timespec timeout = {5, 0}; /* 5 s, 0 ns; handed to push_timeout() */ |
| | | int i = 0; |
| | | while(true) { |
| | | item.pic = i; |
| | | item.info = i; |
| | | /* Attempt 1: push_nowait(). Either outcome is only logged; the loop continues. */ |
| | | if(queue->push_nowait(item)) { |
| | | cout << "入队:" << item.pic << ", " << item.info << endl; |
| | | } else { |
| | | cout << "队列已经满,push_nowait 返回" << endl; /* message: "queue is already full, push_nowait returned" */ |
| | | } |
| | | /* Attempt 2: push_timeout() with the same item, so an iteration in which both calls succeed pushes it twice. */ |
| | | /* A false return here is the only way out of the loop. */ |
| | | if(queue->push_timeout(item, &timeout)) { |
| | | cout << "入队:" << item.pic << ", " << item.info << endl; |
| | | } else { |
| | | cout << "队列已经满,等待5s, push_timeout 返回" << endl; /* message: "queue is already full, waited 5s, push_timeout returned" */ |
| | | break; |
| | | } |
| | | |
| | | // if (i == (1 << 30)) |
| | | // i = 0; |
| | | //sleep(1); |
| | | i++; |
| | | |
| | | } |
| | | } |
| | | |
| | | |
| | | void consumer() { /* Consumer side of the test: pops and prints items until pop_timeout() fails. */ |
| | | LockFreeQueue<struct Item> *queue = QueueFactory::createQueue<struct Item> (key, qsize); |
| | | /* Each popped item is printed to stdout via cout. */ |
| | | /* The queue is requested from QueueFactory using the file-level `key` and `qsize`. */ |
| | | while(1) { |
| | | struct Item item; |
| | | if (queue->pop_nowait(item)) { /* attempt 1: either outcome is only logged */ |
| | | cout << "出队:" << item.pic << ", " << item.info << endl; |
| | | } else { |
| | | cout << "队列为空,pop_nowait返回" << endl; /* message: "queue is empty, pop_nowait returned" */ |
| | | } |
| | | /* Attempt 2: pop_timeout(); a false return is the only way out of the loop. */ |
| | | struct timespec timeout = {5, 0}; /* 5 s, 0 ns; re-declared on every iteration */ |
| | | |
| | | if (queue->pop_timeout(item, &timeout)) { |
| | | cout << "出队:" << item.pic << ", " << item.info << endl; |
| | | } else { |
| | | cout << "队列为空,等待5s,pop_timeout返回" << endl; /* message: "queue is empty, waited 5s, pop_timeout returned" */ |
| | | break; |
| | | } |
| | | |
| | | } |
| | | } |
| | | int main(int argc, char *argv[]) |
| | | { |
| | | /* Test driver: runs productor() and then consumer() one after the other in this same thread. */ |
| | | /* `argc` and `argv` are not used. */ |
| | | signal(SIGINT, sigint_handler); |
| | | /* productor() only returns once its push_timeout() call has failed; consumer() starts after that. */ |
| | | productor(); |
| | | consumer(); |
| | | |
| | | // Destroy the shared memory and signals (presumably semaphores) for `key`. |
| | | destroy(key); |
| | | exit(EXIT_SUCCESS); |
| | | } |