From acbf282b23b4cbdebca562d67132573de3902f94 Mon Sep 17 00:00:00 2001
From: Fu Juntang <StrongTiger_001@163.com>
Date: 星期五, 17 九月 2021 10:45:43 +0800
Subject: [PATCH] Merge branch 'master' of http://os.smartai.com:9091/valib/c_bhomebus
---
src/queue/lock_free_queue.h | 566 +++++++++++++++++++++++++-------------------------------
1 files changed, 256 insertions(+), 310 deletions(-)
diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index 70b8ef1..6363ccb 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -1,28 +1,37 @@
+/**
+ * encapsulate array_lock_free_queue, add semphore. populate in kernal space.
+ */
#ifndef __LOCK_FREE_QUEUE_H__
#define __LOCK_FREE_QUEUE_H__
#include <usg_common.h>
#include <assert.h> // assert()
-#include "mem_pool.h"
+#include "shm_mm.h"
#include "sem_util.h"
#include "logger_factory.h"
#include "shm_allocator.h"
-#include "px_sem_util.h"
+#include "psem.h"
#include "bus_error.h"
+#include "bus_def.h"
// default Queue size
-#define LOCK_FREE_Q_DEFAULT_SIZE 16
+#define LOCK_FREE_Q_DEFAULT_SIZE 320
+
+
+#define LOCK_FREE_Q_ST_OPENED 0
+
+#define LOCK_FREE_Q_ST_CLOSED 1
// static Logger *logger = LoggerFactory::getLogger();
-// define this macro if calls to "size" must return the real size of the
-// queue. If it is undefined that function will try to take a snapshot of
+// define this macro if calls to "size" must return the real size of the
+// queue. If it is undefined that function will try to take a snapshot of
// the queue, but returned value might be bogus
// forward declarations for default template values
//
-template <typename ELEM_T, typename Allocator>
+template<typename ELEM_T, typename Allocator>
class ArrayLockFreeQueue;
// template <typename ELEM_T>
@@ -30,9 +39,9 @@
/// @brief Lock-free queue based on a circular array
-/// No allocation of extra memory for the nodes handling is needed, but it has
-/// to add extra overhead (extra CAS operation) when inserting to ensure the
-/// thread-safety of the queue when the queue type is not
+/// No allocation of extra memory for the nodes handling is needed, but it has
+/// to add extra overhead (extra CAS operation) when inserting to ensure the
+/// thread-safety of the queue when the queue type is not
/// ArrayLockFreeQueueSingleProducer.
///
/// examples of instantiation:
@@ -47,351 +56,288 @@
///
/// ELEM_T represents the type of elementes pushed and popped from the queue
/// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1)
-/// This number should be a power of 2 to ensure
-/// indexes in the circular queue keep stable when the uint32_t
+/// This number should be a power of 2 to ensure
+/// indexes in the circular queue keep stable when the uint32_t
/// variable that holds the current position rolls over from FFFFFFFF
/// to 0. For instance
-/// 2 -> 0x02
+/// 2 -> 0x02
/// 4 -> 0x04
/// 8 -> 0x08
/// 16 -> 0x10
-/// (...)
+/// (...)
/// 1024 -> 0x400
/// 2048 -> 0x800
///
/// if queue size is not defined as requested, let's say, for
/// instance 100, when current position is FFFFFFFF (4,294,967,295)
-/// index in the circular array is 4,294,967,295 % 100 = 95.
-/// When that value is incremented it will be set to 0, that is the
+/// index in the circular array is 4,294,967,295 % 100 = 95.
+/// When that value is incremented it will be set to 0, that is the
/// last 4 elements of the queue are not used when the counter rolls
/// over to 0
-/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and
+/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and
/// ArrayLockFreeQueue are supported (single producer
/// by default)
-template <
- typename ELEM_T,
- typename Allocator = SHM_Allocator,
- template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
- >
-class LockFreeQueue
-{
+template<
+ typename ELEM_T,
+ typename Allocator = SHM_Allocator,
+ template<typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
+>
+class LockFreeQueue {
private:
- sem_t slots;
- sem_t items;
+ sem_t slots;
+ sem_t items;
+ time_t createTime;
+ time_t closeTime;
+ int status;
-
public:
- sem_t mutex;
- LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
-
- /// @brief destructor of the class.
- /// Note it is not virtual since it is not expected to inherit from this
- /// template
- ~LockFreeQueue();
- std::atomic_uint reference;
- /// @brief constructor of the class
-
- /// @brief returns the current number of items in the queue
- /// It tries to take a snapshot of the size of the queue, but in busy environments
- /// this function might return bogus values.
- ///
- /// If a reliable queue size must be kept you might want to have a look at
- /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
- /// it enables a reliable size though it hits overall performance of the queue
- /// (when the reliable size variable is on it's got an impact of about 20% in time)
- inline uint32_t size();
-
- /// @brief return true if the queue is full. False otherwise
- /// It tries to take a snapshot of the size of the queue, but in busy
- /// environments this function might return bogus values. See help in method
- /// LockFreeQueue::size
- inline bool full();
+ LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
- inline bool empty();
+ /// @brief destructor of the class.
+ /// Note it is not virtual since it is not expected to inherit from this
+ /// template
+ ~LockFreeQueue();
- inline ELEM_T& operator[](unsigned i);
+ inline void close();
+ inline bool isClosed();
- /// @brief push an element at the tail of the queue
- /// @param the element to insert in the queue
- /// Note that the element is not a pointer or a reference, so if you are using large data
- /// structures to be inserted in the queue you should think of instantiate the template
- /// of the queue as a pointer to that large structure
- /// @return true if the element was inserted in the queue. False if the queue was full
- int push(const ELEM_T &a_data);
- int push_nowait(const ELEM_T &a_data);
- int push_timeout(const ELEM_T &a_data, const struct timespec * timeout);
-
- /// @brief pop the element at the head of the queue
- /// @param a reference where the element in the head of the queue will be saved to
- /// Note that the a_data parameter might contain rubbish if the function returns false
- /// @return true if the element was successfully extracted from the queue. False if the queue was empty
- int pop(ELEM_T &a_data);
- int pop_nowait(ELEM_T &a_data);
- int pop_timeout(ELEM_T &a_data, struct timespec * timeout);
+ // std::atomic_uint reference;
+ /// @brief constructor of the class
- void *operator new(size_t size);
- void operator delete(void *p);
+ /// @brief returns the current number of items in the queue
+ /// It tries to take a snapshot of the size of the queue, but in busy environments
+ /// this function might return bogus values.
+ ///
+ /// If a reliable queue size must be kept you might want to have a look at
+ /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
+ /// it enables a reliable size though it hits overall performance of the queue
+ /// (when the reliable size variable is on it's got an impact of about 20% in time)
+ inline uint32_t size();
+
+ /// @brief return true if the queue is full. False otherwise
+ /// It tries to take a snapshot of the size of the queue, but in busy
+ /// environments this function might return bogus values. See help in method
+ /// LockFreeQueue::size
+ inline bool full();
+
+ inline bool empty();
+
+ inline ELEM_T &operator[](unsigned i);
+
+
+
+ time_t getCreateTime() {
+ return createTime;
+ }
+
+ time_t getCloseTime() {
+ return closeTime;
+ }
+
+ int getStatus() {
+ return status;
+ }
+
+ /// @brief push an element at the tail of the queue
+ /// @param the element to insert in the queue
+ /// Note that the element is not a pointer or a reference, so if you are using large data
+ /// structures to be inserted in the queue you should think of instantiate the template
+ /// of the queue as a pointer to that large structure
+ /// @return true if the element was inserted in the queue. False if the queue was full
+ int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
+
+ /// @brief pop the element at the head of the queue
+ /// @param a reference where the element in the head of the queue will be saved to
+ /// Note that the a_data parameter might contain rubbish if the function returns false
+ /// @return true if the element was successfully extracted from the queue. False if the queue was empty
+ int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
+
+
+ void *operator new(size_t size);
+
+ void operator delete(void *p);
protected:
- /// @brief the actual queue. methods are forwarded into the real
- /// implementation
- Q_TYPE<ELEM_T, Allocator> m_qImpl;
+ /// @brief the actual queue. methods are forwarded into the real
+ /// implementation
+ Q_TYPE<ELEM_T, Allocator> m_qImpl;
private:
- /// @brief disable copy constructor declaring it private
- LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src);
+ /// @brief disable copy constructor declaring it private
+ LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src);
};
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize)
-{
-// std::cout << "LockFreeQueue init reference=" << reference << std::endl;
- if (sem_init(&slots, 1, qsize) == -1)
- err_exit(errno, "LockFreeQueue sem_init");
- if (sem_init(&items, 1, 0) == -1)
- err_exit(errno, "LockFreeQueue sem_init");
- if (sem_init(&mutex, 1, 1) == -1)
- err_exit(errno, "LockFreeQueue sem_init");
+template<
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): m_qImpl(qsize) {
+ //std::cout << "LockFreeQueue init reference=" << reference << std::endl;
+ if (sem_init(&slots, 1, qsize) == -1)
+ err_exit(errno, "LockFreeQueue sem_init");
+ if (sem_init(&items, 1, 0) == -1)
+ err_exit(errno, "LockFreeQueue sem_init");
+
+ createTime = time(NULL);
+ status = LOCK_FREE_Q_ST_OPENED;
-
-}
-
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue()
-{
- // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy");
- if(sem_destroy(&slots) == -1) {
- err_exit(errno, "LockFreeQueue sem_destroy");
- }
- if(sem_destroy(&items) == -1) {
- err_exit(errno, "LockFreeQueue sem_destroy");
- }
- if(sem_destroy(&mutex) == -1) {
- err_exit(errno, "LockFreeQueue sem_destroy");
- }
-}
-
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size()
-{
- return m_qImpl.size();
-}
-
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full()
-{
- return m_qImpl.full();
-}
-
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty()
-{
- return m_qImpl.empty();
-}
-
-
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
-{
-// LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
- if (sem_wait(&slots) == -1) {
- err_msg(errno, "LockFreeQueue push");
- return errno;
- }
-
- if ( m_qImpl.push(a_data) ) {
- sem_post(&items);
-// LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
- return 0;
- }
- return -1;
-
-}
-
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
-{
- if (sem_trywait(&slots) == -1) {
- if (errno == EAGAIN)
- return EAGAIN;
- else {
- err_msg(errno, "LockFreeQueue push_nowait");
- return errno;
- }
-
- }
-
- if ( m_qImpl.push(a_data)) {
- sem_post(&items);
- return 0;
- }
- return -1;
-
-}
-
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts)
-{
-
- int rv;
- struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
- // LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld",
- // timeout.tv_sec, timeout.tv_nsec);
-
- while ( sem_timedwait(&slots, &timeout) == -1) {
- // LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld, ETIMEDOUT=%d, errno=%d\n",
- // timeout.tv_sec, timeout.tv_nsec, ETIMEDOUT, errno);
-
- if(errno == ETIMEDOUT)
- return EBUS_TIMEOUT;
- else if(errno == EINTR)
- continue;
- else {
- LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
- return errno;
- }
- }
-
- if (m_qImpl.push(a_data)){
- sem_post(&items);
-// LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n");
- return 0;
- }
- return -1;
-
}
-
-
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
-{
-
-// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
- if (sem_wait(&items) == -1) {
- LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop");
- return errno;
- }
-
- if (m_qImpl.pop(a_data)) {
- sem_post(&slots);
-// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
- return 0;
- }
- return -1;
+template<
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+inline void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::close() {
+ status = LOCK_FREE_Q_ST_CLOSED;
+ closeTime = time(NULL);
}
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
-{
- if (sem_trywait(&items) == -1) {
- if (errno == EAGAIN)
- return errno;
- else {
- LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_nowait");
- return errno;
- }
- }
-
- if (m_qImpl.pop(a_data)) {
- sem_post(&slots);
- return 0;
- }
- return -1;
-}
-
-
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
-{
-// LoggerFactory::getLogger()->debug("=================ts sec = %d, nsec = %ld \n", ts->tv_sec, ts->tv_nsec );
-
- // struct timespec timeout_tmp = {1, 0};
- struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
-// LoggerFactory::getLogger()->debug("================== timeout before sec = %d, nsec = %ld \n", timeout.tv_sec, timeout.tv_nsec );
-
- while (sem_timedwait(&items, &timeout) == -1) {
- if (errno == ETIMEDOUT)
- return EBUS_TIMEOUT;
- else if(errno == EINTR)
- continue;
- else {
- LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_timeout %d", errno);
- return errno;
- }
- }
-
- if (m_qImpl.pop(a_data)) {
- sem_post(&slots);
-// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n");
- return 0;
- }
- return -1;
-
-}
-
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
- return m_qImpl.operator[](i);
+template<
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::isClosed() {
+ return status == LOCK_FREE_Q_ST_CLOSED;
}
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){
- return Allocator::allocate(size);
+template<
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() {
+ if (sem_destroy(&slots) == -1) {
+ err_exit(errno, "LockFreeQueue sem_destroy");
+ }
+ if (sem_destroy(&items) == -1) {
+ err_exit(errno, "LockFreeQueue sem_destroy");
+ }
+
}
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
+template<
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() {
+ return m_qImpl.size();
+}
+
+template<
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() {
+ return m_qImpl.full();
+}
+
+template<
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() {
+ return m_qImpl.empty();
+}
+
+
+template<typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) {
+ sigset_t mask_all, pre;
+ sigfillset(&mask_all);
+
+ sigprocmask(SIG_BLOCK, &mask_all, &pre);
+
+ if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
+ if (psem_trywait(&slots) == -1) {
+ goto LABEL_FAILTURE;
+ }
+ } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
+ if (psem_timedwait(&slots, timeout) == -1) {
+ goto LABEL_FAILTURE;
+ }
+ } else {
+ if (psem_wait(&slots) == -1) {
+ goto LABEL_FAILTURE;
+ }
+ }
+
+
+ if (m_qImpl.push(a_data)) {
+ psem_post(&items);
+ sigprocmask(SIG_SETMASK, &pre, NULL);
+ return 0;
+ }
+
+ LABEL_FAILTURE:
+ sigprocmask(SIG_SETMASK, &pre, NULL);
+ return errno;
+}
+
+template<typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) {
+
+ sigset_t mask_all, pre;
+ sigfillset(&mask_all);
+
+ sigprocmask(SIG_BLOCK, &mask_all, &pre);
+
+ if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
+ if (psem_trywait(&items) == -1) {
+ goto LABEL_FAILTURE;
+ }
+ } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
+ if (psem_timedwait(&items, timeout) == -1) {
+ goto LABEL_FAILTURE;
+ }
+ } else {
+ if (psem_wait(&items) == -1) {
+ goto LABEL_FAILTURE;
+ }
+ }
+
+ if (m_qImpl.pop(a_data)) {
+ psem_post(&slots);
+ sigprocmask(SIG_SETMASK, &pre, NULL);
+ return 0;
+ }
+
+
+ LABEL_FAILTURE:
+ sigprocmask(SIG_SETMASK, &pre, NULL);
+ return errno;
+}
+
+template<typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+ELEM_T &LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
+ return m_qImpl.operator[](i);
+}
+
+
+template<typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+void *LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size) {
+ return Allocator::allocate(size);
+}
+
+template<typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) {
- return Allocator::deallocate(p);
+ LockFreeQueue<ELEM_T, Allocator, Q_TYPE> * _que = (LockFreeQueue<ELEM_T, Allocator, Q_TYPE> * )p;
+ Allocator::deallocate(p);
}
// include implementation files
--
Gitblit v1.8.0