wangzhengquan
2020-07-06 302ae4427b04a25e4f1ee8acadbb05bf902f47f7
update
9个文件已删除
6个文件已添加
2 文件已重命名
12个文件已修改
1445 ■■■■■ 已修改文件
Make.defines.linux 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
Makefile 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/Makefile 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/include/usg_common.h 68 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/tmp 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/usg_common.c 76 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
include/usgcommon/logger.h 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
include/usgcommon/properties_config.h 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
include/usgcommon/usg_common.h 130 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
include/usgcommon/usg_typedef.h 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
squeue/Makefile 54 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
squeue/include/SArrayLockFreeQueue.h 305 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
squeue/include/lock_free_queue.h 191 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
squeue/include/lock_free_queue_impl.h 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
squeue/include/lock_free_queue_impl_multiple_producer.h 245 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
squeue/include/queue_factory.h 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
squeue/mm.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
squeue/test 补丁 | 查看 | 原始文档 | blame | 历史
squeue/test.c 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/Makefile 100 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/consumer 补丁 | 查看 | 原始文档 | blame | 历史
test/productor 补丁 | 查看 | 原始文档 | blame | 历史
test/single_consumer 补丁 | 查看 | 原始文档 | blame | 历史
test/single_productor 补丁 | 查看 | 原始文档 | blame | 历史
test/test.h 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test1 补丁 | 查看 | 原始文档 | blame | 历史
test/test1.c 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_queue 补丁 | 查看 | 原始文档 | blame | 历史
test/test_queue.c 29 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
Make.defines.linux
@@ -7,14 +7,27 @@
COMPILE.c = $(CC) $(CFLAGS) $(CPPFLAGS) -c
LINK.c = $(CC) $(CFLAGS) $(CPPFLAGS) $(LDFLAGS)
LDFLAGS=
LDDIR += -L$(ROOT)/lib
LDLIBS += $(LDDIR) -lusgcommon $(EXTRALIBS)
INCLUDE += -I. -I./include -I$(ROOT)/include -I$(ROOT)/include/usgcommon
ifeq ($(DEBUG),y)
  DEBFLAGS = -O -g # "-O" is needed to expand inlines
else
  DEBFLAGS = -O2
endif
CFLAGS = $(INCLUDE) -g -std=c++11 -mcx16 -Wall  -DLINUX -D_GNU_SOURCE $(EXTRA)
CFLAGS += $(INCLUDE)  $(DEBFLAGS) -mcx16 -std=c++11 -Wall -DLINUX -D_GNU_SOURCE $(EXTRA)
RANLIB=echo
AR=ar
AWK=awk
LIBCOMMON=$(ROOT)/common/libusgcommon.a
# Common temp files to delete from each directory.
TEMPFILES=core core.* *.o temp.* *.out *.a *.so
%:    %.c $(LIBCOMMON)
    $(CC) $(CFLAGS) $(filter-out $(LIBCOMMON), $^) -o $@ $(LDFLAGS) $(LDLIBS)
Makefile
@@ -1,4 +1,4 @@
DIRS = common squeue test
DIRS = squeue test
all:
    for i in $(DIRS); do \
@@ -11,8 +11,8 @@
    done
ipcrm:
    - ipcrm -a
    - ipcrm -M 0x1234
    - ipcrm -S 145
    - ipcrm -S 146
    - ipcrm -S 8899
    -ipcrm -a
    -ipcrm -M 0x1234
    -ipcrm -S 145
    -ipcrm -S 146
    -ipcrm -S 8899
common/Makefile
File was deleted
common/include/usg_common.h
File was deleted
common/tmp
File was deleted
common/usg_common.c
File was deleted
include/usgcommon/logger.h
New file
@@ -0,0 +1,44 @@
#ifndef __LOGGER_H__
#define __LOGGER_H__
#include "usg_common.h"
#include "usg_typedef.h"
class Logger {
    std::string configFile;
    int level;
    void dolog(const char *fmt, va_list ap)
    {
        char    buf[MAXBUF];
        vsnprintf(buf, MAXBUF-1, fmt, ap);
        strcat(buf, "\n");
        fflush(stdout);        /* in case stdout and stderr are the same */
        fputs(buf, stderr);
        fflush(NULL);        /* flushes all stdio output streams */
    }
public:
    enum {
        ALL ,
        DEBUG ,
        INFO ,
        WARN ,
        ERROR ,
        FATAL ,
        OFF
    };
    Logger(int l = INFO): level(l) {}
    Logger(std::string cf);
    void log(int _level,  const char *fmt,  ...);
    void debug(const char *fmt, ...);
    void info(const char *fmt, ...);
    void warn(const char *fmt, ...);
    void error(const char *fmt, ...);
    void fatal(const char *fmt, ...);
};
#endif
include/usgcommon/properties_config.h
New file
@@ -0,0 +1,15 @@
#ifndef __PROPERTIES_CONFIG_H__
#define __PROPERTIES_CONFIG_H__
#include "usg_common.h"
class PropertiesConfig {
    std::string propertiesFile;
    std::map<std::string, std::string> propertiesMap;
public:
    PropertiesConfig(std::string _propertiesFile="./config.properties");
    std::string get(std::string name);
    int getInt(std::string name);
};
#endif
include/usgcommon/usg_common.h
New file
@@ -0,0 +1,130 @@
/*
 * Our own header, to be included before all standard system headers.
 */
