wangzhengquan
2021-01-22 09a82c2ece4caadad0baa0d1f3b84f1506363fdd
src/queue/lock_free_queue.h
@@ -1,3 +1,6 @@
/**
 * encapsulate array_lock_free_queue, add semphore. populate in kernal space.
 */
#ifndef __LOCK_FREE_QUEUE_H__
#define __LOCK_FREE_QUEUE_H__
@@ -7,7 +10,7 @@
#include "sem_util.h"
#include "logger_factory.h"
#include "shm_allocator.h"
#include "px_sem_util.h"
#include "psem.h"
#include "bus_error.h"
// default Queue size
@@ -216,15 +219,14 @@
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
{
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
    if (sem_wait(&slots) == -1) {
        err_msg(errno, "LockFreeQueue push");
        return errno;
 LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
    if (psem_wait(&slots) == -1) {
        return -1;
    }
    
    if ( m_qImpl.push(a_data) ) {
        sem_post(&items);
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
        psem_post(&items);
LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
        return 0;
    }
    return -1;
@@ -237,18 +239,12 @@
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
{
    if (sem_trywait(&slots) == -1) {
        if (errno == EAGAIN)
            return EAGAIN;
        else {
            err_msg(errno, "LockFreeQueue push_nowait");
            return errno;
        }
    if (psem_trywait(&slots) == -1) {
        return -1;
    }
    if ( m_qImpl.push(a_data)) {
        sem_post(&items);
        psem_post(&items);
        return 0;
    }
    return -1;
@@ -261,29 +257,14 @@
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts)
{
    int rv;
    struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
  // LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld",
  //   timeout.tv_sec, timeout.tv_nsec);
    while ( sem_timedwait(&slots, &timeout) == -1) {
    //     LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld, ETIMEDOUT=%d, errno=%d\n",
    // timeout.tv_sec, timeout.tv_nsec, ETIMEDOUT, errno);
        if(errno == ETIMEDOUT)
            return EBUS_TIMEOUT;
        else if(errno == EINTR)
            continue;
        else {
           LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
           return errno;
        }
LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n");
    if ( psem_timedwait(&slots, ts) == -1) {
        return -1;
    }
    if (m_qImpl.push(a_data)){
        sem_post(&items);
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n");
        psem_post(&items);
LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n");
        return 0;
    }
    return -1;
@@ -300,15 +281,14 @@
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
{
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
    if (sem_wait(&items) == -1) {
        LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop");
        return errno;
  LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
    if (psem_wait(&items) == -1) {
        return -1;
    }
    if (m_qImpl.pop(a_data)) {
        sem_post(&slots);
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
        psem_post(&slots);
 LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
        return 0;
    }
    return -1;
@@ -320,17 +300,12 @@
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
{
    if (sem_trywait(&items) == -1) {
        if (errno == EAGAIN)
            return errno;
        else {
            LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_nowait");
            return errno;
        }
    if (psem_trywait(&items) == -1) {
        return -1;
    }
    if (m_qImpl.pop(a_data)) {
        sem_post(&slots);
        psem_post(&slots);
        return 0;
    }
    return -1;
@@ -343,25 +318,12 @@
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
{
// LoggerFactory::getLogger()->debug("=================ts sec = %d, nsec = %ld \n", ts->tv_sec,  ts->tv_nsec );
    // struct timespec timeout_tmp = {1, 0};
    struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
// LoggerFactory::getLogger()->debug("================== timeout before sec = %d, nsec = %ld \n", timeout.tv_sec,  timeout.tv_nsec );
    while (sem_timedwait(&items, &timeout) == -1) {
        if (errno == ETIMEDOUT)
            return EBUS_TIMEOUT;
        else if(errno == EINTR)
            continue;
        else {
          LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_timeout %d", errno);
          return errno;
        }
    if (psem_timedwait(&items, ts) == -1) {
       return -1;
    }
    if (m_qImpl.pop(a_data)) {
        sem_post(&slots);
        psem_post(&slots);
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n");     
        return 0;
    }