From 2c65db46500207f8445aa4baa53bfbb6602e0e18 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 21 一月 2021 16:37:03 +0800
Subject: [PATCH] restructure
---
test/futex_demo.cpp | 3
src/time_util.h | 14
src/queue/lock_free_queue.h | 39 -
src/socket/shm_socket.h | 1
src/futex_sem.cpp | 8
CMakeLists.txt | 1
src/socket/shm_socket.cpp | 8
src/futex_sem.h | 15
/dev/null | 322 ------------------
src/queue/array_lock_free_queue.h | 4
src/queue/shm_queue.h | 123 ++++--
src/queue/array_lock_free_sem_queue.h | 367 +++++++++++++++++++++
src/psem.cpp | 19 -
src/CMakeLists.txt | 44 +-
src/time_util.cpp | 26 +
15 files changed, 545 insertions(+), 449 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index ef9b64a..8310bfa 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -16,6 +16,7 @@
option(BUILD_SHARED_LIBS "Build using shared libraries" ON)
option(BUILD_DOC "Build doc" OFF)
+
list(APPEND EXTRA_INCLUDES "${PROJECT_SOURCE_DIR}/include/usgcommon")
list(APPEND EXTRA_LIBS ${PROJECT_SOURCE_DIR}/lib/libusgcommon.a pthread rt)
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index cfd7f89..8343eb2 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -8,24 +8,27 @@
configure_file(bus_config.h.in bus_config.h)
add_library(shm_queue
- ./logger_factory.cpp
- ./socket/bus_server_socket.cpp
- ./socket/bus_server_socket_wrapper.cpp
- ./socket/shm_stream_mod_socket.cpp
- ./socket/shm_socket.cpp
- ./socket/shm_mod_socket.cpp
- ./psem.cpp
- ./svsem_util.cpp
- ./bus_error.cpp
- ./net/net_conn_pool.cpp
- ./net/net_mod_server_socket_wrapper.cpp
- ./net/net_mod_socket_wrapper.cpp
- ./net/net_mod_socket.cpp
- ./net/net_mod_socket_io.cpp
- ./net/net_mod_server_socket.cpp
- ./shm/shm_mm_wrapper.cpp
- ./shm/mm.cpp
- ./shm/hashtable.cpp
+ ./logger_factory.cpp
+./socket/bus_server_socket.cpp
+./socket/bus_server_socket_wrapper.cpp
+./socket/shm_stream_mod_socket.cpp
+./socket/shm_socket.cpp
+./socket/shm_mod_socket.cpp
+./time_util.cpp
+./psem.cpp
+./svsem_util.cpp
+./bus_error.cpp
+./futex_sem.cpp
+./net/net_conn_pool.cpp
+./net/net_mod_server_socket_wrapper.cpp
+./net/net_mod_socket_wrapper.cpp
+./net/net_mod_socket.cpp
+./net/net_mod_socket_io.cpp
+./net/net_mod_server_socket.cpp
+./shm/shm_mm_wrapper.cpp
+./shm/mm.cpp
+./shm/hashtable.cpp
+
)
@@ -41,7 +44,8 @@
${CMAKE_CURRENT_SOURCE_DIR}/socket
${CMAKE_CURRENT_SOURCE_DIR}/net
)
-
+
+
target_link_libraries(shm_queue PUBLIC ${EXTRA_LIBS} )
# install rules
@@ -55,6 +59,8 @@
./socket/bus_server_socket_wrapper.h
./psem.h
./key_def.h
+./time_util.h
+./futex_sem.h
./bus_error.h
./svsem_util.h
./logger_factory.h
diff --git a/src/futex_sem.cpp b/src/futex_sem.cpp
new file mode 100644
index 0000000..226145f
--- /dev/null
+++ b/src/futex_sem.cpp
@@ -0,0 +1,8 @@
+#include "futex_sem.h"
+
+
+int futex(int *uaddr, int futex_op, int val,
+ const struct timespec *timeout, int *uaddr2, int val3)
+{
+ return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr, val3);
+}
\ No newline at end of file
diff --git a/src/futex_sem.h b/src/futex_sem.h
new file mode 100644
index 0000000..787fca6
--- /dev/null
+++ b/src/futex_sem.h
@@ -0,0 +1,15 @@
+#ifndef _FUTEXT_SEM_H_
+#define _FUTEXT_SEM_H_
+#include "usg_common.h"
+#include <sys/wait.h>
+#include <sys/mman.h>
+#include <sys/syscall.h>
+#include <linux/futex.h>
+#include <sys/time.h>
+#include <sys/mman.h>
+#include <sys/stat.h> /* For mode constants */
+#include <fcntl.h>
+int futex(int *uaddr, int futex_op, int val,
+ const struct timespec *timeout, int *uaddr2, int val3);
+
+#endif
\ No newline at end of file
diff --git a/src/psem.cpp b/src/psem.cpp
index 8d9333f..d0eb2c2 100644
--- a/src/psem.cpp
+++ b/src/psem.cpp
@@ -1,26 +1,11 @@
#include "psem.h"
#include <semaphore.h>
+#include "time_util.h"
-#define NANO 1000000000
-
-
-static struct timespec psem_calc_abs_timeout(const struct timespec *ts) {
-
- struct timespec res;
- struct timespec timeout;
- if (clock_gettime(CLOCK_REALTIME, &timeout) == -1)
- err_exit(errno, "clock_gettime");
-
- res.tv_sec = timeout.tv_sec + ts->tv_sec;
- res.tv_nsec = timeout.tv_nsec + ts->tv_nsec;
- res.tv_sec = res.tv_sec + floor(res.tv_nsec / NANO);
- res.tv_nsec = res.tv_nsec % NANO;
- return res;
-}
int psem_timedwait(sem_t *sem, const struct timespec *ts) {
- struct timespec abs_timeout = psem_calc_abs_timeout(ts);
+ struct timespec abs_timeout = TimeUtil::calc_abs_time(ts);
int rv ;
while ( (rv = sem_timedwait(sem, &abs_timeout)) == -1) {
diff --git a/src/queue/array_lock_free_queue.h b/src/queue/array_lock_free_queue.h
index 233bc6a..ae1506d 100644
--- a/src/queue/array_lock_free_queue.h
+++ b/src/queue/array_lock_free_queue.h
@@ -1,5 +1,5 @@
-#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
-#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
+#ifndef __ARRAY_LOCK_FREE_QUEUE_H__
+#define __ARRAY_LOCK_FREE_QUEUE_H__
#include "atomic_ops.h"
#include <assert.h> // assert()
#include <sched.h> // sched_yield()
diff --git a/src/queue/array_lock_free_queue2.h b/src/queue/array_lock_free_queue2.h
deleted file mode 100644
index 233bc6a..0000000
--- a/src/queue/array_lock_free_queue2.h
+++ /dev/null
@@ -1,322 +0,0 @@
-#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
-#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
-#include "atomic_ops.h"
-#include <assert.h> // assert()
-#include <sched.h> // sched_yield()
-#include "logger_factory.h"
-#include "mem_pool.h"
-#include "shm_allocator.h"
-
-/// @brief implementation of an array based lock free queue with support for
-/// multiple producers
-/// This class is prevented from being instantiated directly (all members and
-/// methods are private). To instantiate a multiple producers lock free queue
-/// you must use the ArrayLockFreeQueue fachade:
-/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q;
-
-
-#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
-template <typename ELEM_T, typename Allocator = SHM_Allocator>
-class ArrayLockFreeQueue
-{
- // ArrayLockFreeQueue will be using this' private members
- template <
- typename ELEM_T_,
- typename Allocator_,
- template <typename T, typename AT> class Q_TYPE
- >
- friend class LockFreeQueue;
-
-private:
- /// @brief constructor of the class
- ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
-
- virtual ~ArrayLockFreeQueue();
-
- inline uint32_t size();
-
- inline bool full();
-
- inline bool empty();
-
- bool push(const ELEM_T &a_data);
-
- bool pop(ELEM_T &a_data);
-
- /// @brief calculate the index in the circular array that corresponds
- /// to a particular "count" value
- inline uint32_t countToIndex(uint32_t a_count);
-
- ELEM_T& operator[](unsigned i);
-
-private:
- size_t Q_SIZE;
- /// @brief array to keep the elements
- ELEM_T *m_theQueue;
-
- /// @brief where a new element will be inserted
- uint32_t m_writeIndex;
-
- /// @brief where the next element where be extracted from
- uint32_t m_readIndex;
-
- /// @brief maximum read index for multiple producer queues
- /// If it's not the same as m_writeIndex it means
- /// there are writes pending to be "committed" to the queue, that means,
- /// the place for the data was reserved (the index in the array) but
- /// data is still not in the queue, so the thread trying to read will have
- /// to wait for those other threads to save the data into the queue
- ///
- /// note this is only used for multiple producers
- uint32_t m_maximumReadIndex;
-
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- /// @brief number of elements in the queue
- uint32_t m_count;
-#endif
-
-
-private:
- /// @brief disable copy constructor declaring it private
- ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
-
-};
-
-
-template <typename ELEM_T, typename Allocator>
-ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize):
- Q_SIZE(qsize),
- m_writeIndex(0), // initialisation is not atomic
- m_readIndex(0), //
- m_maximumReadIndex(0) //
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- ,m_count(0) //
-#endif
-{
- m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
-
-}
-
-template <typename ELEM_T, typename Allocator>
-ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue()
-{
- // std::cout << "destroy ArrayLockFreeQueue\n";
- Allocator::deallocate(m_theQueue);
-
-}
-
-template <typename ELEM_T, typename Allocator>
-inline
-uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
-{
- // if Q_SIZE is a power of 2 this statement could be also written as
- // return (a_count & (Q_SIZE - 1));
- return (a_count % Q_SIZE);
-}
-
-template <typename ELEM_T, typename Allocator>
-inline
-uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size()
-{
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
- return m_count;
-#else
-
- uint32_t currentWriteIndex = m_maximumReadIndex;
- uint32_t currentReadIndex = m_readIndex;
-
- // let's think of a scenario where this function returns bogus data
- // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
- // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
- // 2. afterwards this thread is preemted. While this thread is inactive 2
- // elements are inserted and removed from the queue, so m_maximumReadIndex
- // is 5 and m_readIndex 4. Real size is still 1
- // 3. Now the current thread comes back from preemption and reads m_readIndex.
- // currentReadIndex is 4
- // 4. currentReadIndex is bigger than currentWriteIndex, so
- // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
- // it returns that the queue is almost full, when it is almost empty
- //
- if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
- {
- return (currentWriteIndex - currentReadIndex);
- }
- else
- {
- return (Q_SIZE + currentWriteIndex - currentReadIndex);
- }
-#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-}
-
-template <typename ELEM_T, typename Allocator>
-inline
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::full()
-{
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
- return (m_count == (Q_SIZE));
-#else
-
- uint32_t currentWriteIndex = m_writeIndex;
- uint32_t currentReadIndex = m_readIndex;
-
- if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
- {
- // the queue is full
- return true;
- }
- else
- {
- // not full!
- return false;
- }
-#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-}
-
-template <typename ELEM_T, typename Allocator>
-inline
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty()
-{
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
- return (m_count == 0);
-#else
-
- if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
- {
- // the queue is full
- return true;
- }
- else
- {
- // not full!
- return false;
- }
-#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-}
-
-
-
-
-
-
-template <typename ELEM_T, typename Allocator>
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data)
-{
- uint32_t currentReadIndex;
- uint32_t currentWriteIndex;
-
- do
- {
-
- currentWriteIndex = m_writeIndex;
- currentReadIndex = m_readIndex;
- #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
- if (m_count == Q_SIZE) {
- return false;
- }
- #else
- if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
- {
- // the queue is full
- return false;
- }
- #endif
-
- } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
-
- // We know now that this index is reserved for us. Use it to save the data
- m_theQueue[countToIndex(currentWriteIndex)] = a_data;
-
- // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
- // inserting in the queue. It might fail if there are more than 1 producer threads because this
- // operation has to be done in the same order as the previous CAS
-
- while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
- {
- // this is a good place to yield the thread in case there are more
- // software threads than hardware processors and you have more
- // than 1 producer thread
- // have a look at sched_yield (POSIX.1b)
- sched_yield();
- }
-
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- AtomicAdd(&m_count, 1);
-#endif
- return true;
-}
-
-
-template <typename ELEM_T, typename Allocator>
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data)
-{
- uint32_t currentMaximumReadIndex;
- uint32_t currentReadIndex;
-
- do
- {
- // to ensure thread-safety when there is more than 1 producer thread
- // a second index is defined (m_maximumReadIndex)
- currentReadIndex = m_readIndex;
- currentMaximumReadIndex = m_maximumReadIndex;
- #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
- if (m_count == 0) {
- return false;
- }
- #else
- if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
- {
- // the queue is empty or
- // a producer thread has allocate space in the queue but is
- // waiting to commit the data into it
- return false;
- }
- #endif
-
- // retrieve the data from the queue
- a_data = m_theQueue[countToIndex(currentReadIndex)];
-
- // try to perfrom now the CAS operation on the read index. If we succeed
- // a_data already contains what m_readIndex pointed to before we
- // increased it
- if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
- {
- #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- // m_count.fetch_sub(1);
- AtomicSub(&m_count, 1);
- #endif
- return true;
- }
-
- // it failed retrieving the element off the queue. Someone else must
- // have read the element stored at countToIndex(currentReadIndex)
- // before we could perform the CAS operation
-
- } while(1); // keep looping to try again!
-
- // Something went wrong. it shouldn't be possible to reach here
- assert(0);
-
- // Add this return statement to avoid compiler warnings
- return false;
-}
-
-template <typename ELEM_T, typename Allocator>
-ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i)
-{
- int currentCount = m_count;
- uint32_t currentReadIndex = m_readIndex;
- if (i >= currentCount)
- {
- std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
- std::exit(EXIT_FAILURE);
- }
- return m_theQueue[countToIndex(currentReadIndex+i)];
-}
-
-#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
diff --git a/src/queue/array_lock_free_sem_queue.h b/src/queue/array_lock_free_sem_queue.h
new file mode 100644
index 0000000..bb213e8
--- /dev/null
+++ b/src/queue/array_lock_free_sem_queue.h
@@ -0,0 +1,367 @@
+#ifndef __ARRAY_LOCK_FREE_SEM_QUEUE_H__
+#define __ARRAY_LOCK_FREE_SEM_QUEUE_H__
+#include "atomic_ops.h"
+#include <assert.h> // assert()
+#include <sched.h> // sched_yield()
+#include "logger_factory.h"
+#include "mem_pool.h"
+#include "shm_allocator.h"
+#include "futex_sem.h"
+#include "time_util.h"
+
+
+/// @brief implementation of an array based lock free queue with support for
+/// multiple producers
+/// This class is prevented from being instantiated directly (all members and
+/// methods are private). To instantiate a multiple producers lock free queue
+/// you must use the ArrayLockFreeSemQueue fachade:
+/// ArrayLockFreeSemQueue<int, 100, ArrayLockFreeSemQueue> q;
+
+#define LOCK_FREE_QUEUE_TIMEOUT 1
+#define LOCK_FREE_QUEUE_NOWAIT 1 << 1
+
+#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+
+template <typename ELEM_T, typename Allocator = SHM_Allocator>
+class ArrayLockFreeSemQueue
+{
+public:
+ /// @brief constructor of the class
+ ArrayLockFreeSemQueue(size_t qsize = 16);
+
+ virtual ~ArrayLockFreeSemQueue();
+
+ inline uint32_t size();
+
+ inline bool full();
+
+ inline bool empty();
+
+ int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
+
+ int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
+
+ /// @brief calculate the index in the circular array that corresponds
+ /// to a particular "count" value
+ inline uint32_t countToIndex(uint32_t a_count);
+
+ ELEM_T& operator[](unsigned i);
+
+public:
+ void *operator new(size_t size);
+ void operator delete(void *p);
+
+private:
+ size_t Q_SIZE;
+ /// @brief array to keep the elements
+ ELEM_T *m_theQueue;
+
+ /// @brief where a new element will be inserted
+ uint32_t m_writeIndex;
+
+ /// @brief where the next element where be extracted from
+ uint32_t m_readIndex;
+
+ /// @brief maximum read index for multiple producer queues
+ /// If it's not the same as m_writeIndex it means
+ /// there are writes pending to be "committed" to the queue, that means,
+ /// the place for the data was reserved (the index in the array) but
+ /// data is still not in the queue, so the thread trying to read will have
+ /// to wait for those other threads to save the data into the queue
+ ///
+ /// note this is only used for multiple producers
+ uint32_t m_maximumReadIndex;
+
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+ /// @brief number of elements in the queue
+ int m_count;
+#endif
+
+
+ private:
+ /// @brief disable copy constructor declaring it private
+ ArrayLockFreeSemQueue<ELEM_T, Allocator>(const ArrayLockFreeSemQueue<ELEM_T> &a_src);
+
+};
+
+
+template <typename ELEM_T, typename Allocator>
+ArrayLockFreeSemQueue<ELEM_T, Allocator>::ArrayLockFreeSemQueue(size_t qsize):
+ Q_SIZE(qsize),
+ m_writeIndex(0), // initialisation is not atomic
+ m_readIndex(0), //
+ m_maximumReadIndex(0) //
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+ ,m_count(0) //
+#endif
+{
+ m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
+
+}
+
+ template <typename ELEM_T, typename Allocator>
+ArrayLockFreeSemQueue<ELEM_T, Allocator>::~ArrayLockFreeSemQueue()
+{
+ // std::cout << "destroy ArrayLockFreeSemQueue\n";
+ Allocator::deallocate(m_theQueue);
+
+}
+
+template <typename ELEM_T, typename Allocator>
+ inline
+uint32_t ArrayLockFreeSemQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
+{
+ // if Q_SIZE is a power of 2 this statement could be also written as
+ // return (a_count & (Q_SIZE - 1));
+ return (a_count % Q_SIZE);
+}
+
+template <typename ELEM_T, typename Allocator>
+ inline
+uint32_t ArrayLockFreeSemQueue<ELEM_T, Allocator>::size()
+{
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+ return m_count;
+#else
+
+ uint32_t currentWriteIndex = m_maximumReadIndex;
+ uint32_t currentReadIndex = m_readIndex;
+
+ // let's think of a scenario where this function returns bogus data
+ // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
+ // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
+ // 2. afterwards this thread is preemted. While this thread is inactive 2
+ // elements are inserted and removed from the queue, so m_maximumReadIndex
+ // is 5 and m_readIndex 4. Real size is still 1
+ // 3. Now the current thread comes back from preemption and reads m_readIndex.
+ // currentReadIndex is 4
+ // 4. currentReadIndex is bigger than currentWriteIndex, so
+ // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
+ // it returns that the queue is almost full, when it is almost empty
+ //
+ if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
+ {
+ return (currentWriteIndex - currentReadIndex);
+ }
+ else
+ {
+ return (Q_SIZE + currentWriteIndex - currentReadIndex);
+ }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template <typename ELEM_T, typename Allocator>
+ inline
+bool ArrayLockFreeSemQueue<ELEM_T, Allocator>::full()
+{
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+ return (m_count == (Q_SIZE));
+#else
+
+ uint32_t currentWriteIndex = m_writeIndex;
+ uint32_t currentReadIndex = m_readIndex;
+
+ if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
+ {
+ // the queue is full
+ return true;
+ }
+ else
+ {
+ // not full!
+ return false;
+ }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template <typename ELEM_T, typename Allocator>
+ inline
+bool ArrayLockFreeSemQueue<ELEM_T, Allocator>::empty()
+{
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+ return (m_count == 0);
+#else
+
+ if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
+ {
+ // the queue is full
+ return true;
+ }
+ else
+ {
+ // not full!
+ return false;
+ }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+
+
+
+
+
+ template <typename ELEM_T, typename Allocator>
+int ArrayLockFreeSemQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag)
+{
+ uint32_t currentReadIndex;
+ uint32_t currentWriteIndex;
+ int s;
+
+ do
+ {
+ currentWriteIndex = m_writeIndex;
+ currentReadIndex = m_readIndex;
+
+ if (m_count == Q_SIZE) {
+ if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
+ return -1;
+ else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
+ const struct timespec ts = TimeUtil::trim_time(timeout);
+ s = futex(&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
+ if (s == -1 && errno != EAGAIN && errno != EINTR) {
+ // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
+ return -1;
+ }
+
+ } else {
+ s = futex(&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0);
+ if (s == -1 && errno != EAGAIN && errno != EINTR) {
+ return -1;
+ }
+ }
+
+ }
+
+
+ } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
+
+ // We know now that this index is reserved for us. Use it to save the data
+ m_theQueue[countToIndex(currentWriteIndex)] = a_data;
+
+ // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
+ // inserting in the queue. It might fail if there are more than 1 producer threads because this
+ // operation has to be done in the same order as the previous CAS
+
+ while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
+ {
+ // this is a good place to yield the thread in case there are more
+ // software threads than hardware processors and you have more
+ // than 1 producer thread
+ // have a look at sched_yield (POSIX.1b)
+ sched_yield();
+ }
+
+ AtomicAdd(&m_count, 1);
+ s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
+ if (s == -1)
+ err_exit(errno, "futex-FUTEX_WAKE");
+ return 0;
+}
+
+
+ template <typename ELEM_T, typename Allocator>
+int ArrayLockFreeSemQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag)
+{
+ uint32_t currentMaximumReadIndex;
+ uint32_t currentReadIndex;
+
+ int s;
+ do
+ {
+ // to ensure thread-safety when there is more than 1 producer thread
+ // a second index is defined (m_maximumReadIndex)
+ currentReadIndex = m_readIndex;
+ currentMaximumReadIndex = m_maximumReadIndex;
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+ if (m_count == 0) {
+
+ if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
+ return -1;
+ else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
+ const struct timespec ts = TimeUtil::trim_time(timeout);
+ s = futex(&m_count, FUTEX_WAIT, 0, &ts, NULL, 0);
+ if (s == -1 && errno != EAGAIN && errno != EINTR) {
+ // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
+ return -1;
+ }
+
+ } else {
+ s = futex(&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
+ if (s == -1 && errno != EAGAIN && errno != EINTR) {
+ return -1;
+ }
+ }
+ }
+#else
+ if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
+ {
+ // the queue is empty or
+ // a producer thread has allocate space in the queue but is
+ // waiting to commit the data into it
+ return -1;
+ }
+#endif
+
+ // retrieve the data from the queue
+ a_data = m_theQueue[countToIndex(currentReadIndex)];
+
+ // try to perfrom now the CAS operation on the read index. If we succeed
+ // a_data already contains what m_readIndex pointed to before we
+ // increased it
+ if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
+ {
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+ // m_count.fetch_sub(1);
+ AtomicSub(&m_count, 1);
+#endif
+
+ s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
+ if (s == -1)
+ err_exit(errno, "futex-FUTEX_WAKE");
+ return 0;
+ }
+
+ // it failed retrieving the element off the queue. Someone else must
+ // have read the element stored at countToIndex(currentReadIndex)
+ // before we could perform the CAS operation
+
+ } while(1); // keep looping to try again!
+
+ // Something went wrong. it shouldn't be possible to reach here
+ assert(0);
+
+ // Add this return statement to avoid compiler warnings
+ return -1;
+}
+
+ template <typename ELEM_T, typename Allocator>
+ELEM_T& ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[](unsigned int i)
+{
+ int currentCount = m_count;
+ uint32_t currentReadIndex = m_readIndex;
+ if (i >= currentCount)
+ {
+ std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
+ std::exit(EXIT_FAILURE);
+ }
+ return m_theQueue[countToIndex(currentReadIndex+i)];
+}
+
+
+
+template <typename ELEM_T, typename Allocator>
+void * ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator new(size_t size){
+ return Allocator::allocate(size);
+}
+
+template <typename ELEM_T, typename Allocator>
+void ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator delete(void *p) {
+ return Allocator::deallocate(p);
+}
+
+#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index b7dfd9f..01e597c 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -221,8 +221,7 @@
{
LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
if (psem_wait(&slots) == -1) {
- err_msg(errno, "LockFreeQueue push");
- return errno;
+ return -1;
}
if ( m_qImpl.push(a_data) ) {
@@ -241,13 +240,7 @@
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
{
if (psem_trywait(&slots) == -1) {
- if (errno == EAGAIN)
- return EAGAIN;
- else {
- err_msg(errno, "LockFreeQueue push_nowait");
- return errno;
- }
-
+ return -1;
}
if ( m_qImpl.push(a_data)) {
@@ -265,15 +258,8 @@
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts)
{
LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n");
- int rv;
if ( psem_timedwait(&slots, ts) == -1) {
-
- if(errno == ETIMEDOUT)
- return EBUS_TIMEOUT;
- else {
- LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
- return errno;
- }
+ return -1;
}
if (m_qImpl.push(a_data)){
@@ -297,8 +283,7 @@
LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
if (psem_wait(&items) == -1) {
- LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop");
- return errno;
+ return -1;
}
if (m_qImpl.pop(a_data)) {
@@ -316,12 +301,7 @@
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
{
if (psem_trywait(&items) == -1) {
- if (errno == EAGAIN)
- return errno;
- else {
- LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_nowait");
- return errno;
- }
+ return -1;
}
if (m_qImpl.pop(a_data)) {
@@ -339,14 +319,7 @@
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
{
if (psem_timedwait(&items, ts) == -1) {
- if (errno == ETIMEDOUT) {
- return EBUS_TIMEOUT;
- }
-
- else {
- LoggerFactory::getLogger()->error(errno, "3 LockFreeQueue pop_timeout %d", errno);
- return errno;
- }
+ return -1;
}
if (m_qImpl.pop(a_data)) {
diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h
index 7d98eaa..5d2d9b6 100644
--- a/src/queue/shm_queue.h
+++ b/src/queue/shm_queue.h
@@ -6,12 +6,13 @@
#define __SHM_QUEUE_H__
#include "hashtable.h"
-#include "lock_free_queue.h"
+
#include "logger_factory.h"
#include "sem_util.h"
#include "shm_allocator.h"
#include "usg_common.h"
-
+#include "array_lock_free_sem_queue.h"
+#include "bus_error.h"
template <typename ELEM_T> class SHMQueue {
@@ -20,7 +21,7 @@
public:
/// @brief constructor of the class
- SHMQueue(int key = 0, size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
+ SHMQueue(int key = 0, size_t qsize = 16);
~SHMQueue();
@@ -49,7 +50,8 @@
protected:
/// @brief the actual queue-> methods are forwarded into the real
/// implementation
- LockFreeQueue<ELEM_T, SHM_Allocator> *queue;
+
+ ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *queue;
private:
/// @brief disable copy constructor declaring it private
@@ -62,7 +64,7 @@
hashtable_t *hashtable = mm_get_hashtable();
std::set<int> *keyset = hashtable_keyset(hashtable);
std::set<int>::iterator keyItr;
- LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
+ ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue;
bool found;
size_t count = 0;
for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
@@ -75,7 +77,7 @@
}
if (!found) {
// 閿�姣佸叡浜唴瀛樼殑queue
- mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
+ mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
delete mqueue;
hashtable_remove(hashtable, *keyItr);
count++;
@@ -89,11 +91,11 @@
template <typename ELEM_T>
size_t SHMQueue<ELEM_T>::remove_queues(int keys[], size_t length) {
hashtable_t *hashtable = mm_get_hashtable();
- LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
+ ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue;
size_t count = 0;
for(int i = 0; i< length; i++) {
// 閿�姣佸叡浜唴瀛樼殑queue
- mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]);
+ mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]);
delete mqueue;
hashtable_remove(hashtable, keys[i]);
count++;
@@ -111,49 +113,22 @@
SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize) : KEY(key) {
hashtable_t *hashtable = mm_get_hashtable();
- queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
+ queue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
if (queue == NULL || (void *)queue == (void *)1) {
- queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize);
+ queue = new ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator>(qsize);
hashtable_put(hashtable, key, (void *)queue);
}
- queue->reference++;
+ // queue->reference++;
// LoggerFactory::getLogger()->debug("SHMQueue constructor reference===%d", queue->reference.load());
}
template <typename ELEM_T> SHMQueue<ELEM_T>::~SHMQueue() {
- if(queue == NULL) {
- // queue宸茬粡閿�姣�
- return;
- }
-
- sem_wait(&(queue->mutex));
- queue->reference--;
- // LoggerFactory::getLogger()->debug("SHMQueue destructor reference===%d",
- if (queue->reference.load() == 0) {
- delete queue;
- queue = NULL;
- hashtable_t *hashtable = mm_get_hashtable();
- hashtable_remove(hashtable, KEY);
- // 姝ゆ椂queue宸茬粡閿�姣侊紝鏃犻渶 sem_post(&(queue->mutex))
- // printf("SHMQueue destructor delete queue\n");
- } else {
- sem_post(&(queue->mutex));
- }
-
-}
-
-template <typename ELEM_T> void SHMQueue<ELEM_T>::force_destroy() {
- if(queue == NULL) {
- // queue宸茬粡閿�姣�
- return;
- }
-
- SemUtil::dec(queue->mutex);
+ LoggerFactory::getLogger()->debug("SHMQueue destroy");
delete queue;
queue = NULL;
hashtable_t *hashtable = mm_get_hashtable();
hashtable_remove(hashtable, KEY);
- // 姝ゆ椂queue宸茬粡閿�姣侊紝鏃犻渶 SemUtil::inc(queue->mutex)
+
}
template <typename ELEM_T> inline uint32_t SHMQueue<ELEM_T>::size() {
@@ -170,36 +145,85 @@
template <typename ELEM_T>
inline int SHMQueue<ELEM_T>::push(const ELEM_T &a_data) {
- return queue->push(a_data);
+ int rv = queue->push(a_data);
+ if(rv == -1) {
+ return errno;
+ } else {
+ return 0;
+ }
}
template <typename ELEM_T>
inline int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) {
- return queue->push_nowait(a_data);
+ int rv = queue->push(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT);
+ if(rv == -1) {
+ if (errno == EAGAIN)
+ return EAGAIN;
+ else {
+ err_msg(errno, "LockFreeQueue push_nowait");
+ return errno;
+ }
+ }
+ return 0;
}
template <typename ELEM_T>
-inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data,
- const struct timespec *timeout) {
+inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec *timeout) {
- return queue->push_timeout(a_data, timeout);
+ int rv = queue->push(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT);
+ if(rv == -1) {
+ if(errno == ETIMEDOUT)
+ return EBUS_TIMEOUT;
+ else {
+ LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
+ return errno;
+ }
+ }
+ return 0;
}
template <typename ELEM_T> inline int SHMQueue<ELEM_T>::pop(ELEM_T &a_data) {
// printf("SHMQueue pop before\n");
int rv = queue->pop(a_data);
// printf("SHMQueue after before\n");
- return rv;
+ if(rv == -1) {
+ return errno;
+ } else {
+ return 0;
+ }
}
template <typename ELEM_T>
inline int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) {
- return queue->pop_nowait(a_data);
+ int rv = queue->pop(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT);
+
+ if(rv == -1) {
+ if (errno == EAGAIN)
+ return errno;
+ else {
+ LoggerFactory::getLogger()->error(errno, " SHMQueue pop_nowait");
+ return errno;
+ }
+ }
+ return 0;
+
}
template <typename ELEM_T>
inline int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) {
- return queue->pop_timeout(a_data, timeout);
+
+ int rv;
+ rv = queue->pop(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT);
+ if(rv == -1) {
+ if (errno == ETIMEDOUT) {
+ return EBUS_TIMEOUT;
+ } else {
+ LoggerFactory::getLogger()->error(errno, " SHMQueue pop_timeout");
+ return errno;
+ }
+ }
+ return 0;
+
}
template <typename ELEM_T>
@@ -207,4 +231,7 @@
return queue->operator[](i);
}
+
+
+
#endif
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 534202d..76e906f 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -383,10 +383,8 @@
if (rv == 0) {
// printf("shm_sendto push after\n");
- delete remoteQueue;
return 0;
} else {
- delete remoteQueue;
mm_free(dest.buf);
if(rv > EBUS_BASE) {
// bus_errno = EBUS_TIMEOUT;
@@ -725,10 +723,7 @@
socket->queue = NULL;
}
- if (socket->remoteQueue != NULL) {
- delete socket->remoteQueue;
- socket->remoteQueue = NULL;
- }
+
if (socket->messageQueue != NULL) {
delete socket->messageQueue;
@@ -747,7 +742,6 @@
client_socket = iter->second;
client_socket->remoteQueue->push_timeout(close_msg, &timeout);
- delete client_socket->remoteQueue;
client_socket->remoteQueue = NULL;
delete client_socket->messageQueue;
diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h
index 0917d00..abc8e20 100644
--- a/src/socket/shm_socket.h
+++ b/src/socket/shm_socket.h
@@ -4,6 +4,7 @@
#include "usg_common.h"
#include "usg_typedef.h"
#include "shm_queue.h"
+#include "lock_free_queue.h"
enum shm_socket_flag_t
{
diff --git a/src/time_util.cpp b/src/time_util.cpp
new file mode 100644
index 0000000..f00b4f4
--- /dev/null
+++ b/src/time_util.cpp
@@ -0,0 +1,26 @@
+#include "time_util.h"
+
+#define NANO 1000000000
+
+
+struct timespec TimeUtil::calc_abs_time(const struct timespec *ts) {
+
+ struct timespec res;
+ struct timespec timeout;
+ if (clock_gettime(CLOCK_REALTIME, &timeout) == -1)
+ err_exit(errno, "clock_gettime");
+
+ res.tv_sec = timeout.tv_sec + ts->tv_sec;
+ res.tv_nsec = timeout.tv_nsec + ts->tv_nsec;
+ res.tv_sec = res.tv_sec + floor(res.tv_nsec / NANO);
+ res.tv_nsec = res.tv_nsec % NANO;
+ return res;
+}
+
+struct timespec TimeUtil::trim_time(const struct timespec *ts) {
+
+ struct timespec res;
+ res.tv_sec = ts->tv_sec + floor(ts->tv_nsec / NANO);
+ res.tv_nsec = ts->tv_nsec % NANO;
+ return res;
+}
\ No newline at end of file
diff --git a/src/time_util.h b/src/time_util.h
new file mode 100644
index 0000000..1f23d39
--- /dev/null
+++ b/src/time_util.h
@@ -0,0 +1,14 @@
+#ifndef _TIMEUTIL_H_
+#define _TIMEUTIL_H_
+#include "usg_common.h"
+class TimeUtil {
+public:
+ // 璁$畻褰撳墠鏃堕棿+ts鐨勭粷瀵规椂闂�
+ static struct timespec calc_abs_time(const struct timespec *ts);
+
+ // 濡傛灉绾崇澶т簬10e9锛屽悜绉掕繘浣�
+ static struct timespec trim_time(const struct timespec *ts) ;
+};
+
+
+#endif
\ No newline at end of file
diff --git a/test/futex_demo.cpp b/test/futex_demo.cpp
index d6887e8..2899862 100644
--- a/test/futex_demo.cpp
+++ b/test/futex_demo.cpp
@@ -19,11 +19,12 @@
#include <sys/syscall.h>
#include <linux/futex.h>
#include <sys/time.h>
-#include "usg_common.h"
#include <sys/mman.h>
#include <sys/stat.h> /* For mode constants */
#include <fcntl.h> /* For O_* constants */
+#include "usg_common.h"
+
#define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); \
} while (0)
--
Gitblit v1.8.0