wangzhengquan
2021-01-22 09a82c2ece4caadad0baa0d1f3b84f1506363fdd
src/queue/array_lock_free_sem_queue.h
@@ -75,7 +75,7 @@
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
  /// @brief number of elements in the queue
  int m_count;
  uint32_t m_count;
#endif
@@ -200,10 +200,6 @@
}
  template <typename ELEM_T, typename Allocator>
int ArrayLockFreeSemQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data,  const struct timespec *timeout, int flag)
{
@@ -215,28 +211,50 @@
  {
    currentWriteIndex = m_writeIndex;
    currentReadIndex  = m_readIndex;
  #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    if (m_count == Q_SIZE) {
      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
        return -1;
      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex(&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
        s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          return -1;
        }
            
      } else {
        s = futex(&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0);
        s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
          return -1;
        }
      }
    }
  #else
    if (currentReadIndex == currentWriteIndex - Q_SIZE  + 1   )
    {
        // the queue is full
      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
        return -1;
      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, &ts, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          return -1;
        }
      } else {
        s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
          return -1;
        }
      }
    }
  #endif
    //保留写入位
  } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
  // We know now that this index is reserved for us. Use it to save the data
@@ -255,10 +273,16 @@
    sched_yield();
  }
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
  AtomicAdd(&m_count, 1);
  s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
  if (s  == -1)
  if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1)
      err_exit(errno, "futex-FUTEX_WAKE");
#else
  if ( (s = futex((int *)&m_maximumReadIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1)
      err_exit(errno, "futex-FUTEX_WAKE");
#endif
  return 0;
}
@@ -268,14 +292,15 @@
{
  uint32_t currentMaximumReadIndex;
  uint32_t currentReadIndex;
  int s;
  do
  {
    // to ensure thread-safety when there is more than 1 producer thread
    // a second index is defined (m_maximumReadIndex)
    currentReadIndex        = m_readIndex;
    currentMaximumReadIndex = m_maximumReadIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    if (m_count == 0) {
@@ -284,26 +309,43 @@
        return -1;
      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex(&m_count, FUTEX_WAIT, 0, &ts, NULL, 0);
        s = futex((int *)&m_count, FUTEX_WAIT, 0, &ts, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          return -1;
        }
            
      } else {
        s = futex(&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
        s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
          return -1;
        }
      }
    }
#else
    if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
    if (currentReadIndex == currentMaximumReadIndex)
    {
      // the queue is empty or
      // a producer thread has allocate space in the queue but is
      // waiting to commit the data into it
      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
      return -1;
      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex((int *)&currentMaximumReadIndex, FUTEX_WAIT, currentReadIndex, &ts, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          return -1;
        }
      } else {
        s = futex((int *)&currentMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
          return -1;
        }
      }
    }
#endif
@@ -318,11 +360,13 @@
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
      // m_count.fetch_sub(1);
      AtomicSub(&m_count, 1);
      if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1)
        err_exit(errno, "futex-FUTEX_WAKE");
    #else
      if ( (s = futex((int *)&m_readIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1)
        err_exit(errno, "futex-FUTEX_WAKE");
#endif
      s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
      if (s  == -1)
        err_exit(errno, "futex-FUTEX_WAKE");
      return 0;
    }
@@ -342,13 +386,13 @@
  template <typename ELEM_T, typename Allocator>
ELEM_T& ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[](unsigned int i)
{
  int currentCount = m_count;
  // int currentCount = m_count;
  uint32_t currentReadIndex = m_readIndex;
  if (i >= currentCount)
  {
    std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
    std::exit(EXIT_FAILURE);
  }
  // if (i >= currentCount)
  // {
  //   std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
  //   std::exit(EXIT_FAILURE);
  // }
  return m_theQueue[countToIndex(currentReadIndex+i)];
}