From 4c73fd7179e92bee9cccb65e46823b00f568acb3 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期五, 22 一月 2021 16:57:34 +0800
Subject: [PATCH] tmp
---
src/queue/lock_free_queue.h | 404 +++++++----------
test_net_socket/test_net_mod_socket.cpp | 5
src/socket/shm_socket.h | 8
build.sh | 4
CMakeLists.txt | 6
src/net/net_mod_server_socket.cpp | 2
src/net/net_mod_socket.cpp | 8
src/socket/shm_socket.cpp | 16
src/bus_def.h | 7
test_socket/CMakeLists.txt | 11
/dev/null | 131 -----
src/queue/array_lock_free_queue.h | 487 ++++++++++-----------
src/queue/shm_queue.h | 23
test_socket/bus_test.cpp | 116 +++++
src/net/net_mod_socket_wrapper.cpp | 7
src/queue/array_lock_free_sem_queue.h | 41 +
test_net_socket/net_mod_socket.sh | 4
src/CMakeLists.txt | 11
src/socket/shm_mod_socket.cpp | 14
19 files changed, 625 insertions(+), 680 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 8310bfa..3b1e4b4 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -13,7 +13,10 @@
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}")
# set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}")
-option(BUILD_SHARED_LIBS "Build using shared libraries" ON)
+if (CMAKE_BUILD_TYPE EQUAL "Release")
+#option(BUILD_SHARED_LIBS "Build using shared libraries" ON)
+endif()
+
option(BUILD_DOC "Build doc" OFF)
@@ -29,4 +32,5 @@
add_subdirectory(${PROJECT_SOURCE_DIR}/src)
add_subdirectory(${PROJECT_SOURCE_DIR}/test)
add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket)
+ add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket)
endif()
diff --git a/build.sh b/build.sh
index eab77c7..6bfd753 100755
--- a/build.sh
+++ b/build.sh
@@ -2,6 +2,7 @@
BUILD_TYPE="Debug"
BUILD_DOC="OFF"
+BUILD_SHARED_LIBS="OFF"
function usage() {
echo "build.sh [release | debug | doc]"
@@ -10,6 +11,7 @@
case ${1} in
"release")
BUILD_TYPE="Release"
+ BUILD_SHARED_LIBS="ON"
;;
"debug")
@@ -48,7 +50,7 @@
# -DBUILD_SHARED_LIBS=ON
# -DCMAKE_INSTALL_PREFIX=$(pwd/../dest)
# -DQCA_MAN_INSTALL_DIR:PATH=/usr/share/man
-cmake -DCMAKE_INSTALL_PREFIX="$(pwd)/../dest" -DCMAKE_BUILD_TYPE=${BUILD_TYPE} -DBUILD_SHARED_LIBS=ON \
+cmake -DCMAKE_INSTALL_PREFIX="$(pwd)/../dest" -DCMAKE_BUILD_TYPE=${BUILD_TYPE} -DBUILD_SHARED_LIBS=${BUILD_SHARED_LIBS} \
-DBUILD_DOC=${BUILD_DOC} -DSUPPORT_RDMA=OFF ..
cmake --build .
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index fe597d9..f3698d9 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -1,5 +1,3 @@
-
-
# should we use our own math functions
option(SUPPORT_RDMA "If support rdma" OFF)
@@ -16,9 +14,9 @@
./socket/shm_mod_socket.cpp
./time_util.cpp
./psem.cpp
-./svsem.cpp
./bus_error.cpp
./futex_sem.cpp
+./svsem.cpp
./net/net_conn_pool.cpp
./net/net_mod_server_socket_wrapper.cpp
./net/net_mod_socket_wrapper.cpp
@@ -28,7 +26,6 @@
./shm/shm_mm_wrapper.cpp
./shm/mm.cpp
./shm/hashtable.cpp
-
)
@@ -62,13 +59,14 @@
./time_util.h
./futex_sem.h
./bus_error.h
-./svsem.h
+./bus_def.h
./logger_factory.h
./queue/linked_lock_free_queue.h
-./queue/array_lock_free_queue2.h
./queue/array_lock_free_queue.h
./queue/shm_queue.h
+./queue/array_lock_free_sem_queue.h
./queue/lock_free_queue.h
+./svsem.h
./net/net_conn_pool.h
./net/net_mod_socket.h
./net/net_mod_server_socket_wrapper.h
@@ -82,6 +80,7 @@
./shm/shm_allocator.h
+
DESTINATION include)
install(FILES "${PROJECT_BINARY_DIR}/src/bus_config.h"
diff --git a/src/bus_def.h b/src/bus_def.h
new file mode 100644
index 0000000..78a7eb9
--- /dev/null
+++ b/src/bus_def.h
@@ -0,0 +1,7 @@
+#ifndef _BUS_DEF_H_
+#define _BUS_DEF_H_
+
+#define BUS_TIMEOUT_FLAG 1
+#define BUS_NOWAIT_FLAG 1 << 1
+
+#endif
\ No newline at end of file
diff --git a/src/net/net_mod_server_socket.cpp b/src/net/net_mod_server_socket.cpp
index 8324d6f..432defb 100644
--- a/src/net/net_mod_server_socket.cpp
+++ b/src/net/net_mod_server_socket.cpp
@@ -54,7 +54,7 @@
sprintf(portstr, "%d", port);
listenfd = open_listenfd(portstr);
if(listenfd < 0) {
- LoggerFactory::getLogger()->error(errno, "NetModServerSocket::start . errno=%d ", errno);
+ LoggerFactory::getLogger()->error(errno, "NetModServerSocket::start. port = %d ", port);
return -1;
}
init_pool(listenfd);
diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp
index 73be131..3ef15b9 100644
--- a/src/net/net_mod_socket.cpp
+++ b/src/net/net_mod_socket.cpp
@@ -513,10 +513,12 @@
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
int NetModSocket::recvfrom(void **buf, int *size, int *key) {
+
+ logger->debug(" %d NetModSocket::recvfrom before", get_key());
int rv = shmModSocket.recvfrom(buf, size, key);
if(rv == 0) {
- logger->debug("NetModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
+ logger->debug("NetModSocket::recvfrom: <<<< %d recvfrom %d success.\n", get_key(), *key);
return 0;
}
@@ -533,7 +535,7 @@
struct timespec timeout = {sec, nsec};
int rv = shmModSocket.recvfrom_timeout(buf, size, key, &timeout);
if(rv == 0) {
- logger->debug("NetModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
+ logger->debug("NetModSocket::recvfrom_timeout: %d recvfrom %d success.\n", get_key(), *key);
return 0;
}
@@ -549,7 +551,7 @@
int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){
int rv = shmModSocket.recvfrom_nowait(buf, size, key);
if(rv == 0) {
- logger->debug("NetModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
+ logger->debug("NetModSocket::recvfrom_nowait: %d recvfrom %d success.\n", get_key(), *key);
return 0;
}
diff --git a/src/net/net_mod_socket_wrapper.cpp b/src/net/net_mod_socket_wrapper.cpp
index 83161bc..e7bf302 100644
--- a/src/net/net_mod_socket_wrapper.cpp
+++ b/src/net/net_mod_socket_wrapper.cpp
@@ -67,8 +67,13 @@
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key){
+ int rv;
NetModSocket *sockt = (NetModSocket *)_socket;
- return sockt->recvfrom(buf, size, key);
+
+ logger->debug(" %d net_mod_socket_recvfrom before", net_mod_socket_get_key(_socket));
+ rv = sockt->recvfrom(buf, size, key);
+ logger->debug(" %d net_mod_socket_recvfrom after. rv = %d", net_mod_socket_get_key(_socket), rv);
+ return rv;
}
// 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){
diff --git a/src/queue/array_lock_free_queue.h b/src/queue/array_lock_free_queue.h
index ae1506d..a03b33e 100644
--- a/src/queue/array_lock_free_queue.h
+++ b/src/queue/array_lock_free_queue.h
@@ -1,5 +1,6 @@
#ifndef __ARRAY_LOCK_FREE_QUEUE_H__
#define __ARRAY_LOCK_FREE_QUEUE_H__
+
#include "atomic_ops.h"
#include <assert.h> // assert()
#include <sched.h> // sched_yield()
@@ -17,306 +18,290 @@
#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-template <typename ELEM_T, typename Allocator = SHM_Allocator>
-class ArrayLockFreeQueue
-{
- // ArrayLockFreeQueue will be using this' private members
- template <
- typename ELEM_T_,
- typename Allocator_,
- template <typename T, typename AT> class Q_TYPE
- >
- friend class LockFreeQueue;
+template<typename ELEM_T, typename Allocator = SHM_Allocator>
+class ArrayLockFreeQueue {
+ // ArrayLockFreeQueue will be using this' private members
+ template<
+ typename ELEM_T_,
+ typename Allocator_,
+ template<typename T, typename AT> class Q_TYPE
+ >
+ friend
+ class LockFreeQueue;
private:
- /// @brief constructor of the class
- ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
-
- virtual ~ArrayLockFreeQueue();
-
- inline uint32_t size();
-
- inline bool full();
+ /// @brief constructor of the class
+ ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
- inline bool empty();
-
- bool push(const ELEM_T &a_data);
-
- bool pop(ELEM_T &a_data);
-
- /// @brief calculate the index in the circular array that corresponds
- /// to a particular "count" value
- inline uint32_t countToIndex(uint32_t a_count);
+ virtual ~ArrayLockFreeQueue();
- ELEM_T& operator[](unsigned i);
-
-private:
- size_t Q_SIZE;
- /// @brief array to keep the elements
- ELEM_T *m_theQueue;
+ inline uint32_t size();
- /// @brief where a new element will be inserted
- uint32_t m_writeIndex;
+ inline bool full();
- /// @brief where the next element where be extracted from
- uint32_t m_readIndex;
-
- /// @brief maximum read index for multiple producer queues
- /// If it's not the same as m_writeIndex it means
- /// there are writes pending to be "committed" to the queue, that means,
- /// the place for the data was reserved (the index in the array) but
- /// data is still not in the queue, so the thread trying to read will have
- /// to wait for those other threads to save the data into the queue
- ///
- /// note this is only used for multiple producers
- uint32_t m_maximumReadIndex;
+ inline bool empty();
+
+ bool push(const ELEM_T &a_data);
+
+ bool pop(ELEM_T &a_data);
+
+ /// @brief calculate the index in the circular array that corresponds
+ /// to a particular "count" value
+ inline uint32_t countToIndex(uint32_t a_count);
+
+ ELEM_T &operator[](unsigned i);
+
+private:
+ size_t Q_SIZE;
+ /// @brief array to keep the elements
+ ELEM_T *m_theQueue;
+
+ /// @brief where a new element will be inserted
+ uint32_t m_writeIndex;
+
+ /// @brief where the next element where be extracted from
+ uint32_t m_readIndex;
+
+ /// @brief maximum read index for multiple producer queues
+ /// If it's not the same as m_writeIndex it means
+ /// there are writes pending to be "committed" to the queue, that means,
+ /// the place for the data was reserved (the index in the array) but
+ /// data is still not in the queue, so the thread trying to read will have
+ /// to wait for those other threads to save the data into the queue
+ ///
+ /// note this is only used for multiple producers
+ uint32_t m_maximumReadIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- /// @brief number of elements in the queue
- uint32_t m_count;
+ /// @brief number of elements in the queue
+ uint32_t m_count;
#endif
-
-
+
+
private:
- /// @brief disable copy constructor declaring it private
- ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
+ /// @brief disable copy constructor declaring it private
+ ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
};
-template <typename ELEM_T, typename Allocator>
+template<typename ELEM_T, typename Allocator>
ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize):
- Q_SIZE(qsize),
- m_writeIndex(0), // initialisation is not atomic
- m_readIndex(0), //
- m_maximumReadIndex(0) //
+ Q_SIZE(qsize),
+ m_writeIndex(0), // initialisation is not atomic
+ m_readIndex(0), //
+ m_maximumReadIndex(0) //
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- ,m_count(0) //
+ , m_count(0) //
#endif
{
- m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
+ m_theQueue = (ELEM_T *) Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
}
-template <typename ELEM_T, typename Allocator>
-ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue()
-{
- // std::cout << "destroy ArrayLockFreeQueue\n";
- Allocator::deallocate(m_theQueue);
-
+template<typename ELEM_T, typename Allocator>
+ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() {
+ // std::cout << "destroy ArrayLockFreeQueue\n";
+ Allocator::deallocate(m_theQueue);
+
}
-template <typename ELEM_T, typename Allocator>
-inline
-uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
-{
- // if Q_SIZE is a power of 2 this statement could be also written as
- // return (a_count & (Q_SIZE - 1));
- return (a_count % Q_SIZE);
+template<typename ELEM_T, typename Allocator>
+inline
+uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) {
+ // if Q_SIZE is a power of 2 this statement could be also written as
+ // return (a_count & (Q_SIZE - 1));
+ return (a_count % Q_SIZE);
}
-template <typename ELEM_T, typename Allocator>
-inline
-uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size()
-{
+template<typename ELEM_T, typename Allocator>
+inline
+uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- return m_count;
+ return m_count;
#else
- uint32_t currentWriteIndex = m_maximumReadIndex;
- uint32_t currentReadIndex = m_readIndex;
+ uint32_t currentWriteIndex = m_maximumReadIndex;
+ uint32_t currentReadIndex = m_readIndex;
- // let's think of a scenario where this function returns bogus data
- // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
- // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
- // 2. afterwards this thread is preemted. While this thread is inactive 2
- // elements are inserted and removed from the queue, so m_maximumReadIndex
- // is 5 and m_readIndex 4. Real size is still 1
- // 3. Now the current thread comes back from preemption and reads m_readIndex.
- // currentReadIndex is 4
- // 4. currentReadIndex is bigger than currentWriteIndex, so
- // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
- // it returns that the queue is almost full, when it is almost empty
- //
- if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
- {
- return (currentWriteIndex - currentReadIndex);
- }
- else
- {
- return (Q_SIZE + currentWriteIndex - currentReadIndex);
- }
+ // let's think of a scenario where this function returns bogus data
+ // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
+ // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
+ // 2. afterwards this thread is preemted. While this thread is inactive 2
+ // elements are inserted and removed from the queue, so m_maximumReadIndex
+ // is 5 and m_readIndex 4. Real size is still 1
+ // 3. Now the current thread comes back from preemption and reads m_readIndex.
+ // currentReadIndex is 4
+ // 4. currentReadIndex is bigger than currentWriteIndex, so
+ // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
+ // it returns that the queue is almost full, when it is almost empty
+ //
+ if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
+ {
+ return (currentWriteIndex - currentReadIndex);
+ }
+ else
+ {
+ return (Q_SIZE + currentWriteIndex - currentReadIndex);
+ }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
-template <typename ELEM_T, typename Allocator>
-inline
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::full()
-{
+template<typename ELEM_T, typename Allocator>
+inline
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- return (m_count == (Q_SIZE));
+ return (m_count == (Q_SIZE));
#else
- uint32_t currentWriteIndex = m_writeIndex;
- uint32_t currentReadIndex = m_readIndex;
-
+ uint32_t currentWriteIndex = m_writeIndex;
+ uint32_t currentReadIndex = m_readIndex;
+
+ if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
+ {
+ // the queue is full
+ return true;
+ }
+ else
+ {
+ // not full!
+ return false;
+ }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template<typename ELEM_T, typename Allocator>
+inline
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() {
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+ return (m_count == 0);
+#else
+
+ if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
+ {
+ // the queue is full
+ return true;
+ }
+ else
+ {
+ // not full!
+ return false;
+ }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+
+template<typename ELEM_T, typename Allocator>
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) {
+ uint32_t currentReadIndex;
+ uint32_t currentWriteIndex;
+
+ do {
+
+ currentWriteIndex = m_writeIndex;
+ currentReadIndex = m_readIndex;
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+ if (m_count == Q_SIZE) {
+ return false;
+ }
+#else
if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
{
// the queue is full
- return true;
- }
- else
- {
- // not full!
return false;
}
-#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-}
-
-template <typename ELEM_T, typename Allocator>
-inline
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty()
-{
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
- return (m_count == 0);
-#else
-
- if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
- {
- // the queue is full
- return true;
- }
- else
- {
- // not full!
- return false;
- }
-#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-}
-
-
-
-
-
-
-template <typename ELEM_T, typename Allocator>
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data)
-{
- uint32_t currentReadIndex;
- uint32_t currentWriteIndex;
-
- do
- {
-
- currentWriteIndex = m_writeIndex;
- currentReadIndex = m_readIndex;
- #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
- if (m_count == Q_SIZE) {
- return false;
- }
- #else
- if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
- {
- // the queue is full
- return false;
- }
- #endif
-
- } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
-
- // We know now that this index is reserved for us. Use it to save the data
- m_theQueue[countToIndex(currentWriteIndex)] = a_data;
-
- // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
- // inserting in the queue. It might fail if there are more than 1 producer threads because this
- // operation has to be done in the same order as the previous CAS
-
- while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
- {
- // this is a good place to yield the thread in case there are more
- // software threads than hardware processors and you have more
- // than 1 producer thread
- // have a look at sched_yield (POSIX.1b)
- sched_yield();
- }
-
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- AtomicAdd(&m_count, 1);
#endif
- return true;
+
+ } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
+
+ // We know now that this index is reserved for us. Use it to save the data
+ m_theQueue[countToIndex(currentWriteIndex)] = a_data;
+
+ // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
+ // inserting in the queue. It might fail if there are more than 1 producer threads because this
+ // operation has to be done in the same order as the previous CAS
+
+ while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) {
+ // this is a good place to yield the thread in case there are more
+ // software threads than hardware processors and you have more
+ // than 1 producer thread
+ // have a look at sched_yield (POSIX.1b)
+ sched_yield();
+ }
+
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+ AtomicAdd(&m_count, 1);
+#endif
+ return true;
}
-template <typename ELEM_T, typename Allocator>
-bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data)
-{
- uint32_t currentMaximumReadIndex;
- uint32_t currentReadIndex;
+template<typename ELEM_T, typename Allocator>
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) {
+ uint32_t currentMaximumReadIndex;
+ uint32_t currentReadIndex;
- do
- {
- // to ensure thread-safety when there is more than 1 producer thread
- // a second index is defined (m_maximumReadIndex)
- currentReadIndex = m_readIndex;
- currentMaximumReadIndex = m_maximumReadIndex;
- #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+ do {
+ // to ensure thread-safety when there is more than 1 producer thread
+ // a second index is defined (m_maximumReadIndex)
+ currentReadIndex = m_readIndex;
+ currentMaximumReadIndex = m_maximumReadIndex;
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- if (m_count == 0) {
- return false;
- }
- #else
- if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
- {
- // the queue is empty or
- // a producer thread has allocate space in the queue but is
- // waiting to commit the data into it
- return false;
- }
- #endif
-
- // retrieve the data from the queue
- a_data = m_theQueue[countToIndex(currentReadIndex)];
-
- // try to perfrom now the CAS operation on the read index. If we succeed
- // a_data already contains what m_readIndex pointed to before we
- // increased it
- if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
- {
- #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- // m_count.fetch_sub(1);
- AtomicSub(&m_count, 1);
- #endif
- return true;
- }
-
- // it failed retrieving the element off the queue. Someone else must
- // have read the element stored at countToIndex(currentReadIndex)
- // before we could perform the CAS operation
-
- } while(1); // keep looping to try again!
-
- // Something went wrong. it shouldn't be possible to reach here
- assert(0);
-
- // Add this return statement to avoid compiler warnings
- return false;
-}
-
-template <typename ELEM_T, typename Allocator>
-ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i)
-{
- int currentCount = m_count;
- uint32_t currentReadIndex = m_readIndex;
- if (i >= currentCount)
- {
- std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
- std::exit(EXIT_FAILURE);
+ if (m_count == 0) {
+ return false;
}
- return m_theQueue[countToIndex(currentReadIndex+i)];
+#else
+ if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
+ {
+ // the queue is empty or
+ // a producer thread has allocate space in the queue but is
+ // waiting to commit the data into it
+ return false;
+ }
+#endif
+
+ // retrieve the data from the queue
+ a_data = m_theQueue[countToIndex(currentReadIndex)];
+
+ // try to perfrom now the CAS operation on the read index. If we succeed
+ // a_data already contains what m_readIndex pointed to before we
+ // increased it
+ if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) {
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+ // m_count.fetch_sub(1);
+ AtomicSub(&m_count, 1);
+#endif
+ return true;
+ }
+
+ // it failed retrieving the element off the queue. Someone else must
+ // have read the element stored at countToIndex(currentReadIndex)
+ // before we could perform the CAS operation
+
+ } while (1); // keep looping to try again!
+
+ // Something went wrong. it shouldn't be possible to reach here
+ assert(0);
+
+ // Add this return statement to avoid compiler warnings
+ return false;
+}
+
+template<typename ELEM_T, typename Allocator>
+ELEM_T &ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) {
+ int currentCount = m_count;
+ uint32_t currentReadIndex = m_readIndex;
+ if (i >= currentCount) {
+ std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i
+ << " is out of range\n";
+ std::exit(EXIT_FAILURE);
+ }
+ return m_theQueue[countToIndex(currentReadIndex + i)];
}
#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
diff --git a/src/queue/array_lock_free_sem_queue.h b/src/queue/array_lock_free_sem_queue.h
index 69630d9..a3b677c 100644
--- a/src/queue/array_lock_free_sem_queue.h
+++ b/src/queue/array_lock_free_sem_queue.h
@@ -8,7 +8,7 @@
#include "shm_allocator.h"
#include "futex_sem.h"
#include "time_util.h"
-
+#include "bus_def.h"
/// @brief implementation of an array based lock free queue with support for
/// multiple producers
@@ -17,8 +17,7 @@
/// you must use the ArrayLockFreeSemQueue fachade:
/// ArrayLockFreeSemQueue<int, 100, ArrayLockFreeSemQueue> q;
-#define LOCK_FREE_QUEUE_TIMEOUT 1
-#define LOCK_FREE_QUEUE_NOWAIT 1 << 1
+
#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
@@ -207,15 +206,17 @@
uint32_t currentWriteIndex;
int s;
+ // sigset_t mask_all, pre;
+ // sigfillset(&mask_all);
do
{
currentWriteIndex = m_writeIndex;
currentReadIndex = m_readIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
if (m_count == Q_SIZE) {
- if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
+ if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG)
return -1;
- else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
+ else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
const struct timespec ts = TimeUtil::trim_time(timeout);
s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
if (s == -1 && errno != EAGAIN && errno != EINTR) {
@@ -235,9 +236,9 @@
if (currentReadIndex == currentWriteIndex - Q_SIZE + 1 )
{
// the queue is full
- if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
+ if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG)
return -1;
- else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
+ else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
const struct timespec ts = TimeUtil::trim_time(timeout);
s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, &ts, NULL, 0);
if (s == -1 && errno != EAGAIN && errno != EINTR) {
@@ -260,10 +261,11 @@
// We know now that this index is reserved for us. Use it to save the data
m_theQueue[countToIndex(currentWriteIndex)] = a_data;
+ // sigprocmask(SIG_BLOCK, &mask_all, &pre);
+
// update the maximum read index after saving the data. It wouldn't fail if there is only one thread
// inserting in the queue. It might fail if there are more than 1 producer threads because this
// operation has to be done in the same order as the previous CAS
-
while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
{
// this is a good place to yield the thread in case there are more
@@ -283,6 +285,7 @@
err_exit(errno, "futex-FUTEX_WAKE");
#endif
+ // sigprocmask(SIG_SETMASK, &pre, NULL);
return 0;
}
@@ -293,6 +296,11 @@
uint32_t currentMaximumReadIndex;
uint32_t currentReadIndex;
int s;
+
+ // sigset_t mask_all, pre;
+ // sigfillset(&mask_all);
+
+ // sigprocmask(SIG_BLOCK, &mask_all, &pre);
do
{
@@ -305,19 +313,23 @@
if (m_count == 0) {
- if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
+ if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
+ // sigprocmask(SIG_SETMASK, &pre, NULL);
return -1;
- else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
+ }
+ else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
const struct timespec ts = TimeUtil::trim_time(timeout);
s = futex((int *)&m_count, FUTEX_WAIT, 0, &ts, NULL, 0);
if (s == -1 && errno != EAGAIN && errno != EINTR) {
// err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
+ // sigprocmask(SIG_SETMASK, &pre, NULL);
return -1;
}
} else {
s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
if (s == -1 && errno != EAGAIN && errno != EINTR) {
+ // sigprocmask(SIG_SETMASK, &pre, NULL);
return -1;
}
}
@@ -330,19 +342,23 @@
// the queue is empty or
// a producer thread has allocate space in the queue but is
// waiting to commit the data into it
- if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
+ if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
+ // sigprocmask(SIG_SETMASK, &pre, NULL);
return -1;
- else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
+ }
+ else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
const struct timespec ts = TimeUtil::trim_time(timeout);
s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, &ts, NULL, 0);
if (s == -1 && errno != EAGAIN && errno != EINTR) {
// err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
+ // sigprocmask(SIG_SETMASK, &pre, NULL);
return -1;
}
} else {
s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0);
if (s == -1 && errno != EAGAIN && errno != EINTR) {
+ // sigprocmask(SIG_SETMASK, &pre, NULL);
return -1;
}
}
@@ -367,6 +383,7 @@
err_exit(errno, "futex-FUTEX_WAKE");
#endif
+ // sigprocmask(SIG_SETMASK, &pre, NULL);
return 0;
}
diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index 01e597c..9245d3e 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -12,20 +12,20 @@
#include "shm_allocator.h"
#include "psem.h"
#include "bus_error.h"
-
+#include "bus_def.h"
// default Queue size
#define LOCK_FREE_Q_DEFAULT_SIZE 16
// static Logger *logger = LoggerFactory::getLogger();
-// define this macro if calls to "size" must return the real size of the
-// queue. If it is undefined that function will try to take a snapshot of
+// define this macro if calls to "size" must return the real size of the
+// queue. If it is undefined that function will try to take a snapshot of
// the queue, but returned value might be bogus
// forward declarations for default template values
//
-template <typename ELEM_T, typename Allocator>
+template<typename ELEM_T, typename Allocator>
class ArrayLockFreeQueue;
// template <typename ELEM_T>
@@ -33,9 +33,9 @@
/// @brief Lock-free queue based on a circular array
-/// No allocation of extra memory for the nodes handling is needed, but it has
-/// to add extra overhead (extra CAS operation) when inserting to ensure the
-/// thread-safety of the queue when the queue type is not
+/// No allocation of extra memory for the nodes handling is needed, but it has
+/// to add extra overhead (extra CAS operation) when inserting to ensure the
+/// thread-safety of the queue when the queue type is not
/// ArrayLockFreeQueueSingleProducer.
///
/// examples of instantiation:
@@ -50,113 +50,109 @@
///
/// ELEM_T represents the type of elementes pushed and popped from the queue
/// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1)
-/// This number should be a power of 2 to ensure
-/// indexes in the circular queue keep stable when the uint32_t
+/// This number should be a power of 2 to ensure
+/// indexes in the circular queue keep stable when the uint32_t
/// variable that holds the current position rolls over from FFFFFFFF
/// to 0. For instance
-/// 2 -> 0x02
+/// 2 -> 0x02
/// 4 -> 0x04
/// 8 -> 0x08
/// 16 -> 0x10
-/// (...)
+/// (...)
/// 1024 -> 0x400
/// 2048 -> 0x800
///
/// if queue size is not defined as requested, let's say, for
/// instance 100, when current position is FFFFFFFF (4,294,967,295)
-/// index in the circular array is 4,294,967,295 % 100 = 95.
-/// When that value is incremented it will be set to 0, that is the
+/// index in the circular array is 4,294,967,295 % 100 = 95.
+/// When that value is incremented it will be set to 0, that is the
/// last 4 elements of the queue are not used when the counter rolls
/// over to 0
-/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and
+/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and
/// ArrayLockFreeQueue are supported (single producer
/// by default)
-template <
- typename ELEM_T,
- typename Allocator = SHM_Allocator,
- template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
- >
-class LockFreeQueue
-{
+template<
+ typename ELEM_T,
+ typename Allocator = SHM_Allocator,
+ template<typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
+>
+class LockFreeQueue {
private:
- sem_t slots;
- sem_t items;
+ sem_t slots;
+ sem_t items;
-
public:
- sem_t mutex;
- LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
-
- /// @brief destructor of the class.
- /// Note it is not virtual since it is not expected to inherit from this
- /// template
- ~LockFreeQueue();
- std::atomic_uint reference;
- /// @brief constructor of the class
-
+ sem_t mutex;
- /// @brief returns the current number of items in the queue
- /// It tries to take a snapshot of the size of the queue, but in busy environments
- /// this function might return bogus values.
- ///
- /// If a reliable queue size must be kept you might want to have a look at
- /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
- /// it enables a reliable size though it hits overall performance of the queue
- /// (when the reliable size variable is on it's got an impact of about 20% in time)
- inline uint32_t size();
-
- /// @brief return true if the queue is full. False otherwise
- /// It tries to take a snapshot of the size of the queue, but in busy
- /// environments this function might return bogus values. See help in method
- /// LockFreeQueue::size
- inline bool full();
+ LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
- inline bool empty();
+ /// @brief destructor of the class.
+ /// Note it is not virtual since it is not expected to inherit from this
+ /// template
+ ~LockFreeQueue();
- inline ELEM_T& operator[](unsigned i);
-
- /// @brief push an element at the tail of the queue
- /// @param the element to insert in the queue
- /// Note that the element is not a pointer or a reference, so if you are using large data
- /// structures to be inserted in the queue you should think of instantiate the template
- /// of the queue as a pointer to that large structure
- /// @return true if the element was inserted in the queue. False if the queue was full
- int push(const ELEM_T &a_data);
- int push_nowait(const ELEM_T &a_data);
- int push_timeout(const ELEM_T &a_data, const struct timespec * timeout);
-
- /// @brief pop the element at the head of the queue
- /// @param a reference where the element in the head of the queue will be saved to
- /// Note that the a_data parameter might contain rubbish if the function returns false
- /// @return true if the element was successfully extracted from the queue. False if the queue was empty
- int pop(ELEM_T &a_data);
- int pop_nowait(ELEM_T &a_data);
- int pop_timeout(ELEM_T &a_data, struct timespec * timeout);
+ std::atomic_uint reference;
+ /// @brief constructor of the class
- void *operator new(size_t size);
- void operator delete(void *p);
+ /// @brief returns the current number of items in the queue
+ /// It tries to take a snapshot of the size of the queue, but in busy environments
+ /// this function might return bogus values.
+ ///
+ /// If a reliable queue size must be kept you might want to have a look at
+ /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
+ /// it enables a reliable size though it hits overall performance of the queue
+ /// (when the reliable size variable is on it's got an impact of about 20% in time)
+ inline uint32_t size();
+
+ /// @brief return true if the queue is full. False otherwise
+ /// It tries to take a snapshot of the size of the queue, but in busy
+ /// environments this function might return bogus values. See help in method
+ /// LockFreeQueue::size
+ inline bool full();
+
+ inline bool empty();
+
+ inline ELEM_T &operator[](unsigned i);
+
+ /// @brief push an element at the tail of the queue
+ /// @param the element to insert in the queue
+ /// Note that the element is not a pointer or a reference, so if you are using large data
+ /// structures to be inserted in the queue you should think of instantiate the template
+ /// of the queue as a pointer to that large structure
+ /// @return true if the element was inserted in the queue. False if the queue was full
+ int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
+
+ /// @brief pop the element at the head of the queue
+ /// @param a reference where the element in the head of the queue will be saved to
+ /// Note that the a_data parameter might contain rubbish if the function returns false
+ /// @return true if the element was successfully extracted from the queue. False if the queue was empty
+ int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
+
+
+ void *operator new(size_t size);
+
+ void operator delete(void *p);
protected:
- /// @brief the actual queue. methods are forwarded into the real
- /// implementation
- Q_TYPE<ELEM_T, Allocator> m_qImpl;
+ /// @brief the actual queue. methods are forwarded into the real
+ /// implementation
+ Q_TYPE<ELEM_T, Allocator> m_qImpl;
private:
- /// @brief disable copy constructor declaring it private
- LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src);
+ /// @brief disable copy constructor declaring it private
+ LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src);
};
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize)
-{
-// std::cout << "LockFreeQueue init reference=" << reference << std::endl;
+template<
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) {
+ // std::cout << "LockFreeQueue init reference=" << reference << std::endl;
if (sem_init(&slots, 1, qsize) == -1)
err_exit(errno, "LockFreeQueue sem_init");
if (sem_init(&items, 1, 0) == -1)
@@ -164,194 +160,130 @@
if (sem_init(&mutex, 1, 1) == -1)
err_exit(errno, "LockFreeQueue sem_init");
-
+
}
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue()
-{
+
+template<
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() {
// LoggerFactory::getLogger()->debug("LockFreeQueue desctroy");
- if(sem_destroy(&slots) == -1) {
+ if (sem_destroy(&slots) == -1) {
err_exit(errno, "LockFreeQueue sem_destroy");
}
- if(sem_destroy(&items) == -1) {
+ if (sem_destroy(&items) == -1) {
err_exit(errno, "LockFreeQueue sem_destroy");
}
- if(sem_destroy(&mutex) == -1) {
+ if (sem_destroy(&mutex) == -1) {
err_exit(errno, "LockFreeQueue sem_destroy");
}
}
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size()
-{
+template<
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() {
return m_qImpl.size();
-}
+}
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full()
-{
+template<
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() {
return m_qImpl.full();
}
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty()
-{
+template<
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() {
return m_qImpl.empty();
-}
-
-
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
-{
- LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
- if (psem_wait(&slots) == -1) {
- return -1;
- }
-
- if ( m_qImpl.push(a_data) ) {
- psem_post(&items);
-LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
- return 0;
- }
- return -1;
-
-}
-
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
-{
- if (psem_trywait(&slots) == -1) {
- return -1;
- }
-
- if ( m_qImpl.push(a_data)) {
- psem_post(&items);
- return 0;
- }
- return -1;
-
-}
-
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts)
-{
-LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n");
- if ( psem_timedwait(&slots, ts) == -1) {
- return -1;
- }
-
- if (m_qImpl.push(a_data)){
- psem_post(&items);
-LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n");
- return 0;
- }
- return -1;
-
}
-
-
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
-{
-
- LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
- if (psem_wait(&items) == -1) {
- return -1;
+template<typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) {
+ LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
+ if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
+ if (psem_trywait(&slots) == -1) {
+ return -1;
+ }
+ } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
+ if (psem_timedwait(&slots, timeout) == -1) {
+ return -1;
+ }
+ } else {
+ if (psem_wait(&slots) == -1) {
+ return -1;
+ }
}
+
+
+ if (m_qImpl.push(a_data)) {
+ psem_post(&items);
+ LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
+ return 0;
+ }
+ return -1;
+
+}
+
+template<typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) {
+
+ LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
+
+
+ if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
+ if (psem_trywait(&items) == -1) {
+ return -1;
+ }
+ } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
+ if (psem_timedwait(&items, timeout) == -1) {
+ return -1;
+ }
+ } else {
+ if (psem_wait(&items) == -1) {
+ return -1;
+ }
+ }
+
if (m_qImpl.pop(a_data)) {
psem_post(&slots);
- LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
+ LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
return 0;
}
return -1;
}
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
-{
- if (psem_trywait(&items) == -1) {
- return -1;
- }
-
- if (m_qImpl.pop(a_data)) {
- psem_post(&slots);
- return 0;
- }
- return -1;
-}
-
-
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
-{
- if (psem_timedwait(&items, ts) == -1) {
- return -1;
- }
-
- if (m_qImpl.pop(a_data)) {
- psem_post(&slots);
-// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n");
- return 0;
- }
- return -1;
-
-}
-
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
+template<typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+ELEM_T &LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
return m_qImpl.operator[](i);
}
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
-void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){
- return Allocator::allocate(size);
+template<typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+void *LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size) {
+ return Allocator::allocate(size);
}
-template <
- typename ELEM_T,
- typename Allocator,
- template <typename T, typename AT> class Q_TYPE>
+template<typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) {
return Allocator::deallocate(p);
}
diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h
index 5d2d9b6..0be124b 100644
--- a/src/queue/shm_queue.h
+++ b/src/queue/shm_queue.h
@@ -12,6 +12,7 @@
#include "shm_allocator.h"
#include "usg_common.h"
#include "array_lock_free_sem_queue.h"
+#include "lock_free_queue.h"
#include "bus_error.h"
template <typename ELEM_T> class SHMQueue {
@@ -51,7 +52,7 @@
/// @brief the actual queue-> methods are forwarded into the real
/// implementation
- ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *queue;
+ LockFreeQueue<ELEM_T, SHM_Allocator> *queue;
private:
/// @brief disable copy constructor declaring it private
@@ -64,7 +65,7 @@
hashtable_t *hashtable = mm_get_hashtable();
std::set<int> *keyset = hashtable_keyset(hashtable);
std::set<int>::iterator keyItr;
- ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue;
+ LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
bool found;
size_t count = 0;
for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
@@ -77,7 +78,7 @@
}
if (!found) {
// 閿�姣佸叡浜唴瀛樼殑queue
- mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
+ mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
delete mqueue;
hashtable_remove(hashtable, *keyItr);
count++;
@@ -91,11 +92,11 @@
template <typename ELEM_T>
size_t SHMQueue<ELEM_T>::remove_queues(int keys[], size_t length) {
hashtable_t *hashtable = mm_get_hashtable();
- ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue;
+ LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
size_t count = 0;
for(int i = 0; i< length; i++) {
// 閿�姣佸叡浜唴瀛樼殑queue
- mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]);
+ mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]);
delete mqueue;
hashtable_remove(hashtable, keys[i]);
count++;
@@ -113,9 +114,9 @@
SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize) : KEY(key) {
hashtable_t *hashtable = mm_get_hashtable();
- queue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
+ queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
if (queue == NULL || (void *)queue == (void *)1) {
- queue = new ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator>(qsize);
+ queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize);
hashtable_put(hashtable, key, (void *)queue);
}
// queue->reference++;
@@ -155,7 +156,7 @@
template <typename ELEM_T>
inline int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) {
- int rv = queue->push(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT);
+ int rv = queue->push(a_data, NULL, BUS_NOWAIT_FLAG);
if(rv == -1) {
if (errno == EAGAIN)
return EAGAIN;
@@ -170,7 +171,7 @@
template <typename ELEM_T>
inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec *timeout) {
- int rv = queue->push(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT);
+ int rv = queue->push(a_data, timeout, BUS_TIMEOUT_FLAG);
if(rv == -1) {
if(errno == ETIMEDOUT)
return EBUS_TIMEOUT;
@@ -195,7 +196,7 @@
template <typename ELEM_T>
inline int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) {
- int rv = queue->pop(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT);
+ int rv = queue->pop(a_data, NULL, BUS_NOWAIT_FLAG);
if(rv == -1) {
if (errno == EAGAIN)
@@ -213,7 +214,7 @@
inline int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) {
int rv;
- rv = queue->pop(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT);
+ rv = queue->pop(a_data, timeout, BUS_TIMEOUT_FLAG);
if(rv == -1) {
if (errno == ETIMEDOUT) {
return EBUS_TIMEOUT;
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index 4340bc4..54effbe 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -51,7 +51,7 @@
}
// 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){
- return shm_sendto(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
+ return shm_sendto(shm_socket, buf, size, key, NULL, (int)BUS_NOWAIT_FLAG);
}
@@ -74,7 +74,7 @@
}
int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){
- int rv = shm_recvfrom(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
+ int rv = shm_recvfrom(shm_socket, buf, size, key, NULL, (int)BUS_NOWAIT_FLAG);
// logger->error(rv, "ShmModSocket::recvfrom_nowait failed!");
return rv;
}
@@ -92,7 +92,7 @@
return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0);
}
int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
- return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
+ return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)BUS_NOWAIT_FLAG);
}
int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
@@ -103,7 +103,7 @@
return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0);
}
int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
- return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
+ return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)BUS_NOWAIT_FLAG);
}
@@ -123,7 +123,7 @@
return _sub_(topic, size, key, timeout, 0);
}
int ShmModSocket::sub_nowait(char *topic, int size, int key) {
- return _sub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT);
+ return _sub_(topic, size, key, NULL, (int)BUS_NOWAIT_FLAG);
}
@@ -142,7 +142,7 @@
return _desub_(topic, size, key, timeout, 0);
}
int ShmModSocket::desub_nowait(char *topic, int size, int key) {
- return _desub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT);
+ return _desub_(topic, size, key, NULL, (int)BUS_NOWAIT_FLAG);
}
@@ -161,7 +161,7 @@
return _pub_( topic, topic_size, content, content_size, key, timeout, 0);
}
int ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){
- return _pub_(topic, topic_size, content, content_size, key, NULL, (int)SHM_MSG_NOWAIT);
+ return _pub_(topic, topic_size, content, content_size, key, NULL, (int)BUS_NOWAIT_FLAG);
}
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 76e906f..e370c72 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -373,9 +373,9 @@
memcpy(dest.buf, buf, size);
- if(flags & SHM_MSG_NOWAIT != 0) {
+ if( (flags & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
rv = remoteQueue->push_nowait(dest);
- } else if(timeout != NULL) {
+ } else if((flags & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
rv = remoteQueue->push_timeout(dest, timeout);
} else {
rv = remoteQueue->push(dest);
@@ -393,8 +393,6 @@
logger->error(rv, "sendto key %d failed", key);
}
return rv;
-
-
}
}
@@ -433,9 +431,9 @@
shm_msg_t src;
- if(flags & SHM_MSG_NOWAIT != 0) {
+ if((flags & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
rv = socket->queue->pop_nowait(src);
- } else if(timeout != NULL) {
+ } else if((flags & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
rv = socket->queue->pop_timeout(src, timeout);
// printf("0 shm_recvfrom====%d\n", rv);
} else {
@@ -649,7 +647,7 @@
switch (src.type) {
case SHM_SOCKET_OPEN:
- socket->acceptQueue->push_timeout(src, &timeout);
+ socket->acceptQueue->push(src, &timeout, BUS_TIMEOUT_FLAG);
break;
case SHM_SOCKET_CLOSE:
_server_close_conn_to_client(socket, src.key);
@@ -660,7 +658,7 @@
if (iter != socket->clientSocketMap->end()) {
client_socket = iter->second;
// print_msg("_server_run_msg_rev push before", src);
- client_socket->messageQueue->push_timeout(src, &timeout);
+ client_socket->messageQueue->push(src, &timeout, BUS_TIMEOUT_FLAG);
// print_msg("_server_run_msg_rev push after", src);
}
@@ -695,7 +693,7 @@
_client_close_conn_to_server(socket);
break;
case SHM_COMMON_MSG:
- socket->messageQueue->push_timeout(src, &timeout);
+ socket->messageQueue->push(src, &timeout, BUS_TIMEOUT_FLAG);
break;
default:
logger->error( "shm_socket._client_run_msg_rev: undefined message type.");
diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h
index abc8e20..b1665a0 100644
--- a/src/socket/shm_socket.h
+++ b/src/socket/shm_socket.h
@@ -6,11 +6,7 @@
#include "shm_queue.h"
#include "lock_free_queue.h"
-enum shm_socket_flag_t
-{
- SHM_MSG_TIMEOUT = 1,
- SHM_MSG_NOWAIT = 2
-};
+
enum shm_connection_status_t {
SHM_CONN_CLOSED=1,
@@ -88,7 +84,7 @@
int shm_recv(shm_socket_t * socket, void **buf, int *size) ;
/**
- * @flags : SHM_MSG_NOWAIT
+ * @flags : BUS_NOWAIT_FLAG
*/
int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0);
diff --git a/test_net_socket/net_mod_socket.sh b/test_net_socket/net_mod_socket.sh
index cd4c809..fc639ca 100755
--- a/test_net_socket/net_mod_socket.sh
+++ b/test_net_socket/net_mod_socket.sh
@@ -36,7 +36,7 @@
}
function close() {
- ps -ef | grep -e "test_net_mod_socket" -e "net_mod_socket"| awk '{print $2}' | xargs -i kill -9 {}
+ ps -ef | grep -e "test_net_mod_socket" -e "heart_beat"| awk '{print $2}' | xargs -i kill -9 {}
ipcrm -a
}
@@ -48,8 +48,6 @@
case ${1} in
"server")
- close
- sleep 2
server
;;
"client")
diff --git a/test_net_socket/test_net_mod_socket.cpp b/test_net_socket/test_net_mod_socket.cpp
index 0b2afd6..2fcd604 100644
--- a/test_net_socket/test_net_mod_socket.cpp
+++ b/test_net_socket/test_net_mod_socket.cpp
@@ -76,10 +76,13 @@
void *recvbuf;
int size;
int key;
- while (net_mod_socket_recvfrom( sockt, &recvbuf, &size, &key) == 0) {
+ int rv;
+ while ((rv = net_mod_socket_recvfrom( sockt, &recvbuf, &size, &key) ) == 0) {
printf("鏀跺埌璁㈤槄娑堟伅:%s\n", recvbuf);
free(recvbuf);
}
+
+ printf("print_sub_msg return . rv = %d\n", rv);
}
diff --git a/test_socket/CMakeLists.txt b/test_socket/CMakeLists.txt
new file mode 100644
index 0000000..a3f3fa5
--- /dev/null
+++ b/test_socket/CMakeLists.txt
@@ -0,0 +1,11 @@
+# add the executable
+add_executable(bus_test bus_test.cpp)
+target_link_libraries(bus_test PRIVATE shm_queue ${EXTRA_LIBS} )
+target_include_directories(bus_test PRIVATE
+ "${PROJECT_BINARY_DIR}"
+ ${EXTRA_INCLUDES}
+ )
+
+
+
+
diff --git a/test_socket/bus_test.cpp b/test_socket/bus_test.cpp
new file mode 100644
index 0000000..cdd6142
--- /dev/null
+++ b/test_socket/bus_test.cpp
@@ -0,0 +1,116 @@
+#include "bus_server_socket.h"
+#include "shm_mod_socket.h"
+#include "shm_mm_wrapper.h"
+#include "usg_common.h"
+#include "mm.h"
+
+BusServerSocket * server_socket;
+void sigint_handler(int sig) {
+
+ exit(0);
+}
+
+void server(int key) {
+ server_socket = new BusServerSocket();
+
+ server_socket->bind( key);
+
+ server_socket->start();
+}
+
+
+void *run_recv(void *skptr) {
+ pthread_detach(pthread_self());
+ void *recvbuf;
+ int size;
+ int key;
+ ShmModSocket *sk = (ShmModSocket *)skptr;
+ while (sk->recvfrom( &recvbuf, &size, &key) == 0) {
+ printf("鏀跺埌璁㈤槄娑堟伅:%s\n", recvbuf);
+ free(recvbuf);
+ }
+
+}
+
+void client(int key) {
+ ShmModSocket *sk = new ShmModSocket();
+
+ pthread_t tid;
+ pthread_create(&tid, NULL, run_recv, (void *)socket);
+ int size;
+
+ char action[512];
+ char topic[512];
+ char content[512];
+ long i = 0;
+ while (true) {
+ //printf("Usage: pub <topic> [content] or sub <topic>\n");
+ printf("Can I help you? sub, pub, desub or quit\n");
+ scanf("%s",action);
+
+ if(strcmp(action, "sub") == 0) {
+ printf("Please input topic!\n");
+ scanf("%s", topic);
+ if (sk->sub(topic, strlen(topic), key) == 0) {
+ printf("%d Sub success!\n", sk->get_key());
+ } else {
+ printf("Sub failture!\n");
+ exit(0);
+ }
+
+ } else if(strcmp(action, "desub") == 0) {
+ printf("Please input topic!\n");
+ scanf("%s", topic);
+ if (sk->desub(topic, strlen(topic), key) == 0) {
+ printf("%d Desub success!\n", sk->get_key());
+ } else {
+ printf("Desub failture!\n");
+ exit(0);
+ }
+
+ } else if(strcmp(action, "pub") == 0) {
+ // printf("%s %s %s\n", action, topic, content);
+ printf("Please input topic and content\n");
+ scanf("%s %s", topic, content);
+ if(sk->pub(topic, strlen(topic)+1, content, strlen(content)+1, key) == 0){
+ printf("%d Pub success!\n", sk->get_key());
+ } else {
+ printf("Pub failture!\n");
+ }
+
+ } else if(strcmp(action, "quit") == 0) {
+ printf("(%d) quit\n", sk->get_key());
+ delete sk;
+ break;
+ } else {
+ printf("error input argument\n");
+ continue;
+ }
+
+ }
+
+}
+
+
+
+int main(int argc, char *argv[]) {
+ shm_mm_wrapper_init(512);
+ int key;
+ if (argc < 3) {
+ fprintf(stderr, "Usage: %s %s|%s <key> ...\n", argv[0], "server", "client");
+ return 1;
+ }
+
+ key = atoi(argv[2]);
+
+ if (strcmp("server", argv[1]) == 0) {
+ server(key);
+
+ } else if (strcmp("client", argv[1]) == 0) {
+ client(key);
+ }
+
+
+
+ return 0;
+}
\ No newline at end of file
diff --git a/test_socket/dgram_mod_bus.cpp b/test_socket/dgram_mod_bus.cpp
deleted file mode 100644
index 042b2fd..0000000
--- a/test_socket/dgram_mod_bus.cpp
+++ /dev/null
@@ -1,131 +0,0 @@
-#include "dgram_mod_socket.h"
-#include "shm_mm_wraper.h"
-#include "usg_common.h"
-#include "mm.h"
-
-void * server_socket;
-void sigint_handler(int sig) {
- dgram_mod_close_socket(server_socket);
- exit(0);
-}
-
-void server(int key, bool restart) {
- server_socket = dgram_mod_open_socket();
-
-
- if(restart) {
- dgram_mod_force_bind(server_socket, key);
- } else {
- dgram_mod_bind(server_socket, key);
- }
-
-
- dgram_mod_start_bus(server_socket);
-}
-
-
-void *run_recv(void *socket) {
- pthread_detach(pthread_self());
- void *recvbuf;
- int size;
- int key;
- while (dgram_mod_recvfrom( socket, &recvbuf, &size, &key) == 0) {
- printf("鏀跺埌璁㈤槄娑堟伅:%s\n", recvbuf);
- free(recvbuf);
- }
-
-}
-
-void client(int key) {
- void *socket = dgram_mod_open_socket();
-
- pthread_t tid;
- pthread_create(&tid, NULL, run_recv, socket);
- int size;
-
- char action[512];
- char topic[512];
- char content[512];
- long i = 0;
- while (true) {
- //printf("Usage: pub <topic> [content] or sub <topic>\n");
- printf("Can I help you? sub, pub, desub or quit\n");
- scanf("%s",action);
-
- if(strcmp(action, "sub") == 0) {
- printf("Please input topic!\n");
- scanf("%s", topic);
- if (dgram_mod_sub(socket, topic, strlen(topic), key) == 0) {
- printf("%d Sub success!\n", dgram_mod_get_port(socket));
- } else {
- printf("Sub failture!\n");
- exit(0);
- }
-
- } else if(strcmp(action, "desub") == 0) {
- printf("Please input topic!\n");
- scanf("%s", topic);
- if (dgram_mod_desub(socket, topic, strlen(topic), key) == 0) {
- printf("%d Desub success!\n", dgram_mod_get_port(socket));
- } else {
- printf("Desub failture!\n");
- exit(0);
- }
-
- } else if(strcmp(action, "pub") == 0) {
- // printf("%s %s %s\n", action, topic, content);
- printf("Please input topic and content\n");
- scanf("%s %s", topic, content);
- if(dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1, key) == 0){
- printf("%d Pub success!\n", dgram_mod_get_port(socket));
- } else {
- printf("Pub failture!\n");
- }
-
- } else if(strcmp(action, "quit") == 0) {
- printf("(%d) quit\n", dgram_mod_get_port(socket));
- dgram_mod_close_socket(socket);
- break;
- } else {
- printf("error input argument\n");
- continue;
- }
-
- }
-
-}
-
-
-
-int main(int argc, char *argv[]) {
- shm_mm_wrapper_init(512);
- int key;
- if (argc < 3) {
- fprintf(stderr, "Usage: %s %s|%s|rmkey <key> ...\n", argv[0], "server", "client");
- return 1;
- }
-
- key = atoi(argv[2]);
-
- if (strcmp("server", argv[1]) == 0) {
- if(argc >= 4 && strcmp("restart", argv[3]) == 0) {
- server(key, true);
- }
- else{
- server(key, false);
- }
-
- } else if (strcmp("client", argv[1]) == 0) {
- client(key);
- } else if(strcmp("rmkey", argv[1]) == 0) {
- for(int i = 2; i < argc; i++) {
- key = atoi(argv[i]);
- dgram_mod_remove_key(key);
- // printf("%d\n", key);
- }
- }
-
-
-
- return 0;
-}
\ No newline at end of file
--
Gitblit v1.8.0