wangzhengquan
2021-03-13 a38304f7f6b91aaa1b0aa76cc9d3e5b6aef1f85f
src/queue/array_lock_free_sem_queue.h
@@ -4,7 +4,7 @@
#include <assert.h> // assert()
#include <sched.h>  // sched_yield()
#include "logger_factory.h"
#include "mem_pool.h"
#include "shm_mm.h"
#include "shm_allocator.h"
#include "futex_sem.h"
#include "time_util.h"
@@ -204,6 +204,7 @@
{
  uint32_t currentReadIndex;
  uint32_t currentWriteIndex;
  uint32_t tmpIndex;
  int s;
  // sigset_t mask_all, pre;
@@ -215,41 +216,42 @@
  #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    if (m_count == Q_SIZE) {
      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG)
        return -1;
        return errno;
      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          return -1;
          return errno;
        }
            
      } else {
        s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
          return -1;
          return errno;
        }
      }
    }
  #else
    if (currentReadIndex == currentWriteIndex - Q_SIZE  + 1   )
    tmpIndex = (uint32_t)(currentWriteIndex - Q_SIZE  + 1);
    if (currentReadIndex ==   tmpIndex )
    {
        // the queue is full
      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG)
        return -1;
        return errno;
      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, &ts, NULL, 0);
        s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, timeout, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          return -1;
          return errno;
        }
            
      } else {
        s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, NULL, NULL, 0);
        s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
          return -1;
          return errno;
        }
      }
    }
@@ -315,22 +317,21 @@
      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
        // sigprocmask(SIG_SETMASK, &pre, NULL);
        return -1;
        return errno;
      }
      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex((int *)&m_count, FUTEX_WAIT, 0, &ts, NULL, 0);
        s = futex((int *)&m_count, FUTEX_WAIT, 0, timeout, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          // sigprocmask(SIG_SETMASK, &pre, NULL);
          return -1;
          return errno;
        }
            
      } else {
        s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
          // sigprocmask(SIG_SETMASK, &pre, NULL);
          return -1;
          return errno;
        }
      }
    }
@@ -344,7 +345,7 @@
      // waiting to commit the data into it
      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
        // sigprocmask(SIG_SETMASK, &pre, NULL);
        return -1;
        return errno;
      }
      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
@@ -352,14 +353,14 @@
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          // sigprocmask(SIG_SETMASK, &pre, NULL);
          return -1;
          return errno;
        }
            
      } else {
        s = futex((int *)&currentMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
         // sigprocmask(SIG_SETMASK, &pre, NULL);
          return -1;
          return errno;
        }
      }
    }