From 302ae4427b04a25e4f1ee8acadbb05bf902f47f7 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期一, 06 七月 2020 19:09:58 +0800 Subject: [PATCH] update --- squeue/include/queue_factory.h | 21 test/consumer | 0 Makefile | 12 include/usgcommon/properties_config.h | 15 squeue/include/lock_free_queue_impl_multiple_producer.h | 245 +++++++++++++++ include/usgcommon/usg_common.h | 130 ++++++++ squeue/include/lock_free_queue_impl.h | 53 +++ squeue/mm.h | 2 include/usgcommon/logger.h | 44 ++ include/usgcommon/usg_typedef.h | 9 Make.defines.linux | 15 test/test_queue.c | 29 squeue/Makefile | 54 +- /dev/null | 10 test/test_queue | 0 test/Makefile | 100 ++++- squeue/include/lock_free_queue.h | 191 +++++++++++ test/test.h | 9 test/single_consumer | 0 test/single_productor | 0 test/productor | 0 21 files changed, 834 insertions(+), 105 deletions(-) diff --git a/Make.defines.linux b/Make.defines.linux index 8c48553..5be4d47 100755 --- a/Make.defines.linux +++ b/Make.defines.linux @@ -7,14 +7,27 @@ COMPILE.c = $(CC) $(CFLAGS) $(CPPFLAGS) -c LINK.c = $(CC) $(CFLAGS) $(CPPFLAGS) $(LDFLAGS) LDFLAGS= +LDDIR += -L$(ROOT)/lib +LDLIBS += $(LDDIR) -lusgcommon $(EXTRALIBS) +INCLUDE += -I. -I./include -I$(ROOT)/include -I$(ROOT)/include/usgcommon +ifeq ($(DEBUG),y) + DEBFLAGS = -O -g # "-O" is needed to expand inlines +else + DEBFLAGS = -O2 +endif -CFLAGS = $(INCLUDE) -g -std=c++11 -mcx16 -Wall -DLINUX -D_GNU_SOURCE $(EXTRA) +CFLAGS += $(INCLUDE) $(DEBFLAGS) -mcx16 -std=c++11 -Wall -DLINUX -D_GNU_SOURCE $(EXTRA) RANLIB=echo AR=ar AWK=awk +LIBCOMMON=$(ROOT)/common/libusgcommon.a # Common temp files to delete from each directory. TEMPFILES=core core.* *.o temp.* *.out *.a *.so +%: %.c $(LIBCOMMON) + $(CC) $(CFLAGS) $(filter-out $(LIBCOMMON), $^) -o $@ $(LDFLAGS) $(LDLIBS) + + diff --git a/Makefile b/Makefile index 97e2319..c57cc88 100755 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -DIRS = common squeue test +DIRS = squeue test all: for i in $(DIRS); do \ @@ -11,8 +11,8 @@ done ipcrm: - - ipcrm -a - - ipcrm -M 0x1234 - - ipcrm -S 145 - - ipcrm -S 146 - - ipcrm -S 8899 + -ipcrm -a + -ipcrm -M 0x1234 + -ipcrm -S 145 + -ipcrm -S 146 + -ipcrm -S 8899 diff --git a/common/Makefile b/common/Makefile deleted file mode 100755 index ca1f7e5..0000000 --- a/common/Makefile +++ /dev/null @@ -1,42 +0,0 @@ -# -# Makefile for common library. -# -ROOT=.. -PLATFORM=$(shell $(ROOT)/systype.sh) -INCLUDE+=-I./include -include $(ROOT)/Make.defines.$(PLATFORM) - -LIBCOMMON = libusgcommon.a -DLIBCOMMON = libusgcommon.so - -OBJS = usg_common.o - -all: $(LIBCOMMON) - -test: test.c - -#static lib -$(LIBCOMMON): $(OBJS) - $(AR) rv $@ $? - $(RANLIB) $@ - -#dynamic lib -$(DLIBCOMMON): $(OBJS) - $(CC) -fPIC -shared -o $@ $^ - -#PREFIX is environment variable, but if it is not set, then set default value -ifeq ($(PREFIX),) - PREFIX := /usr/local -endif - -# 浣跨敤鏂瑰紡锛� g++ test1.c -lcommon -install: $(DLIBCOMMON) $(LIBCOMMON) - sudo install -d $(PREFIX)/lib/ - sudo install -m 644 $^ $(PREFIX)/lib/ - sudo install -d $(PREFIX)/include/ - sudo install -m 644 include/* $(PREFIX)/include/ - -clean: - rm -f *.o a.out core temp.* *.a *.so - -# include $(ROOT)/Make.libapue.inc diff --git a/common/include/usg_common.h b/common/include/usg_common.h deleted file mode 100644 index 68cebfa..0000000 --- a/common/include/usg_common.h +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Our own header, to be included before all standard system headers. - */ - -#ifndef __USG_COMMON_H__ -#define __USG_COMMON_H__ - -#include <stdio.h> -#include <stdlib.h> -#include <stdarg.h> -#include <unistd.h> -#include <string.h> -#include <ctype.h> -#include <setjmp.h> -#include <signal.h> -#include <dirent.h> -#include <time.h> - -#include <sys/time.h> -#include <sys/types.h> -#include <sys/wait.h> -#include <sys/stat.h> -#include <sys/sem.h> -#include <fcntl.h> -#include <sys/mman.h> -#include <errno.h> -#include <math.h> -#include <pthread.h> -#include <semaphore.h> -#include <sys/socket.h> -#include <netdb.h> -#include <netinet/in.h> -#include <arpa/inet.h> -/* -define int8_t uint8_t int16_t uint16_t int32_t uint32_t int64_t uint64_t -*/ -#include <stdint.h> - - - -//c++ header - -#include <iostream> -#include <string> -#include <cstdlib> -#include <atomic> - - - -/* Our own error-handling functions */ - -void err_exit(int error, const char *fmt, ...); -void err_msg(int error, const char *fmt, ...); - -static inline int -itoa(int num, char *str) -{ - return sprintf(str, "%d", num); - -} - -static inline int -ftoa(float num, char *str) -{ - return sprintf(str, "%f", num); - -} -#endif diff --git a/common/tmp b/common/tmp deleted file mode 100644 index 427edb3..0000000 --- a/common/tmp +++ /dev/null @@ -1,2 +0,0 @@ -g++ -I./include -g -std=c++11 -mcx16 -Wall -DLINUX -D_GNU_SOURCE test.c include/usg_atomic.h -o test -<builtin>: recipe for target 'test' failed diff --git a/common/usg_common.c b/common/usg_common.c deleted file mode 100644 index 3f21682..0000000 --- a/common/usg_common.c +++ /dev/null @@ -1,76 +0,0 @@ -#include "usg_common.h" -#include <errno.h> /* for definition of errno */ -#include <stdarg.h> /* ISO C variable aruments */ - -#define MAXLINE 4096 /* max line length */ -/************************** - * Error-handling functions - **************************/ -static void err_doit(int, const char *, va_list); -//static void err_doit(int errno, const char *fmt, va_list ap); - -/*void unix_error(const char *fmt, ...) [> Unix-style error <]*/ -/*{*/ - /*va_list ap;*/ - - /*va_start(ap, fmt);*/ - /*err_doit(errno, fmt, ap);*/ - /*va_end(ap);*/ -/*}*/ - -void posix_error(int code, const char *fmt, ...) /* Posix-style error */ -{ - va_list ap; - - va_start(ap, fmt); - err_doit(code, fmt, ap); - va_end(ap); -} - -/* - * Fatal error unrelated to a system call. - * Error code passed as explict parameter. - * Print a message and terminate. - */ -void err_exit(int error, const char *fmt, ...) -{ - va_list ap; - - va_start(ap, fmt); - err_doit(error, fmt, ap); - va_end(ap); - //abort(); /* dump core and terminate */ - exit(1); -} - - - -/* - * Nonfatal error unrelated to a system call. - * Print a message and return. - */ -void err_msg(int error, const char *fmt, ...) -{ - va_list ap; - - va_start(ap, fmt); - err_doit(error, fmt, ap); - va_end(ap); -} - - -/* - * Print a message and return to caller. - * Caller specifies "errnoflag". - */ -static void err_doit(int error, const char *fmt, va_list ap) -{ - char buf[MAXLINE]; - - vsnprintf(buf, MAXLINE-1, fmt, ap); - snprintf(buf+strlen(buf), MAXLINE-strlen(buf)-1, ": %s", strerror(error)); - strcat(buf, "\n"); - fflush(stdout); /* in case stdout and stderr are the same */ - fputs(buf, stderr); - fflush(NULL); /* flushes all stdio output streams */ -} diff --git a/include/usgcommon/logger.h b/include/usgcommon/logger.h new file mode 100644 index 0000000..00bc7de --- /dev/null +++ b/include/usgcommon/logger.h @@ -0,0 +1,44 @@ +#ifndef __LOGGER_H__ +#define __LOGGER_H__ + +#include "usg_common.h" +#include "usg_typedef.h" + +class Logger { + std::string configFile; + int level; + + void dolog(const char *fmt, va_list ap) + { + char buf[MAXBUF]; + vsnprintf(buf, MAXBUF-1, fmt, ap); + strcat(buf, "\n"); + fflush(stdout); /* in case stdout and stderr are the same */ + fputs(buf, stderr); + fflush(NULL); /* flushes all stdio output streams */ + } +public: + enum { + ALL , + DEBUG , + INFO , + WARN , + ERROR , + FATAL , + OFF + }; + + Logger(int l = INFO): level(l) {} + Logger(std::string cf); + + + void log(int _level, const char *fmt, ...); + + void debug(const char *fmt, ...); + void info(const char *fmt, ...); + void warn(const char *fmt, ...); + void error(const char *fmt, ...); + void fatal(const char *fmt, ...); +}; + +#endif \ No newline at end of file diff --git a/include/usgcommon/properties_config.h b/include/usgcommon/properties_config.h new file mode 100644 index 0000000..f9d0c50 --- /dev/null +++ b/include/usgcommon/properties_config.h @@ -0,0 +1,15 @@ +#ifndef __PROPERTIES_CONFIG_H__ +#define __PROPERTIES_CONFIG_H__ +#include "usg_common.h" + +class PropertiesConfig { + std::string propertiesFile; + std::map<std::string, std::string> propertiesMap; +public: + + PropertiesConfig(std::string _propertiesFile="./config.properties"); + std::string get(std::string name); + int getInt(std::string name); +}; + +#endif \ No newline at end of file diff --git a/include/usgcommon/usg_common.h b/include/usgcommon/usg_common.h new file mode 100644 index 0000000..358fbb3 --- /dev/null +++ b/include/usgcommon/usg_common.h @@ -0,0 +1,130 @@ +/* + * Our own header, to be included before all standard system headers. + */ + +#ifndef __USG_COMMON_H__ +#define __USG_COMMON_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <stdio.h> +#include <stdlib.h> +#include <stdarg.h> +#include <unistd.h> +#include <string.h> +#include <ctype.h> +#include <setjmp.h> +#include <signal.h> +#include <dirent.h> +#include <time.h> +#include <sched.h> + +#include <sys/time.h> +#include <sys/types.h> +#include <sys/wait.h> +#include <sys/stat.h> +#include <sys/sem.h> +#include <sys/shm.h> +#include <sys/file.h> +#include <fcntl.h> +#include <sys/mman.h> +#include <errno.h> +#include <math.h> +#include <pthread.h> +#include <semaphore.h> +#include <sys/socket.h> +#include <netdb.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <libgen.h> +/* + * define int8_t uint8_t int16_t uint16_t int32_t uint32_t int64_t uint64_t +*/ +#include <stdint.h> + +#ifdef __cplusplus +} +#endif + + + +//c++ header +#ifdef __cplusplus + + +#include <iostream> +#include <fstream> +#include <string> +#include <cstdlib> +#include <atomic> +#include <algorithm> +#include <iomanip> +#include <limits> +#include <map> +#include <initializer_list> +#include <vector> +#include <thread> + +#endif + + +#ifdef __cplusplus +extern "C" { +#endif + +/* Our own error-handling functions */ + +void err_exit(int error, const char *fmt, ...); +void err_msg(int error, const char *fmt, ...); + +char *ltrim(char *str, const char *seps); +char *rtrim(char *str, const char *seps); +char *trim(char *str, const char *seps); + +static inline int +itoa(int num, char *str) +{ + return sprintf(str, "%d", num); + +} + +static inline int +ftoa(float num, char *str) +{ + return sprintf(str, "%f", num); + +} + + + +#ifdef __cplusplus +} +#endif + + + +#ifdef __cplusplus + +// static inline std::string& ltrim(std::string& str, const std::string& chars = "\t\n\v\f\r ") +// { +// str.erase(0, str.find_first_not_of(chars)); +// return str; +// } + +// static inline std::string& rtrim(std::string& str, const std::string& chars = "\t\n\v\f\r ") +// { +// str.erase(str.find_last_not_of(chars) + 1); +// return str; +// } + +// static inline std::string& trim(std::string& str, const std::string& chars = "\t\n\v\f\r ") +// { +// return ltrim(rtrim(str, chars), chars); +// } + + +#endif + +#endif diff --git a/common/include/usg_typedef.h b/include/usgcommon/usg_typedef.h similarity index 85% rename from common/include/usg_typedef.h rename to include/usgcommon/usg_typedef.h index 7976c59..55796e1 100644 --- a/common/include/usg_typedef.h +++ b/include/usgcommon/usg_typedef.h @@ -40,10 +40,13 @@ #define DIR_MODE (FILE_MODE | S_IXUSR | S_IXGRP | S_IXOTH) -#define min(a,b) ((a) < (b) ? (a) : (b)) -#define max(a,b) ((a) > (b) ? (a) : (b)) +#define MIN(a,b) ((a) < (b) ? (a) : (b)) +#define MAX(a,b) ((a) > (b) ? (a) : (b)) - + +/* Misc constants */ +#define MAXLINE 1024 /* Max text line length */ +#define MAXBUF 8192 /* Max I/O buffer size */ #ifdef __cplusplus } diff --git a/squeue/Makefile b/squeue/Makefile index 2251b9c..8333f37 100644 --- a/squeue/Makefile +++ b/squeue/Makefile @@ -2,35 +2,41 @@ # Makefile for common library. # ROOT=.. -PLATFORM=$(shell $(ROOT)/systype.sh) -INCLUDE+=-I./ -I./include -I$(ROOT)/common/include -LIBCOMMON=$(ROOT)/common/libusgcommon.a -LIBSQUEUE = libsqueue.a -DLIBSQUEUE = libsqueue.so -LDLIBS =-L$(ROOT)/common -lusgcommon $(EXTRALIBS) +#LDLIBS+=-Wl,-rpath=$(ROOT)/common:$(ROOT)/lib/jsoncpp +# 寮�婧愬伐鍏峰寘璺緞 +#LDDIR += -L$(ROOT)/lib/jsoncpp -L$(ROOT)/lib/nng +# 寮�婧愬伐鍏峰寘 +#LDLIBS += -ljsoncpp -lnng -lpthread + +#INCLUDE += -I$(ROOT)/device/include + +PLATFORM=$(shell $(ROOT)/systype.sh) include $(ROOT)/Make.defines.$(PLATFORM) +LIBSQUEUE = libsqueue.a +DLIBSQUEUE = libsqueue.so -OBJS = mm.o pcsem.o hashtable.o +SOURCES := $(wildcard *.c) +OBJS = $(patsubst %.c, %.o, $(SOURCES)) -all: $(LIBSQUEUE) +MYLIBS = $(LIBSQUEUE) -mm.o: $(LIBCOMMON) +all: build + -pcsem.o: $(LIBCOMMON) - -test: $(OBJS) $(LIBCOMMON) - +build: $(MYLIBS) #static lib -$(LIBSQUEUE): $(OBJS) $(LIBCOMMON) +$(LIBSQUEUE): $(OBJS) $(AR) rv $@ $? $(RANLIB) $@ #dynamic lib -$(DLIBSQUEUE): $(OBJS) $(LIBCOMMON) - $(CC) -fPIC -shared -o $@ $^ +$(DLIBSQUEUE): $(SOURCES) + rm -f *.o + $(CC) -fPIC -shared $(CFLAGS) $^ -o $@ $(LDFLAGS) + #PREFIX is environment variable, but if it is not set, then set default value ifeq ($(PREFIX),) @@ -38,15 +44,13 @@ endif # 浣跨敤鏂瑰紡锛� g++ test1.c -lcommon -install: $(DLIBSQUEUE) $(LIBSQUEUE) - sudo install -d $(PREFIX)/lib/ - sudo install -m 644 $^ $(PREFIX)/lib/ - sudo install -d $(PREFIX)/include/ - sudo install -m 644 include/* $(PREFIX)/include/ +install: $(MYLIBS) + install -d $(PREFIX)/lib/ + install -m 644 $^ $(PREFIX)/lib/ + install -d $(PREFIX)/include/ + install -m 644 include/* $(PREFIX)/include/ clean: - rm -f *.o a.out core temp.* *.a *.so + rm -f $(TEMPFILES) - - -include $(ROOT)/Make.common.inc +.PHONY: build clean install diff --git a/squeue/include/SArrayLockFreeQueue.h b/squeue/include/SArrayLockFreeQueue.h deleted file mode 100644 index 9de3c92..0000000 --- a/squeue/include/SArrayLockFreeQueue.h +++ /dev/null @@ -1,305 +0,0 @@ -// queue.h -- interface for a queue -#ifndef SArrayLockFreeQueue_H_ -#define SArrayLockFreeQueue_H_ -#include "mm.h" -#include "pcsem.h" -#include "SAbstractQueue.h" - - - -template <typename T> -class SArrayLockFreeQueue :public SAbstractQueue<T> -{ -private: -// class scope definitions - - T * arr; - enum {Q_SIZE = 16}; - - int slots; - int items; -// private class members - std::atomic_uint m_readIndex; - std::atomic_uint m_writeIndex; - std::atomic_uint m_maximumReadIndex; - - std::atomic_uint count; - const size_t qsize; // maximum number of size in Queue - // preemptive definitions to prevent public copying - SArrayLockFreeQueue(const SArrayLockFreeQueue & q) : qsize(0) { } - SArrayLockFreeQueue & operator=(const SArrayLockFreeQueue & q) { return *this;} - bool _add(const T &item); // add item to end - bool _remove(T &item); // remove item from front - inline unsigned int countToIndex(unsigned int count) const; -public: - SArrayLockFreeQueue(size_t qs = Q_SIZE); // create queue with a qs limit - virtual ~SArrayLockFreeQueue(); - inline bool isempty() const; - inline bool isfull() const; - virtual unsigned int size() const; - virtual bool add(const T &item); // add item to end - virtual bool add_nowait(const T &item); - virtual bool add_timeout(const T &item, struct timespec *timeout); - virtual bool remove(T &item); - virtual bool remove_nowait(T &item); - virtual bool remove_timeout(T &item, struct timespec * timeout); - - - virtual T& operator[](unsigned i); -}; - - - - -// Queue methods -template <typename T> -SArrayLockFreeQueue<T>::SArrayLockFreeQueue(size_t qs) : m_readIndex(0), m_writeIndex(0), m_maximumReadIndex(0), count(0), qsize(qs) -{ - - arr = (T*)mm_malloc(qsize * sizeof(T)); - slots = pcsem::init(IPC_PRIVATE, qsize); - items = pcsem::init(IPC_PRIVATE, 0); - -} - -template <typename T> -SArrayLockFreeQueue<T>::~SArrayLockFreeQueue() -{ - std::cerr << "SArrayLockFreeQueue destory" << std::endl; - pcsem::remove(slots); - pcsem::remove(items); - - mm_free(arr); - -} - -template <typename T> -inline bool SArrayLockFreeQueue<T>::isempty() const -{ - return countToIndex(m_readIndex.load(std::memory_order_relaxed)) == countToIndex( m_maximumReadIndex.load(std::memory_order_relaxed)); -} - -template <typename T> -inline bool SArrayLockFreeQueue<T>::isfull() const -{ - return countToIndex(m_maximumReadIndex.load(std::memory_order_relaxed) + 1) == countToIndex(m_readIndex.load(std::memory_order_relaxed)); - //return count == qsize; -} - -template <typename T> -unsigned int SArrayLockFreeQueue<T>::size() const -{ - return count; -} - -// Add item to queue -template <typename T> -bool SArrayLockFreeQueue<T>::_add(const T & item) -{ - - unsigned int currentReadIndex; - unsigned int currentWriteIndex; - - do - { - currentWriteIndex = m_writeIndex.load(std::memory_order_relaxed); - currentReadIndex = m_readIndex.load(std::memory_order_relaxed); - // if (countToIndex(currentWriteIndex + 1 ) == countToIndex(currentReadIndex)) - if (isfull()) - { - // the queue is full - return false; - } - - } while (!m_writeIndex.compare_exchange_weak(currentWriteIndex, (currentWriteIndex + 1), std::memory_order_release, std::memory_order_relaxed)); - - // We know now that this index is reserved for us. Use it to save the data - std::cerr << "add " << count.load( std::memory_order_relaxed) << std::endl; - arr[countToIndex(currentWriteIndex)] = item; - count++; - - // update the maximum read index after saving the data. It wouldn't fail if there is only one thread - // inserting in the queue. It might fail if there are more than 1 producer threads because this - // operation has to be done in the same order as the previous CAS - while (!m_maximumReadIndex.compare_exchange_weak(currentWriteIndex, (currentWriteIndex + 1), std::memory_order_release, std::memory_order_relaxed)) - { - // this is a good place to yield the thread in case there are more - // software threads than hardware processors and you have more - // than 1 producer thread - // have a look at sched_yield (POSIX.1b) - sched_yield(); - } - - return true; -} - -template <typename T> -bool SArrayLockFreeQueue<T>::add(const T & item) -{ - if (pcsem::dec(slots) == -1) { - err_exit(errno, "add"); - } - - if (SArrayLockFreeQueue<T>::_add(item)) { - pcsem::inc(items); - return true; - } - return false; - -} - -template <typename T> -bool SArrayLockFreeQueue<T>::add_nowait(const T & item) -{ - if (pcsem::dec_nowait(slots) == -1) { - if (errno == EAGAIN) - return false; - else - err_exit(errno, "add_nowait"); - } - - if (SArrayLockFreeQueue<T>::_add(item)) { - pcsem::inc(items); - return true; - } - return false; - -} - -template <typename T> -bool SArrayLockFreeQueue<T>::add_timeout(const T & item, struct timespec * timeout) -{ - if (pcsem::dec_timeout(slots, timeout) == -1) { - if (errno == EAGAIN) - return false; - else - err_exit(errno, "add_timeout"); - } - - if (SArrayLockFreeQueue<T>::_add(item)){ - pcsem::inc(items); - return true; - } - return false; - -} - - -// Place front item into item variable and remove from queue -template <typename T> -bool SArrayLockFreeQueue<T>::_remove(T & item) -{ - unsigned int currentMaximumReadIndex; - unsigned int currentReadIndex; - - do - { - // to ensure thread-safety when there is more than 1 producer thread - // a second index is defined (m_maximumReadIndex) - currentReadIndex = m_readIndex.load(std::memory_order_relaxed); - currentMaximumReadIndex = m_maximumReadIndex.load(std::memory_order_relaxed); - - if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) - { - // the queue is empty or - // a producer thread has allocate space in the queue but is - // waiting to commit the data into it - return false; - } - - // retrieve the data from the queue - item = arr[countToIndex(currentReadIndex)]; - - // try to perfrom now the CAS operation on the read index. If we succeed - // a_data already contains what m_readIndex pointed to before we - // increased it - if (m_readIndex.compare_exchange_weak(currentReadIndex, (currentReadIndex + 1), std::memory_order_release, std::memory_order_relaxed)) - { - count--; - return true; - } - - // it failed retrieving the element off the queue. Someone else must - // have read the element stored at countToIndex(currentReadIndex) - // before we could perform the CAS operation - - } while(1); // keep looping to try again! - - // Something went wrong. it shouldn't be possible to reach here - //assert(0); - - // Add this return statement to avoid compiler warnings - return false; -} - -template <typename T> -bool SArrayLockFreeQueue<T>::remove(T & item) -{ - if (pcsem::dec(items) == -1) { - err_exit(errno, "remove"); - } - - if (SArrayLockFreeQueue<T>::_remove(item)) { - pcsem::inc(slots); - return true; - } - return false; - -} - -template <typename T> -bool SArrayLockFreeQueue<T>::remove_nowait(T & item) -{ - if (pcsem::dec_nowait(items) == -1) { - if (errno == EAGAIN) - return false; - else - err_exit(errno, "remove_nowait"); - } - - if (SArrayLockFreeQueue<T>::_remove(item)) { - pcsem::inc(slots); - return true; - } - return false; - -} - -template <typename T> -bool SArrayLockFreeQueue<T>::remove_timeout(T & item, struct timespec * timeout) -{ - if (pcsem::dec_timeout(items, timeout) == -1) { - if (errno == EAGAIN) - return false; - else - err_exit(errno, "remove_timeout"); - } - - if (SArrayLockFreeQueue<T>::_remove(item)) { - pcsem::inc(slots); - return true; - } - return false; - -} - -template <class T> -inline unsigned int SArrayLockFreeQueue<T>::countToIndex(unsigned int _count) const{ - return _count % qsize; -} - -template <class T> -T& SArrayLockFreeQueue<T>::operator[](unsigned int i) -{ - if (i < 0 || i >= count) - { - std::cerr << "SArrayLockFreeQueue operator[] ,Error in array limits: " << i << " is out of range\n"; - std::exit(EXIT_FAILURE); - } - - - return arr[countToIndex( m_readIndex.load(std::memory_order_relaxed) + i)]; -} - - -#endif diff --git a/squeue/include/lock_free_queue.h b/squeue/include/lock_free_queue.h new file mode 100644 index 0000000..a800228 --- /dev/null +++ b/squeue/include/lock_free_queue.h @@ -0,0 +1,191 @@ +#ifndef _LOCK_FREE_QUEUE_H__ +#define _LOCK_FREE_QUEUE_H__ + +#include <stdint.h> // uint32_t +#include <atomic> +#include <usg_common.h> + +// default Queue size +#define LOCK_FREE_Q_DEFAULT_SIZE 65536 // (2^16) + +// define this macro if calls to "size" must return the real size of the +// queue. If it is undefined that function will try to take a snapshot of +// the queue, but returned value might be bogus +#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + +// forward declarations for default template values +// + +template <typename ELEM_T> +class ArrayLockFreeQueueMultipleProducers; + + +/// @brief Lock-free queue based on a circular array +/// No allocation of extra memory for the nodes handling is needed, but it has +/// to add extra overhead (extra CAS operation) when inserting to ensure the +/// thread-safety of the queue when the queue type is not +/// ArrayLockFreeQueueSingleProducer. +/// +/// examples of instantiation: +/// ArrayLockFreeQueue<int> q; // queue of ints of default size (65535 - 1) +/// // and defaulted to single producer +/// ArrayLockFreeQueue<int, 16> q; +/// // queue of ints of size (16 - 1) and +/// // defaulted to single producer +/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueueMultipleProducers> q; +/// // queue of ints of size (100 - 1) with support +/// // for multiple producers +/// +/// ELEM_T represents the type of elementes pushed and popped from the queue +/// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1) +/// This number should be a power of 2 to ensure +/// indexes in the circular queue keep stable when the uint32_t +/// variable that holds the current position rolls over from FFFFFFFF +/// to 0. For instance +/// 2 -> 0x02 +/// 4 -> 0x04 +/// 8 -> 0x08 +/// 16 -> 0x10 +/// (...) +/// 1024 -> 0x400 +/// 2048 -> 0x800 +/// +/// if queue size is not defined as requested, let's say, for +/// instance 100, when current position is FFFFFFFF (4,294,967,295) +/// index in the circular array is 4,294,967,295 % 100 = 95. +/// When that value is incremented it will be set to 0, that is the +/// last 4 elements of the queue are not used when the counter rolls +/// over to 0 +/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and +/// ArrayLockFreeQueueMultipleProducers are supported (single producer +/// by default) +template < + typename ELEM_T, + template <typename T> class Q_TYPE = ArrayLockFreeQueueMultipleProducers > +class LockFreeQueue +{ +public: + /// @brief constructor of the class + LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); + + /// @brief destructor of the class. + /// Note it is not virtual since it is not expected to inherit from this + /// template + ~LockFreeQueue(); + + /// @brief returns the current number of items in the queue + /// It tries to take a snapshot of the size of the queue, but in busy environments + /// this function might return bogus values. + /// + /// If a reliable queue size must be kept you might want to have a look at + /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE' + /// it enables a reliable size though it hits overall performance of the queue + /// (when the reliable size variable is on it's got an impact of about 20% in time) + inline uint32_t size(); + + /// @brief return true if the queue is full. False otherwise + /// It tries to take a snapshot of the size of the queue, but in busy + /// environments this function might return bogus values. See help in method + /// LockFreeQueue::size + inline bool full(); + + /// @brief push an element at the tail of the queue + /// @param the element to insert in the queue + /// Note that the element is not a pointer or a reference, so if you are using large data + /// structures to be inserted in the queue you should think of instantiate the template + /// of the queue as a pointer to that large structure + /// @return true if the element was inserted in the queue. False if the queue was full + inline bool push(const ELEM_T &a_data); + + /// @brief pop the element at the head of the queue + /// @param a reference where the element in the head of the queue will be saved to + /// Note that the a_data parameter might contain rubbish if the function returns false + /// @return true if the element was successfully extracted from the queue. False if the queue was empty + inline bool pop(ELEM_T &a_data); + +protected: + /// @brief the actual queue. methods are forwarded into the real + /// implementation + Q_TYPE<ELEM_T> m_qImpl; + +private: + /// @brief disable copy constructor declaring it private + LockFreeQueue<ELEM_T, Q_TYPE>(const LockFreeQueue<ELEM_T, Q_TYPE> &a_src); +}; + + + +/// @brief implementation of an array based lock free queue with support for +/// multiple producers +/// This class is prevented from being instantiated directly (all members and +/// methods are private). To instantiate a multiple producers lock free queue +/// you must use the ArrayLockFreeQueue fachade: +/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueueMultipleProducers> q; +template <typename ELEM_T> +class ArrayLockFreeQueueMultipleProducers +{ + // ArrayLockFreeQueue will be using this' private members + template < + typename ELEM_T_, + template <typename T> class Q_TYPE > + friend class LockFreeQueue; + +private: + /// @brief constructor of the class + ArrayLockFreeQueueMultipleProducers(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); + + virtual ~ArrayLockFreeQueueMultipleProducers(); + + inline uint32_t size(); + + inline bool full(); + + inline bool empty(); + + bool push(const ELEM_T &a_data); + + bool pop(ELEM_T &a_data); + + /// @brief calculate the index in the circular array that corresponds + /// to a particular "count" value + inline uint32_t countToIndex(uint32_t a_count); + +private: + size_t Q_SIZE; + /// @brief array to keep the elements + ELEM_T *m_theQueue; + + /// @brief where a new element will be inserted + std::atomic<uint32_t> m_writeIndex; + + /// @brief where the next element where be extracted from + std::atomic<uint32_t> m_readIndex; + + /// @brief maximum read index for multiple producer queues + /// If it's not the same as m_writeIndex it means + /// there are writes pending to be "committed" to the queue, that means, + /// the place for the data was reserved (the index in the array) but + /// data is still not in the queue, so the thread trying to read will have + /// to wait for those other threads to save the data into the queue + /// + /// note this is only used for multiple producers + std::atomic<uint32_t> m_maximumReadIndex; + +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + /// @brief number of elements in the queue + std::atomic<uint32_t> m_count; +#endif + static int m_reference; + +private: + /// @brief disable copy constructor declaring it private + ArrayLockFreeQueueMultipleProducers<ELEM_T>(const ArrayLockFreeQueueMultipleProducers<ELEM_T> &a_src); + + +}; + +// include implementation files +#include "lock_free_queue_impl.h" +#include "lock_free_queue_impl_multiple_producer.h" + +#endif // _LOCK_FREE_QUEUE_H__ diff --git a/squeue/include/lock_free_queue_impl.h b/squeue/include/lock_free_queue_impl.h new file mode 100644 index 0000000..f11a796 --- /dev/null +++ b/squeue/include/lock_free_queue_impl.h @@ -0,0 +1,53 @@ +#ifndef __LOCK_FREE_QUEUE_IMPL_H__ +#define __LOCK_FREE_QUEUE_IMPL_H__ + +#include <assert.h> // assert() + +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +LockFreeQueue<ELEM_T, Q_TYPE>::LockFreeQueue(size_t qsize): + m_qImpl(qsize) +{ +} + +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +LockFreeQueue<ELEM_T, Q_TYPE>::~LockFreeQueue() +{ +} + +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +inline uint32_t LockFreeQueue<ELEM_T, Q_TYPE>::size() +{ + return m_qImpl.size(); +} + +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +inline bool LockFreeQueue<ELEM_T, Q_TYPE>::full() +{ + return m_qImpl.full(); +} + +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +inline bool LockFreeQueue<ELEM_T, Q_TYPE>::push(const ELEM_T &a_data) +{ + return m_qImpl.push(a_data); +} + +template < + typename ELEM_T, + template <typename T> class Q_TYPE> +inline bool LockFreeQueue<ELEM_T, Q_TYPE>::pop(ELEM_T &a_data) +{ + return m_qImpl.pop(a_data); +} + +#endif // __LOCK_FREE_QUEUE_IMPL_H__ diff --git a/squeue/include/lock_free_queue_impl_multiple_producer.h b/squeue/include/lock_free_queue_impl_multiple_producer.h new file mode 100644 index 0000000..e324fcb --- /dev/null +++ b/squeue/include/lock_free_queue_impl_multiple_producer.h @@ -0,0 +1,245 @@ +#ifndef __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ +#define __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ + +#include <assert.h> // assert() +#include <sched.h> // sched_yield() +#include "mm.h" +#include "pcsem.h" +template <typename ELEM_T> +int ArrayLockFreeQueueMultipleProducers<ELEM_T>::m_reference = 0; + +template <typename ELEM_T> +ArrayLockFreeQueueMultipleProducers<ELEM_T>::ArrayLockFreeQueueMultipleProducers(size_t qsize): + Q_SIZE(qsize), + m_writeIndex(0), // initialisation is not atomic + m_readIndex(0), // + m_maximumReadIndex(0) // +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + ,m_count(0) // +#endif +{ + m_theQueue = (ELEM_T*)mm_malloc(Q_SIZE * sizeof(ELEM_T)); + m_reference++; + +} + +template <typename ELEM_T> +ArrayLockFreeQueueMultipleProducers<ELEM_T>::~ArrayLockFreeQueueMultipleProducers() +{ + std::cout << "destroy ArrayLockFreeQueueMultipleProducers\n"; + m_reference--; + if(m_reference == 0) { + mm_free(m_theQueue); + } + +} + +template <typename ELEM_T> +inline +uint32_t ArrayLockFreeQueueMultipleProducers<ELEM_T>::countToIndex(uint32_t a_count) +{ + // if Q_SIZE is a power of 2 this statement could be also written as + // return (a_count & (Q_SIZE - 1)); + return (a_count % Q_SIZE); +} + +template <typename ELEM_T> +inline +uint32_t ArrayLockFreeQueueMultipleProducers<ELEM_T>::size() +{ +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + return m_count.load(); +#else + + uint32_t currentWriteIndex = m_maximumReadIndex.load(); + uint32_t currentReadIndex = m_readIndex.load(); + + // let's think of a scenario where this function returns bogus data + // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run + // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1 + // 2. afterwards this thread is preemted. While this thread is inactive 2 + // elements are inserted and removed from the queue, so m_maximumReadIndex + // is 5 and m_readIndex 4. Real size is still 1 + // 3. Now the current thread comes back from preemption and reads m_readIndex. + // currentReadIndex is 4 + // 4. currentReadIndex is bigger than currentWriteIndex, so + // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is, + // it returns that the queue is almost full, when it is almost empty + // + if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex)) + { + return (currentWriteIndex - currentReadIndex); + } + else + { + return (Q_SIZE + currentWriteIndex - currentReadIndex); + } +#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE +} + +template <typename ELEM_T> +inline +bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::full() +{ +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + return (m_count.load() == (Q_SIZE)); +#else + + uint32_t currentWriteIndex = m_writeIndex; + uint32_t currentReadIndex = m_readIndex; + + if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) + { + // the queue is full + return true; + } + else + { + // not full! + return false; + } +#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE +} + +template <typename ELEM_T> +inline +bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::empty() +{ +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + return (m_count.load() == 0); +#else + + if (countToIndex( m_readIndex.load()) == countToIndex(m_maximumReadIndex.load())) + { + // the queue is full + return true; + } + else + { + // not full! + return false; + } +#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE +} + + +template <typename ELEM_T> +bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::push(const ELEM_T &a_data) +{ + uint32_t currentWriteIndex; + + do + { + currentWriteIndex = m_writeIndex.load(); +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + if (m_count.load() == Q_SIZE) { + return false; + } +#else + if (countToIndex(currentWriteIndex + 1) == countToIndex(m_readIndex.load())) + { + // the queue is full + return false; + } +#endif + + // There is more than one producer. Keep looping till this thread is able + // to allocate space for current piece of data + // + // using compare_exchange_strong because it isn't allowed to fail spuriously + // When the compare_exchange operation is in a loop the weak version + // will yield better performance on some platforms, but here we'd have to + // load m_writeIndex all over again + } while (!m_writeIndex.compare_exchange_strong( + currentWriteIndex, (currentWriteIndex + 1))); + + // Just made sure this index is reserved for this thread. + m_theQueue[countToIndex(currentWriteIndex)] = a_data; + + // update the maximum read index after saving the piece of data. It can't + // fail if there is only one thread inserting in the queue. It might fail + // if there is more than 1 producer thread because this operation has to + // be done in the same order as the previous CAS + // + // using compare_exchange_weak because they are allowed to fail spuriously + // (act as if *this != expected, even if they are equal), but when the + // compare_exchange operation is in a loop the weak version will yield + // better performance on some platforms. + while (!m_maximumReadIndex.compare_exchange_weak( + currentWriteIndex, (currentWriteIndex + 1))) + { + // this is a good place to yield the thread in case there are more + // software threads than hardware processors and you have more + // than 1 producer thread + // have a look at sched_yield (POSIX.1b) + //sched_yield(); + } + + // The value was successfully inserted into the queue +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + m_count.fetch_add(1); +#endif + + return true; +} + +template <typename ELEM_T> +bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::pop(ELEM_T &a_data) +{ + uint32_t currentReadIndex; + + do + { + currentReadIndex = m_readIndex.load(); + + #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + if (m_count.load() == 0) { + return false; + } + #else + // to ensure thread-safety when there is more than 1 producer + // thread a second index is defined (m_maximumReadIndex) + if (countToIndex(currentReadIndex) == countToIndex(m_maximumReadIndex.load())) + { + // the queue is empty or + // a producer thread has allocate space in the queue but is + // waiting to commit the data into it + return false; + } + #endif + + // retrieve the data from the queue + a_data = m_theQueue[countToIndex(currentReadIndex)]; + + // try to perfrom now the CAS operation on the read index. If we succeed + // a_data already contains what m_readIndex pointed to before we + // increased it + if (m_readIndex.compare_exchange_strong(currentReadIndex, (currentReadIndex + 1))) + { + // got here. The value was retrieved from the queue. Note that the + // data inside the m_queue array is not deleted nor reseted +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + m_count.fetch_sub(1); +#endif + return true; + } + + // it failed retrieving the element off the queue. Someone else must + // have read the element stored at countToIndex(currentReadIndex) + // before we could perform the CAS operation + + } while(1); // keep looping to try again! + + // Something went wrong. it shouldn't be possible to reach here + assert(0); + + // Add this return statement to avoid compiler warnings + return false; +} + +#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ diff --git a/squeue/include/QFactory.h b/squeue/include/queue_factory.h similarity index 68% rename from squeue/include/QFactory.h rename to squeue/include/queue_factory.h index 971b4c6..6c71d14 100644 --- a/squeue/include/QFactory.h +++ b/squeue/include/queue_factory.h @@ -3,13 +3,10 @@ #include "usg_common.h" #include "mm.h" #include "hashtable.h" -#include "SArrayLockFreeQueue.h" +#include "lock_free_queue.h" #include "SLinkedLockFreeQueue.h" -namespace QFactory{ - - - +namespace QueueFactory{ template <typename T> SLinkedLockFreeQueue<T>* createLinkedLockFreeQueue(int key, size_t size) { @@ -32,18 +29,18 @@ template <typename T> - SArrayLockFreeQueue<T>* createArrayLockFreeQueue(int key, size_t size) { + LockFreeQueue<T>* createArrayLockFreeQueue(int key, size_t size=16) { hashtable_t *hashtable; - SArrayLockFreeQueue<T> *queue; + LockFreeQueue<T> *queue; int first; first = mm_init(sizeof(hashtable_t), (void **)&hashtable); if (first) hashtable_init(hashtable);; - - if ((queue = (SArrayLockFreeQueue<T> *)hashtable_get(hashtable, key)) == NULL ) { - queue = new SArrayLockFreeQueue <T>(size); + //LockFreeQueue<int, 10000> q; + if ((queue = (LockFreeQueue<T> *)hashtable_get(hashtable, key)) == NULL ) { + queue = new LockFreeQueue<T>(size); hashtable_put(hashtable, key, (void *)queue); } @@ -52,8 +49,8 @@ template <typename T> - SAbstractQueue<T>* createQueue(int key, size_t size) { - return QFactory::createLinkedLockFreeQueue<T>(key, size); + LockFreeQueue<T>* createQueue(int key, size_t size) { + return QueueFactory::createArrayLockFreeQueue<T>(key, size); } } diff --git a/squeue/mm.h b/squeue/mm.h index 9365018..728099f 100644 --- a/squeue/mm.h +++ b/squeue/mm.h @@ -1,7 +1,7 @@ #ifndef MM_HDR_H #define MM_HDR_H /* Prevent accidental double inclusion */ -#include "usg_common.h" +#include <usg_common.h> #include "usg_typedef.h" #include <sys/sem.h> diff --git a/squeue/test b/squeue/test deleted file mode 100755 index e239973..0000000 --- a/squeue/test +++ /dev/null Binary files differ diff --git a/squeue/test.c b/squeue/test.c deleted file mode 100644 index 0b46fe1..0000000 --- a/squeue/test.c +++ /dev/null @@ -1,13 +0,0 @@ -#include "usg_common.h" -#include "usg_typedef.h" -#include "squeue.h" -#include "qfactory.h" - - -int main () { - int mutex = pcsem::init(8899, 1); - std::cout << mutex << std::endl; - mutex = pcsem::init(8899, 1); - std::cout << mutex << std::endl; - -} \ No newline at end of file diff --git a/test/Makefile b/test/Makefile index f20ab24..69a2668 100755 --- a/test/Makefile +++ b/test/Makefile @@ -1,53 +1,97 @@ -ROOT=.. -EXTRALIBS+= -INCLUDE+=-I. -I$(ROOT)/squeue/include -I$(ROOT)/squeue -I$(ROOT)/common/include -LIBCOMMON=$(ROOT)/common/libusgcommon.a -LIBSQUEUE=$(ROOT)/squeue/libsqueue.a -LDLIBS = -lpthread -PLATFORM=$(shell $(ROOT)/systype.sh) -include $(ROOT)/Make.defines.$(PLATFORM) +# ROOT=.. +# EXTRALIBS+= +# INCLUDE+=-I. -I$(ROOT)/squeue/include -I$(ROOT)/squeue -I$(ROOT)/common/include +# LIBCOMMON=$(ROOT)/common/libusgcommon.a +# LIBSQUEUE=$(ROOT)/squeue/libsqueue.a +# LDLIBS = -lpthread +# PLATFORM=$(shell $(ROOT)/systype.sh) +# include $(ROOT)/Make.defines.$(PLATFORM) -PROGS = test_queue productor consumer single_productor single_consumer +# PROGS = test_queue productor consumer single_productor single_consumer -all: $(PROGS) +# all: $(PROGS) + +# # test1: $(LIBCOMMON) + +# # 濡傛灉鍖匒 寮曠敤鍖匓锛� B 瑕佹斁鍦� A 鍚庨潰 +# # svshm_reader: binary_sems.c $(LIBSQUEUE) $(LIBCOMMON) + +# # svshm_writer: binary_sems.c $(LIBSQUEUE) $(LIBCOMMON) + +# test_queue: test.h $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h $(LIBSQUEUE) $(LIBCOMMON) + + +# productor: test.h $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h $(LIBSQUEUE) $(LIBCOMMON) + + +# consumer: test.h $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h $(LIBSQUEUE) $(LIBCOMMON) + + +# single_productor: test.h $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h $(LIBSQUEUE) $(LIBCOMMON) + +# single_consumer: test.h $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h $(LIBSQUEUE) $(LIBCOMMON) +# # test_lostdata: test.h $(LIBSQUEUE) $(LIBCOMMON) + +# # consumer_timeout: $(ROOT)/squeue/include/squeue.h test.h $(LIBSQUEUE) $(LIBCOMMON) + +# # productor_timeout: $(ROOT)/squeue/squeue.h $(LIBSQUEUE) $(LIBCOMMON) + +# # test_atomic: $(ROOT)/squeue/squeue.h $(LIBSQUEUE) $(LIBCOMMON) + +# clean: +# rm -f $(PROGS) $(TEMPFILES) *.o + + + +# +# Makefile for common library. +# +ROOT=.. +#LDLIBS+=-Wl,-rpath=$(ROOT)/common:$(ROOT)/lib/jsoncpp +# 寮�婧愬伐鍏峰寘璺緞 +LDDIR += -L$(ROOT)/squeue +# 寮�婧愬伐鍏峰寘 +LDLIBS += -lsqueue -lpthread + +INCLUDE += -I$(ROOT)/squeue/ -I$(ROOT)/squeue/include + +PLATFORM=$(shell $(ROOT)/systype.sh) +include $(ROOT)/Make.defines.$(PLATFORM) + + +PROGS = test_queue + + +build: $(PROGS) + + # test1: $(LIBCOMMON) # 濡傛灉鍖匒 寮曠敤鍖匓锛� B 瑕佹斁鍦� A 鍚庨潰 -# svshm_reader: binary_sems.c $(LIBSQUEUE) $(LIBCOMMON) + -# svshm_writer: binary_sems.c $(LIBSQUEUE) $(LIBCOMMON) - -test_queue: test.h $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h $(LIBSQUEUE) $(LIBCOMMON) +test_queue: test.h $(ROOT)/squeue/include/lock_free_queue.h -productor: test.h $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h $(LIBSQUEUE) $(LIBCOMMON) +productor: test.h $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h -consumer: test.h $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h $(LIBSQUEUE) $(LIBCOMMON) +consumer: test.h $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h -single_productor: test.h $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h $(LIBSQUEUE) $(LIBCOMMON) +single_productor: test.h $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h -single_consumer: test.h $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h $(LIBSQUEUE) $(LIBCOMMON) -# test_lostdata: test.h $(LIBSQUEUE) $(LIBCOMMON) - -# consumer_timeout: $(ROOT)/squeue/include/squeue.h test.h $(LIBSQUEUE) $(LIBCOMMON) - -# productor_timeout: $(ROOT)/squeue/squeue.h $(LIBSQUEUE) $(LIBCOMMON) - -# test_atomic: $(ROOT)/squeue/squeue.h $(LIBSQUEUE) $(LIBCOMMON) +single_consumer: test.h $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h clean: - rm -f $(PROGS) $(TEMPFILES) *.o + rm -f $(TEMPFILES) $(PROGS) -$(LIBCOMMON): - (cd $(ROOT)/common && $(MAKE)) $(LIBSQUEUE): (cd $(ROOT)/squeue && $(MAKE)) diff --git a/test/consumer b/test/consumer index 9fcbb55..6ce34b5 100755 --- a/test/consumer +++ b/test/consumer Binary files differ diff --git a/test/productor b/test/productor index 5d866e0..7607729 100755 --- a/test/productor +++ b/test/productor Binary files differ diff --git a/test/single_consumer b/test/single_consumer index 8b5d8d5..971bd82 100755 --- a/test/single_consumer +++ b/test/single_consumer Binary files differ diff --git a/test/single_productor b/test/single_productor index cd6ea65..d1a4e20 100755 --- a/test/single_productor +++ b/test/single_productor Binary files differ diff --git a/test/test.h b/test/test.h index b5e0a01..76aa358 100644 --- a/test/test.h +++ b/test/test.h @@ -1,8 +1,8 @@ #include "usg_common.h" #include "usg_typedef.h" -#include "SArrayLockFreeQueue.h" +#include "lock_free_queue.h" #include "SLinkedLockFreeQueue.h" -#include "QFactory.h" +#include "queue_factory.h" #include <pthread.h> #define NTHREADS 3 @@ -23,8 +23,9 @@ // 閿�姣佸叡浜唴瀛樺拰淇″彿 void destroy() { - SLinkedLockFreeQueue<struct Item> *queue = QFactory::createLinkedLockFreeQueue<struct Item> (1, 10); - queue->~SLinkedLockFreeQueue(); + LockFreeQueue<struct Item> *queue = QueueFactory::createQueue<struct Item> (1, 16); + //queue->~LockFreeQueue(); + delete queue; mm_deinit(); diff --git a/test/test1 b/test/test1 deleted file mode 100755 index 824ec2c..0000000 --- a/test/test1 +++ /dev/null Binary files differ diff --git a/test/test1.c b/test/test1.c deleted file mode 100755 index 0e2799b..0000000 --- a/test/test1.c +++ /dev/null @@ -1,10 +0,0 @@ -#include <common.h> -int main() { - //int a = 1; - //CAS(a, 1, 2); - //err_msg(2, "hello"); - unsigned int i = 0; - while(i > 0) { - std::cout << "bug" << std::endl; - } -} diff --git a/test/test_queue b/test/test_queue index 1adc459..bcd2dd9 100755 --- a/test/test_queue +++ b/test/test_queue Binary files differ diff --git a/test/test_queue.c b/test/test_queue.c index 26c4dfa..6046902 100644 --- a/test/test_queue.c +++ b/test/test_queue.c @@ -1,39 +1,38 @@ #include "test.h" using namespace std; + int main () { unsigned int i = 0; - struct Item - { - int pic; - int info; - }; + struct Item item; - size_t qsize = 10; - SArrayLockFreeQueue<struct Item> *queue = QFactory::createArrayLockFreeQueue<struct Item> (1, qsize); + size_t qsize = 1; + LockFreeQueue<struct Item> *queue = QueueFactory::createQueue<struct Item> (2, qsize); - + // LockFreeQueue<struct Item> queue(16); for(i = 0; i < qsize; i++) { - queue->add({i, i}); + if(queue->push({i, i})) { + cout << i << " push锛�" << i << endl; + } } - for(i = 0; i < qsize; i++) { + // for(i = 0; i < qsize; i++) { - //queue->dequeue(item); + // //queue.dequeue(item); - item = (*queue)[i]; - cout << "i=" << i << " item " << item.pic << "," << item.info << endl; - } + // item = (*queue)[i]; + // cout << "i=" << i << " item " << item.pic << "," << item.info << endl; + // } struct timespec timeout = {5, 0}; i = 0; - while((queue->remove_timeout(item, &timeout)) ) { + while((queue->pop(item)) ) { cout << i << " 鍑洪槦锛�" << item.pic << ", " << item.info << endl; // cout << item.pic << endl; -- Gitblit v1.8.0