#ifndef __USG_COMMON_H__
#define __USG_COMMON_H__
#ifdef __cplusplus
extern "C" {
#endif
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <unistd.h>
#include <string.h>
#include <ctype.h>
#include <setjmp.h>
#include <signal.h>
#include <dirent.h>
#include <time.h>
#include <sched.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/file.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <errno.h>
#include <math.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/socket.h>
#include <netdb.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <libgen.h>
/*
 * define int8_t uint8_t int16_t uint16_t int32_t uint32_t int64_t uint64_t
*/
#include <stdint.h>
#ifdef __cplusplus
}
#endif
//c++ header
#ifdef __cplusplus
#include <iostream>
#include <fstream>
#include <string>
#include <cstdlib>
#include <atomic>
#include <algorithm>
#include <iomanip>
#include <limits>
#include <map>
#include <initializer_list>
#include <vector>
#include <thread>
#endif
#ifdef __cplusplus
extern "C" {
#endif
/* Our own error-handling functions */
void err_exit(int error, const char *fmt, ...);
void err_msg(int error, const char *fmt, ...);
char *ltrim(char *str, const char *seps);
char *rtrim(char *str, const char *seps);
char *trim(char *str, const char *seps);
static inline int
itoa(int num, char *str)
{
    return sprintf(str, "%d", num);
}
static inline int
ftoa(float num, char *str)
{
     return sprintf(str, "%f", num);
}
#ifdef __cplusplus
}
#endif
#ifdef __cplusplus
// static inline std::string& ltrim(std::string& str, const std::string& chars = "\t\n\v\f\r ")
// {
//     str.erase(0, str.find_first_not_of(chars));
//     return str;
// }
// static inline std::string& rtrim(std::string& str, const std::string& chars = "\t\n\v\f\r ")
// {
//     str.erase(str.find_last_not_of(chars) + 1);
//     return str;
// }
// static inline std::string& trim(std::string& str, const std::string& chars = "\t\n\v\f\r ")
// {
//     return ltrim(rtrim(str, chars), chars);
// }
#endif
#endif
include/usgcommon/usg_typedef.h
File was renamed from common/include/usg_typedef.h
@@ -40,10 +40,13 @@
#define DIR_MODE  (FILE_MODE | S_IXUSR | S_IXGRP | S_IXOTH)
#define min(a,b)  ((a) < (b) ? (a) : (b))
#define max(a,b)  ((a) > (b) ? (a) : (b))
#define MIN(a,b)  ((a) < (b) ? (a) : (b))
#define MAX(a,b)  ((a) > (b) ? (a) : (b))
/* Misc constants */
#define    MAXLINE     1024  /* Max text line length */
#define MAXBUF   8192  /* Max I/O buffer size */
#ifdef __cplusplus
}
squeue/Makefile
@@ -2,35 +2,41 @@
# Makefile for common library.
#
ROOT=..
PLATFORM=$(shell $(ROOT)/systype.sh)
INCLUDE+=-I./ -I./include -I$(ROOT)/common/include
LIBCOMMON=$(ROOT)/common/libusgcommon.a
LIBSQUEUE = libsqueue.a
DLIBSQUEUE = libsqueue.so
LDLIBS =-L$(ROOT)/common -lusgcommon $(EXTRALIBS)
#LDLIBS+=-Wl,-rpath=$(ROOT)/common:$(ROOT)/lib/jsoncpp
# 开源工具包路径
#LDDIR += -L$(ROOT)/lib/jsoncpp -L$(ROOT)/lib/nng
# 开源工具包
#LDLIBS += -ljsoncpp  -lnng  -lpthread
#INCLUDE += -I$(ROOT)/device/include
PLATFORM=$(shell $(ROOT)/systype.sh)
include $(ROOT)/Make.defines.$(PLATFORM)
LIBSQUEUE = libsqueue.a
DLIBSQUEUE = libsqueue.so
OBJS = mm.o pcsem.o hashtable.o
SOURCES := $(wildcard *.c)
OBJS   = $(patsubst %.c, %.o, $(SOURCES))
all: $(LIBSQUEUE)
MYLIBS = $(LIBSQUEUE)
mm.o:  $(LIBCOMMON)
all: build
pcsem.o: $(LIBCOMMON)
test: $(OBJS) $(LIBCOMMON)
build: $(MYLIBS)
#static lib
$(LIBSQUEUE): $(OBJS) $(LIBCOMMON)
$(LIBSQUEUE): $(OBJS)
    $(AR) rv $@ $?
    $(RANLIB) $@
