#ifndef __LOCK_FREE_QUEUE_H__
|
#define __LOCK_FREE_QUEUE_H__
|
|
#include <usg_common.h>
|
#include <assert.h> // assert()
|
#include "mem_pool.h"
|
#include "sem_util.h"
|
#include "logger_factory.h"
|
#include "shm_allocator.h"
|
#include "px_sem_util.h"
|
#include "bus_error.h"
|
|
// default Queue size
|
#define LOCK_FREE_Q_DEFAULT_SIZE 16
|
|
// static Logger *logger = LoggerFactory::getLogger();
|
// define this macro if calls to "size" must return the real size of the
|
// queue. If it is undefined that function will try to take a snapshot of
|
// the queue, but returned value might be bogus
|
|
|
// forward declarations for default template values
|
//
|
|
template <typename ELEM_T, typename Allocator>
|
class ArrayLockFreeQueue;
|
|
// template <typename ELEM_T>
|
// class LinkedLockFreeQueue;
|
|
|
/// @brief Lock-free queue based on a circular array
|
/// No allocation of extra memory for the nodes handling is needed, but it has
|
/// to add extra overhead (extra CAS operation) when inserting to ensure the
|
/// thread-safety of the queue when the queue type is not
|
/// ArrayLockFreeQueueSingleProducer.
|
///
|
/// examples of instantiation:
|
/// ArrayLockFreeQueue<int> q; // queue of ints of default size (65535 - 1)
|
/// // and defaulted to single producer
|
/// ArrayLockFreeQueue<int, 16> q;
|
/// // queue of ints of size (16 - 1) and
|
/// // defaulted to single producer
|
/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q;
|
/// // queue of ints of size (100 - 1) with support
|
/// // for multiple producers
|
///
|
/// ELEM_T represents the type of elementes pushed and popped from the queue
|
/// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1)
|
/// This number should be a power of 2 to ensure
|
/// indexes in the circular queue keep stable when the uint32_t
|
/// variable that holds the current position rolls over from FFFFFFFF
|
/// to 0. For instance
|
/// 2 -> 0x02
|
/// 4 -> 0x04
|
/// 8 -> 0x08
|
/// 16 -> 0x10
|
/// (...)
|
/// 1024 -> 0x400
|
/// 2048 -> 0x800
|
///
|
/// if queue size is not defined as requested, let's say, for
|
/// instance 100, when current position is FFFFFFFF (4,294,967,295)
|
/// index in the circular array is 4,294,967,295 % 100 = 95.
|
/// When that value is incremented it will be set to 0, that is the
|
/// last 4 elements of the queue are not used when the counter rolls
|
/// over to 0
|
/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and
|
/// ArrayLockFreeQueue are supported (single producer
|
/// by default)
|
template <
|
typename ELEM_T,
|
typename Allocator = SHM_Allocator,
|
template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
|
>
|
class LockFreeQueue
|
{
|
|
private:
|
sem_t slots;
|
sem_t items;
|
|
|
|
public:
|
sem_t mutex;
|
LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
|
|
/// @brief destructor of the class.
|
/// Note it is not virtual since it is not expected to inherit from this
|
/// template
|
~LockFreeQueue();
|
std::atomic_uint reference;
|
/// @brief constructor of the class
|
|
|
/// @brief returns the current number of items in the queue
|
/// It tries to take a snapshot of the size of the queue, but in busy environments
|
/// this function might return bogus values.
|
///
|
/// If a reliable queue size must be kept you might want to have a look at
|
/// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
|
/// it enables a reliable size though it hits overall performance of the queue
|
/// (when the reliable size variable is on it's got an impact of about 20% in time)
|
inline uint32_t size();
|
|
/// @brief return true if the queue is full. False otherwise
|
/// It tries to take a snapshot of the size of the queue, but in busy
|
/// environments this function might return bogus values. See help in method
|
/// LockFreeQueue::size
|
inline bool full();
|
|
inline bool empty();
|
|
inline ELEM_T& operator[](unsigned i);
|
|
/// @brief push an element at the tail of the queue
|
/// @param the element to insert in the queue
|
/// Note that the element is not a pointer or a reference, so if you are using large data
|
/// structures to be inserted in the queue you should think of instantiate the template
|
/// of the queue as a pointer to that large structure
|
/// @return true if the element was inserted in the queue. False if the queue was full
|
int push(const ELEM_T &a_data);
|
int push_nowait(const ELEM_T &a_data);
|
int push_timeout(const ELEM_T &a_data, const struct timespec * timeout);
|
|
/// @brief pop the element at the head of the queue
|
/// @param a reference where the element in the head of the queue will be saved to
|
/// Note that the a_data parameter might contain rubbish if the function returns false
|
/// @return true if the element was successfully extracted from the queue. False if the queue was empty
|
int pop(ELEM_T &a_data);
|
int pop_nowait(ELEM_T &a_data);
|
int pop_timeout(ELEM_T &a_data, struct timespec * timeout);
|
|
|
void *operator new(size_t size);
|
void operator delete(void *p);
|
|
protected:
|
/// @brief the actual queue. methods are forwarded into the real
|
/// implementation
|
Q_TYPE<ELEM_T, Allocator> m_qImpl;
|
|
private:
|
/// @brief disable copy constructor declaring it private
|
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src);
|
};
|
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize)
|
{
|
// std::cout << "LockFreeQueue init reference=" << reference << std::endl;
|
if (sem_init(&slots, 1, qsize) == -1)
|
err_exit(errno, "LockFreeQueue sem_init");
|
if (sem_init(&items, 1, 0) == -1)
|
err_exit(errno, "LockFreeQueue sem_init");
|
if (sem_init(&mutex, 1, 1) == -1)
|
err_exit(errno, "LockFreeQueue sem_init");
|
|
|
}
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue()
|
{
|
// LoggerFactory::getLogger()->debug("LockFreeQueue desctroy");
|
if(sem_destroy(&slots) == -1) {
|
err_exit(errno, "LockFreeQueue sem_destroy");
|
}
|
if(sem_destroy(&items) == -1) {
|
err_exit(errno, "LockFreeQueue sem_destroy");
|
}
|
if(sem_destroy(&mutex) == -1) {
|
err_exit(errno, "LockFreeQueue sem_destroy");
|
}
|
}
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size()
|
{
|
return m_qImpl.size();
|
}
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full()
|
{
|
return m_qImpl.full();
|
}
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty()
|
{
|
return m_qImpl.empty();
|
}
|
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
|
{
|
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
|
if (sem_wait(&slots) == -1) {
|
err_msg(errno, "LockFreeQueue push");
|
return errno;
|
}
|
|
if ( m_qImpl.push(a_data) ) {
|
sem_post(&items);
|
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
|
return 0;
|
}
|
return -1;
|
|
}
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
|
{
|
if (sem_trywait(&slots) == -1) {
|
if (errno == EAGAIN)
|
return EAGAIN;
|
else {
|
err_msg(errno, "LockFreeQueue push_nowait");
|
return errno;
|
}
|
|
}
|
|
if ( m_qImpl.push(a_data)) {
|
sem_post(&items);
|
return 0;
|
}
|
return -1;
|
|
}
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts)
|
{
|
|
int rv;
|
struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
|
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld",
|
// timeout.tv_sec, timeout.tv_nsec);
|
|
while ( sem_timedwait(&slots, &timeout) == -1) {
|
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld, ETIMEDOUT=%d, errno=%d\n",
|
// timeout.tv_sec, timeout.tv_nsec, ETIMEDOUT, errno);
|
|
if(errno == ETIMEDOUT)
|
return EBUS_TIMEOUT;
|
else if(errno == EINTR)
|
continue;
|
else {
|
LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
|
return errno;
|
}
|
}
|
|
if (m_qImpl.push(a_data)){
|
sem_post(&items);
|
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n");
|
return 0;
|
}
|
return -1;
|
|
}
|
|
|
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
|
{
|
|
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
|
if (sem_wait(&items) == -1) {
|
LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop");
|
return errno;
|
}
|
|
if (m_qImpl.pop(a_data)) {
|
sem_post(&slots);
|
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
|
return 0;
|
}
|
return -1;
|
}
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
|
{
|
if (sem_trywait(&items) == -1) {
|
if (errno == EAGAIN)
|
return errno;
|
else {
|
LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_nowait");
|
return errno;
|
}
|
}
|
|
if (m_qImpl.pop(a_data)) {
|
sem_post(&slots);
|
return 0;
|
}
|
return -1;
|
}
|
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
|
{
|
// LoggerFactory::getLogger()->debug("=================ts sec = %d, nsec = %ld \n", ts->tv_sec, ts->tv_nsec );
|
|
LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n");
|
struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
|
// LoggerFactory::getLogger()->debug("================== timeout before sec = %d, nsec = %ld \n", timeout.tv_sec, timeout.tv_nsec );
|
|
while (sem_timedwait(&items, &timeout) == -1) {
|
if (errno == ETIMEDOUT)
|
return EBUS_TIMEOUT;
|
else if(errno == EINTR)
|
continue;
|
else {
|
// LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_timeout %d", errno);
|
return errno;
|
}
|
}
|
|
if (m_qImpl.pop(a_data)) {
|
sem_post(&slots);
|
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n");
|
return 0;
|
}
|
return -1;
|
|
}
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
|
return m_qImpl.operator[](i);
|
}
|
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){
|
return Allocator::allocate(size);
|
}
|
|
template <
|
typename ELEM_T,
|
typename Allocator,
|
template <typename T, typename AT> class Q_TYPE>
|
void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) {
|
return Allocator::deallocate(p);
|
}
|
|
// include implementation files
|
//#include "linked_lock_free_queue.h"
|
#include "array_lock_free_queue.h"
|
|
#endif // _LOCK_FREE_QUEUE_H__
|