#ifndef __LOCK_FREE_QUEUE_H__
|
#define __LOCK_FREE_QUEUE_H__
|
|
#include <usg_common.h>
#include <assert.h> // assert()
#include <new>      // std::bad_alloc
#include "mem_pool.h"
#include "sem_util.h"
#include "logger_factory.h"
#include "shm_allocator.h"
|
|
// default Queue size
|
#define LOCK_FREE_Q_DEFAULT_SIZE 16
|
|
// define the macro _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE if calls to "size" must
// return the real size of the queue. If it is undefined that function will
// try to take a snapshot of the queue, but the returned value might be bogus
|
|
|
// forward declarations for default template values
|
//
|
|
// Array-based queue implementation; declared here only so that it can be
// named as a default template argument before its header is included.
template <typename ELEM_T, typename Allocator> class ArrayLockFreeQueue;
|
|
// template <typename ELEM_T>
|
// class LinkedLockFreeQueue;
|
|
|
/// @brief Lock-free queue based on a circular array
|
/// No allocation of extra memory for the nodes handling is needed, but it has
|
/// to add extra overhead (extra CAS operation) when inserting to ensure the
|
/// thread-safety of the queue when the queue type is not
|
/// ArrayLockFreeQueueSingleProducer.
|
///
|
/// examples of instantiation:
///   LockFreeQueue<int> q;      // queue of ints built with the default qsize
///                              // (LOCK_FREE_Q_DEFAULT_SIZE) and the default
///                              // Allocator and Q_TYPE
///   LockFreeQueue<int> q(16);  // queue of ints built with qsize = 16
///   LockFreeQueue<int, SHM_Allocator, ArrayLockFreeQueue> q(128);
///                              // same as the defaults, spelled out
///                              // explicitly, built with qsize = 128
|
///
|
/// ELEM_T represents the type of elements pushed and popped from the queue
|
/// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1)
|
/// This number should be a power of 2 to ensure
|
/// indexes in the circular queue keep stable when the uint32_t
|
/// variable that holds the current position rolls over from FFFFFFFF
|
/// to 0. For instance
|
/// 2 -> 0x02
|
/// 4 -> 0x04
|
/// 8 -> 0x08
|
/// 16 -> 0x10
|
/// (...)
|
/// 1024 -> 0x400
|
/// 2048 -> 0x800
|
///
|
/// if queue size is not defined as requested, let's say, for
|
/// instance 100, when current position is FFFFFFFF (4,294,967,295)
|
/// index in the circular array is 4,294,967,295 % 100 = 95.
|
/// When that value is incremented it will be set to 0, that is the
|
/// last 4 elements of the queue are not used when the counter rolls
|
/// over to 0
|
/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and
///          ArrayLockFreeQueue are supported (ArrayLockFreeQueue
///          by default)
|
template <
|
typename ELEM_T,
|
typename Allocator = SHM_Allocator,
|
template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
|
>
|
class LockFreeQueue
|
{
|
|
private:
|
int slots;
|
int items;
|
|
public:
|
int mutex;
|
LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
|
|
/// @brief destructor of the class.
|
/// Note it is not virtual since it is not expected to inherit from this
|
/// template
|
~LockFreeQueue();
|
std::atomic_uint reference;
|
/// @brief constructor of the class
|
|
|
/// @brief returns the current number of items in the queue
|
/// It tries to take a snapshot of the size of the queue, but in busy environments
|
/// this function might return bogus values.
|
///
|
/// If a reliable queue size must be kept you might want to have a look at
|
/// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
|
/// it enables a reliable size though it hits overall performance of the queue
|
/// (when the reliable size variable is on it's got an impact of about 20% in time)
|
inline uint32_t size();
|
|
/// @brief return true if the queue is full. False otherwise
|
/// It tries to take a snapshot of the size of the queue, but in busy
|
/// environments this function might return bogus values. See help in method
|
/// LockFreeQueue::size
|
inline bool full();
|
|
inline bool empty();
|
|
inline ELEM_T& operator[](unsigned i);
|
|
/// @brief push an element at the tail of the queue
|
/// @param the element to insert in the queue
|
/// Note that the element is not a pointer or a reference, so if you are using large data
|
/// structures to be inserted in the queue you should think of instantiate the template
|
/// of the queue as a pointer to that large structure
|
/// @return true if the element was inserted in the queue. False if the queue was full
|
bool push(const ELEM_T &a_data);
|
bool push_nowait(const ELEM_T &a_data);
|
bool push_timeout(const ELEM_T &a_data, const struct timespec * timeout);
|
|
/// @brief pop the element at the head of the queue
|
/// @param a reference where the element in the head of the queue will be saved to
|
/// Note that the a_data parameter might contain rubbish if the function returns false
|
/// @return true if the element was successfully extracted from the queue. False if the queue was empty
|
bool pop(ELEM_T &a_data);
|
bool pop_nowait(ELEM_T &a_data);
|
bool pop_timeout(ELEM_T &a_data, struct timespec * timeout);
|
|
|
void *operator new(size_t size);
|
void operator delete(void *p);
|
|
protected:
|
/// @brief the actual queue. methods are forwarded into the real
|
/// implementation
|
Q_TYPE<ELEM_T, Allocator> m_qImpl;
|
|
private:
|
/// @brief disable copy constructor declaring it private
|
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src);
|
};
|
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize)
|
{
|
// std::cout << "LockFreeQueue init reference=" << reference << std::endl;
|
slots = SemUtil::get(IPC_PRIVATE, qsize);
|
items = SemUtil::get(IPC_PRIVATE, 0);
|
mutex = SemUtil::get(IPC_PRIVATE, 1);
|
}
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue()
|
{
|
LoggerFactory::getLogger().debug("LockFreeQueue desctroy");
|
SemUtil::remove(slots);
|
SemUtil::remove(items);
|
SemUtil::remove(mutex);
|
}
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size()
|
{
|
return m_qImpl.size();
|
}
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full()
|
{
|
return m_qImpl.full();
|
}
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty()
|
{
|
return m_qImpl.empty();
|
}
|
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
|
{
|
// printf("==================LockFreeQueue push before\n");
|
if (SemUtil::dec(slots) == -1) {
|
err_msg(errno, "LockFreeQueue push");
|
return false;
|
}
|
|
if ( m_qImpl.push(a_data) ) {
|
|
SemUtil::inc(items);
|
// printf("==================LockFreeQueue push after\n");
|
return true;
|
}
|
return false;
|
|
}
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
|
{
|
if (SemUtil::dec_nowait(slots) == -1) {
|
if (errno == EAGAIN)
|
return false;
|
else {
|
err_msg(errno, "LockFreeQueue push_nowait");
|
return false;
|
}
|
|
}
|
|
if ( m_qImpl.push(a_data)) {
|
SemUtil::inc(items);
|
return true;
|
}
|
return false;
|
|
}
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * timeout)
|
{
|
|
|
if (SemUtil::dec_timeout(slots, timeout) == -1) {
|
if (errno == EAGAIN)
|
return false;
|
else {
|
// err_msg(errno, "LockFreeQueue push_timeout");
|
return false;
|
}
|
}
|
|
if (m_qImpl.push(a_data)){
|
SemUtil::inc(items);
|
return true;
|
}
|
return false;
|
|
}
|
|
|
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
|
{
|
// printf("==================LockFreeQueue pop before\n");
|
if (SemUtil::dec(items) == -1) {
|
err_msg(errno, "LockFreeQueue pop");
|
return false;
|
}
|
|
if (m_qImpl.pop(a_data)) {
|
SemUtil::inc(slots);
|
// printf("==================LockFreeQueue pop after\n");
|
return true;
|
}
|
return false;
|
|
}
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
|
{
|
if (SemUtil::dec_nowait(items) == -1) {
|
if (errno == EAGAIN)
|
return false;
|
else {
|
err_msg(errno, "LockFreeQueue pop_nowait");
|
return false;
|
}
|
}
|
|
if (m_qImpl.pop(a_data)) {
|
SemUtil::inc(slots);
|
return true;
|
}
|
return false;
|
|
}
|
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout)
|
{
|
// printf("==================LockFreeQueue pop_timeout before\n");
|
if (SemUtil::dec_timeout(items, timeout) == -1) {
|
if (errno == EAGAIN)
|
return false;
|
else {
|
// err_msg(errno, "LockFreeQueue pop_timeout");
|
return false;
|
}
|
}
|
|
if (m_qImpl.pop(a_data)) {
|
SemUtil::inc(slots);
|
// printf("==================LockFreeQueue pop_timeout after\n");
|
return true;
|
}
|
return false;
|
|
}
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
|
return m_qImpl.operator[](i);
|
}
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){
|
return Allocator::allocate(size);
|
}
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) {
|
return Allocator::deallocate(p);
|
}
|
|
// include implementation files
|
//#include "linked_lock_free_queue.h"
|
#include "array_lock_free_queue.h"
|
|
#endif // __LOCK_FREE_QUEUE_H__
|