#dynamic lib
$(DLIBSQUEUE):    $(OBJS) $(LIBCOMMON)
    $(CC) -fPIC -shared -o $@ $^
$(DLIBSQUEUE): $(SOURCES)
    rm -f *.o
    $(CC) -fPIC -shared $(CFLAGS) $^ -o $@ $(LDFLAGS)
#PREFIX is environment variable, but if it is not set, then set default value
ifeq ($(PREFIX),)
@@ -38,15 +44,13 @@
endif
# 使用方式: g++ test1.c  -lcommon
install: $(DLIBSQUEUE) $(LIBSQUEUE)
    sudo install -d $(PREFIX)/lib/
    sudo install -m 644 $^ $(PREFIX)/lib/
    sudo install -d $(PREFIX)/include/
    sudo install -m 644 include/* $(PREFIX)/include/
install: $(MYLIBS)
    install -d $(PREFIX)/lib/
    install -m 644 $^ $(PREFIX)/lib/
    install -d $(PREFIX)/include/
    install -m 644 include/* $(PREFIX)/include/
clean:
    rm -f *.o a.out core temp.* *.a *.so
    rm -f $(TEMPFILES)
include $(ROOT)/Make.common.inc
.PHONY: build clean install
squeue/include/SArrayLockFreeQueue.h
File was deleted
squeue/include/lock_free_queue.h
New file
@@ -0,0 +1,191 @@
#ifndef _LOCK_FREE_QUEUE_H__
#define _LOCK_FREE_QUEUE_H__
#include <stdint.h>     // uint32_t
#include <atomic>
#include <usg_common.h>
// default Queue size
#define LOCK_FREE_Q_DEFAULT_SIZE 65536 // (2^16)
// define this macro if calls to "size" must return the real size of the
// queue. If it is undefined  that function will try to take a snapshot of
// the queue, but returned value might be bogus
#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
// forward declarations for default template values
//
template <typename ELEM_T>
class ArrayLockFreeQueueMultipleProducers;
/// @brief Lock-free queue based on a circular array
/// No allocation of extra memory for the nodes handling is needed, but it has
/// to add extra overhead (extra CAS operation) when inserting to ensure the
/// thread-safety of the queue when the queue type is not
/// ArrayLockFreeQueueSingleProducer.
///
/// examples of instantiation:
///   ArrayLockFreeQueue<int> q; // queue of ints of default size (65535 - 1)
///                              // and defaulted to single producer
///   ArrayLockFreeQueue<int, 16> q;
///                              // queue of ints of size (16 - 1) and
///                              // defaulted to single producer
///   ArrayLockFreeQueue<int, 100, ArrayLockFreeQueueMultipleProducers> q;
///                              // queue of ints of size (100 - 1) with support
///                              // for multiple producers
///
/// ELEM_T represents the type of elementes pushed and popped from the queue
/// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1)
///        This number should be a power of 2 to ensure
///        indexes in the circular queue keep stable when the uint32_t
///        variable that holds the current position rolls over from FFFFFFFF
///        to 0. For instance
///        2    -> 0x02
///        4    -> 0x04
///        8    -> 0x08
///        16   -> 0x10
///        (...)
///        1024 -> 0x400
///        2048 -> 0x800
///
///        if queue size is not defined as requested, let's say, for
///        instance 100, when current position is FFFFFFFF (4,294,967,295)
///        index in the circular array is 4,294,967,295 % 100 = 95.
///        When that value is incremented it will be set to 0, that is the
///        last 4 elements of the queue are not used when the counter rolls
///        over to 0
/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and
///        ArrayLockFreeQueueMultipleProducers are supported (single producer
///        by default)
template <
    typename ELEM_T,
    template <typename T> class Q_TYPE = ArrayLockFreeQueueMultipleProducers >
class LockFreeQueue
{
public:
    /// @brief constructor of the class
    LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    /// @brief destructor of the class.
    /// Note it is not virtual since it is not expected to inherit from this
    /// template
    ~LockFreeQueue();
    /// @brief returns the current number of items in the queue
    /// It tries to take a snapshot of the size of the queue, but in busy environments
    /// this function might return bogus values.
    ///
    /// If a reliable queue size must be kept you might want to have a look at
    /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
    /// it enables a reliable size though it hits overall performance of the queue
    /// (when the reliable size variable is on it's got an impact of about 20% in time)
    inline uint32_t size();
    /// @brief return true if the queue is full. False otherwise
    /// It tries to take a snapshot of the size of the queue, but in busy
    /// environments this function might return bogus values. See help in method
    /// LockFreeQueue::size
    inline bool full();
    /// @brief push an element at the tail of the queue
    /// @param the element to insert in the queue
    /// Note that the element is not a pointer or a reference, so if you are using large data
    /// structures to be inserted in the queue you should think of instantiate the template
    /// of the queue as a pointer to that large structure
    /// @return true if the element was inserted in the queue. False if the queue was full
    inline bool push(const ELEM_T &a_data);
    /// @brief pop the element at the head of the queue
    /// @param a reference where the element in the head of the queue will be saved to
    /// Note that the a_data parameter might contain rubbish if the function returns false
    /// @return true if the element was successfully extracted from the queue. False if the queue was empty
    inline bool pop(ELEM_T &a_data);
protected:
    /// @brief the actual queue. methods are forwarded into the real
    ///        implementation
    Q_TYPE<ELEM_T> m_qImpl;
private:
    /// @brief disable copy constructor declaring it private
    LockFreeQueue<ELEM_T, Q_TYPE>(const LockFreeQueue<ELEM_T, Q_TYPE> &a_src);
};
/// @brief implementation of an array based lock free queue with support for
///        multiple producers
/// This class is prevented from being instantiated directly (all members and
/// methods are private). To instantiate a multiple producers lock free queue
/// you must use the ArrayLockFreeQueue fachade:
///   ArrayLockFreeQueue<int, 100, ArrayLockFreeQueueMultipleProducers> q;
template <typename ELEM_T>
class ArrayLockFreeQueueMultipleProducers
{
    // ArrayLockFreeQueue will be using this' private members
    template <
        typename ELEM_T_,
        template <typename T> class Q_TYPE >
    friend class LockFreeQueue;
private:
    /// @brief constructor of the class
    ArrayLockFreeQueueMultipleProducers(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    virtual ~ArrayLockFreeQueueMultipleProducers();
    inline uint32_t size();
    inline bool full();
    inline bool empty();
    bool push(const ELEM_T &a_data);
    bool pop(ELEM_T &a_data);
    /// @brief calculate the index in the circular array that corresponds
    /// to a particular "count" value
    inline uint32_t countToIndex(uint32_t a_count);
private:
    size_t Q_SIZE;
    /// @brief array to keep the elements
    ELEM_T *m_theQueue;
    /// @brief where a new element will be inserted
    std::atomic<uint32_t> m_writeIndex;
    /// @brief where the next element where be extracted from
    std::atomic<uint32_t> m_readIndex;
    /// @brief maximum read index for multiple producer queues
    /// If it's not the same as m_writeIndex it means
    /// there are writes pending to be "committed" to the queue, that means,
    /// the place for the data was reserved (the index in the array) but
    /// data is still not in the queue, so the thread trying to read will have
    /// to wait for those other threads to save the data into the queue
    ///
    /// note this is only used for multiple producers
    std::atomic<uint32_t> m_maximumReadIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    /// @brief number of elements in the queue
    std::atomic<uint32_t> m_count;
#endif
    static int m_reference;
private:
    /// @brief disable copy constructor declaring it private
    ArrayLockFreeQueueMultipleProducers<ELEM_T>(const ArrayLockFreeQueueMultipleProducers<ELEM_T> &a_src);
};
// include implementation files
#include "lock_free_queue_impl.h"
#include "lock_free_queue_impl_multiple_producer.h"
#endif // _LOCK_FREE_QUEUE_H__
squeue/include/lock_free_queue_impl.h
New file
@@ -0,0 +1,53 @@
#ifndef __LOCK_FREE_QUEUE_IMPL_H__
#define __LOCK_FREE_QUEUE_IMPL_H__
#include <assert.h> // assert()
template <
    typename ELEM_T,
    template <typename T> class Q_TYPE>
LockFreeQueue<ELEM_T, Q_TYPE>::LockFreeQueue(size_t qsize):
    m_qImpl(qsize)
{
}
template <
    typename ELEM_T,
    template <typename T> class Q_TYPE>
LockFreeQueue<ELEM_T, Q_TYPE>::~LockFreeQueue()
{
}
template <
    typename ELEM_T,
    template <typename T> class Q_TYPE>
inline uint32_t LockFreeQueue<ELEM_T, Q_TYPE>::size()
{
    return m_qImpl.size();
}
template <
    typename ELEM_T,
    template <typename T> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Q_TYPE>::full()
{
    return m_qImpl.full();
}
template <
    typename ELEM_T,
    template <typename T> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Q_TYPE>::push(const ELEM_T &a_data)
{
    return m_qImpl.push(a_data);
}
template <
    typename ELEM_T,
    template <typename T> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Q_TYPE>::pop(ELEM_T &a_data)
{
    return m_qImpl.pop(a_data);
}
#endif // __LOCK_FREE_QUEUE_IMPL_H__
squeue/include/lock_free_queue_impl_multiple_producer.h
New file
@@ -0,0 +1,245 @@
#ifndef __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
#define __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
#include <assert.h> // assert()
#include <sched.h>  // sched_yield()
#include "mm.h"
#include "pcsem.h"
template <typename ELEM_T>
int ArrayLockFreeQueueMultipleProducers<ELEM_T>::m_reference = 0;
template <typename ELEM_T>
ArrayLockFreeQueueMultipleProducers<ELEM_T>::ArrayLockFreeQueueMultipleProducers(size_t qsize):
    Q_SIZE(qsize),
    m_writeIndex(0),      // initialisation is not atomic
    m_readIndex(0),       //
    m_maximumReadIndex(0) //
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    ,m_count(0)           //
#endif
{
    m_theQueue = (ELEM_T*)mm_malloc(Q_SIZE * sizeof(ELEM_T));
    m_reference++;
}
template <typename ELEM_T>
ArrayLockFreeQueueMultipleProducers<ELEM_T>::~ArrayLockFreeQueueMultipleProducers()
{
    std::cout << "destroy ArrayLockFreeQueueMultipleProducers\n";
    m_reference--;
    if(m_reference == 0) {
       mm_free(m_theQueue);
    }
}
template <typename ELEM_T>
inline
uint32_t ArrayLockFreeQueueMultipleProducers<ELEM_T>::countToIndex(uint32_t a_count)
{
    // if Q_SIZE is a power of 2 this statement could be also written as
    // return (a_count & (Q_SIZE - 1));
    return (a_count % Q_SIZE);
}
template <typename ELEM_T>
inline
uint32_t ArrayLockFreeQueueMultipleProducers<ELEM_T>::size()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return m_count.load();
#else
    uint32_t currentWriteIndex = m_maximumReadIndex.load();
    uint32_t currentReadIndex  = m_readIndex.load();
    // let's think of a scenario where this function returns bogus data
    // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
    // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
    // 2. afterwards this thread is preemted. While this thread is inactive 2
    // elements are inserted and removed from the queue, so m_maximumReadIndex
    // is 5 and m_readIndex 4. Real size is still 1
    // 3. Now the current thread comes back from preemption and reads m_readIndex.
    // currentReadIndex is 4
    // 4. currentReadIndex is bigger than currentWriteIndex, so
    // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
    // it returns that the queue is almost full, when it is almost empty
    //
    if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
    {
        return (currentWriteIndex - currentReadIndex);
    }
    else
    {
        return (Q_SIZE + currentWriteIndex - currentReadIndex);
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T>
inline
bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::full()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count.load() == (Q_SIZE));
#else
    uint32_t currentWriteIndex = m_writeIndex;
    uint32_t currentReadIndex  = m_readIndex;
    if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
    {
        // the queue is full
        return true;
    }
    else
    {
        // not full!
        return false;
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T>
inline
bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::empty()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count.load() == 0);
#else
    if (countToIndex( m_readIndex.load()) == countToIndex(m_maximumReadIndex.load()))
    {
        // the queue is full
        return true;
    }
    else
    {
        // not full!
        return false;
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T>
bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::push(const ELEM_T &a_data)
{
    uint32_t currentWriteIndex;
    do
    {
        currentWriteIndex = m_writeIndex.load();
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        if (m_count.load() == Q_SIZE) {
            return false;
        }
#else
        if (countToIndex(currentWriteIndex + 1) == countToIndex(m_readIndex.load()))
        {
            // the queue is full
            return false;
        }
#endif
    // There is more than one producer. Keep looping till this thread is able
    // to allocate space for current piece of data
    //
    // using compare_exchange_strong because it isn't allowed to fail spuriously
    // When the compare_exchange operation is in a loop the weak version
    // will yield better performance on some platforms, but here we'd have to
    // load m_writeIndex all over again
    } while (!m_writeIndex.compare_exchange_strong(
                currentWriteIndex, (currentWriteIndex + 1)));
    // Just made sure this index is reserved for this thread.
    m_theQueue[countToIndex(currentWriteIndex)] = a_data;
    // update the maximum read index after saving the piece of data. It can't
    // fail if there is only one thread inserting in the queue. It might fail
    // if there is more than 1 producer thread because this operation has to
    // be done in the same order as the previous CAS
    //
    // using compare_exchange_weak because they are allowed to fail spuriously
    // (act as if *this != expected, even if they are equal), but when the
    // compare_exchange operation is in a loop the weak version will yield
    // better performance on some platforms.
    while (!m_maximumReadIndex.compare_exchange_weak(
                currentWriteIndex, (currentWriteIndex + 1)))
    {
        // this is a good place to yield the thread in case there are more
        // software threads than hardware processors and you have more
        // than 1 producer thread
        // have a look at sched_yield (POSIX.1b)
        //sched_yield();
    }
    // The value was successfully inserted into the queue
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    m_count.fetch_add(1);
#endif
    return true;
}
template <typename ELEM_T>
bool ArrayLockFreeQueueMultipleProducers<ELEM_T>::pop(ELEM_T &a_data)
{
    uint32_t currentReadIndex;
    do
    {
        currentReadIndex = m_readIndex.load();
     #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        if (m_count.load() == 0) {
            return false;
        }
    #else
        // to ensure thread-safety when there is more than 1 producer
        // thread a second index is defined (m_maximumReadIndex)
        if (countToIndex(currentReadIndex) == countToIndex(m_maximumReadIndex.load()))
        {
            // the queue is empty or
            // a producer thread has allocate space in the queue but is
            // waiting to commit the data into it
            return false;
        }
    #endif
        // retrieve the data from the queue
        a_data = m_theQueue[countToIndex(currentReadIndex)];
        // try to perfrom now the CAS operation on the read index. If we succeed
        // a_data already contains what m_readIndex pointed to before we
        // increased it
        if (m_readIndex.compare_exchange_strong(currentReadIndex, (currentReadIndex + 1)))
        {
            // got here. The value was retrieved from the queue. Note that the
            // data inside the m_queue array is not deleted nor reseted
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
            m_count.fetch_sub(1);
#endif
            return true;
        }
        // it failed retrieving the element off the queue. Someone else must
        // have read the element stored at countToIndex(currentReadIndex)
        // before we could perform the CAS operation
    } while(1); // keep looping to try again!
    // Something went wrong. it shouldn't be possible to reach here
    assert(0);
    // Add this return statement to avoid compiler warnings
    return false;
}
#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
squeue/include/queue_factory.h
File was renamed from squeue/include/QFactory.h
@@ -3,13 +3,10 @@
#include "usg_common.h"
#include "mm.h"
#include "hashtable.h"
#include "SArrayLockFreeQueue.h"
#include "lock_free_queue.h"
#include "SLinkedLockFreeQueue.h"
namespace QFactory{
namespace QueueFactory{
    template <typename T>
    SLinkedLockFreeQueue<T>* createLinkedLockFreeQueue(int key, size_t size) {
@@ -32,18 +29,18 @@
    template <typename T>
    SArrayLockFreeQueue<T>* createArrayLockFreeQueue(int key, size_t size) {
    LockFreeQueue<T>* createArrayLockFreeQueue(int key, size_t size=16) {
        hashtable_t *hashtable;
        SArrayLockFreeQueue<T> *queue;
        LockFreeQueue<T> *queue;
        int first;
          
        first = mm_init(sizeof(hashtable_t), (void **)&hashtable);
          
        if (first)
           hashtable_init(hashtable);;
        if ((queue = (SArrayLockFreeQueue<T> *)hashtable_get(hashtable, key)) == NULL ) {
            queue = new SArrayLockFreeQueue <T>(size);
        //LockFreeQueue<int, 10000> q;
        if ((queue = (LockFreeQueue<T> *)hashtable_get(hashtable, key)) == NULL ) {
            queue = new LockFreeQueue<T>(size);
            hashtable_put(hashtable,  key, (void *)queue);
        }
@@ -52,8 +49,8 @@
    template <typename T>
    SAbstractQueue<T>* createQueue(int key, size_t size) {
        return QFactory::createLinkedLockFreeQueue<T>(key, size);
    LockFreeQueue<T>* createQueue(int key, size_t size) {
        return QueueFactory::createArrayLockFreeQueue<T>(key, size);
    }
}
squeue/mm.h
@@ -1,7 +1,7 @@
#ifndef MM_HDR_H
#define MM_HDR_H      /* Prevent accidental double inclusion */
#include "usg_common.h"
#include <usg_common.h>
#include "usg_typedef.h"
#include <sys/sem.h>
squeue/test
Binary files differ
squeue/test.c
File was deleted
test/Makefile
@@ -1,53 +1,97 @@
ROOT=..
EXTRALIBS+=
INCLUDE+=-I. -I$(ROOT)/squeue/include -I$(ROOT)/squeue -I$(ROOT)/common/include
LIBCOMMON=$(ROOT)/common/libusgcommon.a
LIBSQUEUE=$(ROOT)/squeue/libsqueue.a
LDLIBS = -lpthread
PLATFORM=$(shell $(ROOT)/systype.sh)
include $(ROOT)/Make.defines.$(PLATFORM)
# ROOT=..
# EXTRALIBS+=
# INCLUDE+=-I. -I$(ROOT)/squeue/include -I$(ROOT)/squeue -I$(ROOT)/common/include
# LIBCOMMON=$(ROOT)/common/libusgcommon.a
# LIBSQUEUE=$(ROOT)/squeue/libsqueue.a
# LDLIBS = -lpthread
# PLATFORM=$(shell $(ROOT)/systype.sh)
# include $(ROOT)/Make.defines.$(PLATFORM)
PROGS =    test_queue productor consumer single_productor single_consumer
# PROGS =    test_queue productor consumer single_productor single_consumer
         
    
all: $(PROGS)
# all: $(PROGS)
# # test1: $(LIBCOMMON)
# # 如果包A 引用包B, B 要放在 A 后面
# # svshm_reader:  binary_sems.c  $(LIBSQUEUE) $(LIBCOMMON)
# # svshm_writer:  binary_sems.c  $(LIBSQUEUE) $(LIBCOMMON)
# test_queue: test.h  $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h  $(ROOT)/squeue/include/QFactory.h  $(LIBSQUEUE) $(LIBCOMMON)
# productor: test.h  $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h  $(LIBSQUEUE) $(LIBCOMMON)
# consumer: test.h  $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h $(LIBSQUEUE) $(LIBCOMMON)
# single_productor: test.h  $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h  $(LIBSQUEUE) $(LIBCOMMON)
# single_consumer: test.h  $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h $(LIBSQUEUE) $(LIBCOMMON)
# # test_lostdata: test.h  $(LIBSQUEUE) $(LIBCOMMON)
# # consumer_timeout: $(ROOT)/squeue/include/squeue.h test.h  $(LIBSQUEUE) $(LIBCOMMON)
# # productor_timeout: $(ROOT)/squeue/squeue.h $(LIBSQUEUE) $(LIBCOMMON)
# # test_atomic: $(ROOT)/squeue/squeue.h $(LIBSQUEUE) $(LIBCOMMON)
# clean:
#     rm -f $(PROGS) $(TEMPFILES) *.o
#
# Makefile for common library.
#
ROOT=..
#LDLIBS+=-Wl,-rpath=$(ROOT)/common:$(ROOT)/lib/jsoncpp
# 开源工具包路径
LDDIR += -L$(ROOT)/squeue
# 开源工具包
LDLIBS += -lsqueue -lpthread
INCLUDE += -I$(ROOT)/squeue/ -I$(ROOT)/squeue/include
PLATFORM=$(shell $(ROOT)/systype.sh)
include $(ROOT)/Make.defines.$(PLATFORM)
PROGS =    test_queue
build: $(PROGS)
# test1: $(LIBCOMMON)
# 如果包A 引用包B, B 要放在 A 后面
# svshm_reader:  binary_sems.c  $(LIBSQUEUE) $(LIBCOMMON)
# svshm_writer:  binary_sems.c  $(LIBSQUEUE) $(LIBCOMMON)
test_queue: test.h  $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h  $(ROOT)/squeue/include/QFactory.h  $(LIBSQUEUE) $(LIBCOMMON)
test_queue: test.h  $(ROOT)/squeue/include/lock_free_queue.h
productor: test.h  $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h  $(LIBSQUEUE) $(LIBCOMMON)
productor: test.h  $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h
consumer: test.h  $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h $(LIBSQUEUE) $(LIBCOMMON)
consumer: test.h  $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h
single_productor: test.h  $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h  $(LIBSQUEUE) $(LIBCOMMON)
single_productor: test.h  $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h
single_consumer: test.h  $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h $(LIBSQUEUE) $(LIBCOMMON)
# test_lostdata: test.h  $(LIBSQUEUE) $(LIBCOMMON)
# consumer_timeout: $(ROOT)/squeue/include/squeue.h test.h  $(LIBSQUEUE) $(LIBCOMMON)
# productor_timeout: $(ROOT)/squeue/squeue.h $(LIBSQUEUE) $(LIBCOMMON)
# test_atomic: $(ROOT)/squeue/squeue.h $(LIBSQUEUE) $(LIBCOMMON)
single_consumer: test.h  $(ROOT)/squeue/include/SArrayLockFreeQueue.h $(ROOT)/squeue/include/SLinkedLockFreeQueue.h $(ROOT)/squeue/include/QFactory.h
clean:
    rm -f $(PROGS) $(TEMPFILES) *.o
    rm -f $(TEMPFILES) $(PROGS)
