tmp
wangzhengquan
2021-01-22 4c73fd7179e92bee9cccb65e46823b00f568acb3
src/queue/lock_free_queue.h
@@ -12,7 +12,7 @@
#include "shm_allocator.h"
#include "psem.h"
#include "bus_error.h"
#include "bus_def.h"
// default Queue size
#define LOCK_FREE_Q_DEFAULT_SIZE 16
@@ -76,23 +76,23 @@
    typename Allocator = SHM_Allocator,
    template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
     >
class LockFreeQueue
{
class LockFreeQueue {
private:
    sem_t slots;  
    sem_t items;
public:
    sem_t mutex;
    LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    
    /// @brief destructor of the class. 
    /// Note it is not virtual since it is not expected to inherit from this
    /// template
    ~LockFreeQueue();
    std::atomic_uint reference;    
    /// @brief constructor of the class
   
@@ -123,20 +123,17 @@
    /// structures to be inserted in the queue you should think of instantiate the template
    /// of the queue as a pointer to that large structure
    /// @return true if the element was inserted in the queue. False if the queue was full
    int push(const ELEM_T &a_data);
    int push_nowait(const ELEM_T &a_data);
    int push_timeout(const ELEM_T &a_data, const struct timespec * timeout);
  int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
    /// @brief pop the element at the head of the queue
    /// @param a reference where the element in the head of the queue will be saved to
    /// Note that the a_data parameter might contain rubbish if the function returns false
    /// @return true if the element was successfully extracted from the queue. False if the queue was empty
    int pop(ELEM_T &a_data);
    int pop_nowait(ELEM_T &a_data);
    int pop_timeout(ELEM_T &a_data, struct timespec * timeout);
  int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
    void *operator new(size_t size);
    void operator delete(void *p);
protected:
@@ -154,8 +151,7 @@
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize)
{
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) {
// std::cout << "LockFreeQueue init reference=" << reference << std::endl;
    if (sem_init(&slots, 1, qsize) == -1)
        err_exit(errno, "LockFreeQueue sem_init");
@@ -167,12 +163,12 @@
   
}
template <
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue()
{
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() {
    // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy");
    if(sem_destroy(&slots) == -1) {
        err_exit(errno, "LockFreeQueue sem_destroy");
@@ -189,8 +185,7 @@
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size()
{
inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() {
    return m_qImpl.size();
}  
@@ -198,8 +193,7 @@
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full()
{
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() {
    return m_qImpl.full();
}
@@ -207,22 +201,30 @@
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty()
{
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() {
    return m_qImpl.empty();
}  
template <
    typename ELEM_T,
template<typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
{
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) {
 LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");   
    if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
        if (psem_trywait(&slots) == -1) {
            return -1;
        }
    } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        if (psem_timedwait(&slots, timeout) == -1) {
            return -1;
        }
    } else {
    if (psem_wait(&slots) == -1) {
        return -1;
    }
    }
    
    if ( m_qImpl.push(a_data) ) {
        psem_post(&items);   
@@ -233,58 +235,28 @@
    
}
template <
    typename ELEM_T,
template<typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
{
    if (psem_trywait(&slots) == -1) {
        return -1;
    }
    if ( m_qImpl.push(a_data)) {
        psem_post(&items);
        return 0;
    }
    return -1;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts)
{
LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n");
    if ( psem_timedwait(&slots, ts) == -1) {
        return -1;
    }
    if (m_qImpl.push(a_data)){
        psem_post(&items);
LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n");
        return 0;
    }
    return -1;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
{
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) {
  LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
    if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
        if (psem_trywait(&items) == -1) {
            return -1;
        }
    } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        if (psem_timedwait(&items, timeout) == -1) {
            return -1;
        }
    } else {
    if (psem_wait(&items) == -1) {
        return -1;
    }
    }
    if (m_qImpl.pop(a_data)) {
        psem_post(&slots);
@@ -294,45 +266,7 @@
    return -1;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
{
    if (psem_trywait(&items) == -1) {
        return -1;
    }
    if (m_qImpl.pop(a_data)) {
        psem_post(&slots);
        return 0;
    }
    return -1;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
{
    if (psem_timedwait(&items, ts) == -1) {
       return -1;
    }
    if (m_qImpl.pop(a_data)) {
        psem_post(&slots);
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n");
        return 0;
    }
    return -1;
}
template <
    typename ELEM_T,
template<typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
@@ -340,16 +274,14 @@
}
template <
    typename ELEM_T,
template<typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){
        return Allocator::allocate(size);
}
template <
    typename ELEM_T,
template<typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) {