Fu Juntang
2021-09-17 5c912c70e9333298ff48f7ea15424f72ca977b99
src/queue/array_lock_free_queue.h
@@ -5,7 +5,7 @@
#include <assert.h> // assert()
#include <sched.h>  // sched_yield()
#include "logger_factory.h"
#include "mem_pool.h"
#include "shm_mm.h"
#include "shm_allocator.h"
/// @brief implementation of an array based lock free queue with support for 
@@ -102,6 +102,7 @@
template<typename ELEM_T, typename Allocator>
ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() {
  // std::cout << "destroy ArrayLockFreeQueue\n";
  Allocator::deallocate(m_theQueue);
}
@@ -234,7 +235,9 @@
  }
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
  AtomicAdd(&m_count, 1);
  if (m_count < Q_SIZE) {
    AtomicAdd(&m_count, 1);
  }
#endif
  return true;
}
@@ -274,7 +277,9 @@
    if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
      // m_count.fetch_sub(1);
      AtomicSub(&m_count, 1);
      if (m_count > 0) {
        AtomicSub(&m_count, 1);
      }
#endif
      return true;
    }
@@ -294,6 +299,7 @@
template<typename ELEM_T, typename Allocator>
ELEM_T &ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
  int currentCount = m_count;
  uint32_t currentReadIndex = m_readIndex;
  if (i >= currentCount) {
@@ -301,6 +307,9 @@
              << " is out of range\n";
    std::exit(EXIT_FAILURE);
  }
#else
  uint32_t currentReadIndex = m_readIndex;
#endif
  return m_theQueue[countToIndex(currentReadIndex + i)];
}