$(LIBCOMMON):
    (cd $(ROOT)/common && $(MAKE))
$(LIBSQUEUE):
    (cd $(ROOT)/squeue && $(MAKE))
test/consumer
Binary files differ
test/productor
Binary files differ
test/single_consumer
Binary files differ
test/single_productor
Binary files differ
test/test.h
@@ -1,8 +1,8 @@
#include "usg_common.h"
#include "usg_typedef.h"
#include "SArrayLockFreeQueue.h"
#include "lock_free_queue.h"
#include "SLinkedLockFreeQueue.h"
#include "QFactory.h"
#include "queue_factory.h"
 #include <pthread.h>
#define NTHREADS 3
@@ -23,8 +23,9 @@
// 销毁共享内存和信号
void destroy() {
    
    SLinkedLockFreeQueue<struct Item> *queue = QFactory::createLinkedLockFreeQueue<struct Item> (1, 10);
    queue->~SLinkedLockFreeQueue();
    LockFreeQueue<struct Item> *queue = QueueFactory::createQueue<struct Item> (1, 16);
   //queue->~LockFreeQueue();
    delete queue;
    mm_deinit();
    
test/test1
Binary files differ
test/test1.c
File was deleted
test/test_queue
Binary files differ
test/test_queue.c
@@ -1,39 +1,38 @@
#include "test.h"
using namespace std;
int main () {
    unsigned int i = 0;
    struct Item
    {
        int pic;
        int info;
    };
     
    struct Item item;
    size_t qsize = 10;
      SArrayLockFreeQueue<struct Item> *queue = QFactory::createArrayLockFreeQueue<struct Item> (1, qsize);
    size_t qsize = 1;
      LockFreeQueue<struct Item> *queue = QueueFactory::createQueue<struct Item> (2, qsize);
    // LockFreeQueue<struct Item> queue(16);
    for(i = 0; i < qsize; i++) {
        queue->add({i, i});
        if(queue->push({i, i})) {
             cout << i << " push:" << i << endl;
        }
    }
    for(i = 0; i < qsize; i++) {
    // for(i = 0; i < qsize; i++) {
        
        //queue->dequeue(item);
    //     //queue.dequeue(item);
        
        item = (*queue)[i];
        cout << "i=" << i << " item " << item.pic << "," << item.info << endl;
    }
    //     item = (*queue)[i];
    //     cout << "i=" << i << " item " << item.pic << "," << item.info << endl;
    // }
     
    struct timespec timeout = {5, 0};
    i = 0;
    while((queue->remove_timeout(item, &timeout)) ) {
    while((queue->pop(item)) ) {
        cout << i << " 出队:" << item.pic << ", " << item.info << endl;
       // cout <<  item.pic << endl;