From 7285f5762bd3f1be94884730d9a28dd406f98fc5 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期三, 06 一月 2021 18:12:05 +0800 Subject: [PATCH] update --- .gitignore | 1 src/queue/lock_free_queue.h | 112 ++++++++++++++-------- test/test_sem.c | 78 +++++++++++++++ build.sh | 12 +- test_net_socket/Makefile | 1 test_net_socket/heart_beat.sh | 10 + Make.defines.linux | 2 /dev/null | 0 test/Makefile | 20 +-- src/queue/shm_queue.h | 6 src/px_sem_util.cpp | 13 ++ src/px_sem_util.h | 13 ++ src/logger_factory.h | 2 test_net_socket/heart_beat.cpp | 28 +++-- 14 files changed, 220 insertions(+), 78 deletions(-) diff --git a/.gitignore b/.gitignore index 1f1e670..f4062b1 100644 --- a/.gitignore +++ b/.gitignore @@ -42,5 +42,6 @@ .idea/ build/ +dest/ *.tmp core diff --git a/Make.defines.linux b/Make.defines.linux index 2a1fddc..14195d8 100755 --- a/Make.defines.linux +++ b/Make.defines.linux @@ -36,7 +36,7 @@ # preprocessor options CPPFLAGS += $(INCLUDES) -std=c++11 # compilation options -CFLAGS += $(DEBUGFLAGS) -Wall -DLINUX -D_GNU_SOURCE $(EXTRA) +CFLAGS += $(DEBUGFLAGS) -Wall -DLINUX -D_GNU_SOURCE -D_POSIX_C_SOURCE=200112L $(EXTRA) # linked options LDFLAGS += diff --git a/build.sh b/build.sh index 14c25ba..4608b74 100755 --- a/build.sh +++ b/build.sh @@ -3,12 +3,14 @@ [ -d build ] || mkdir build rm -rf build/* cd build -# cmake ../Step2 -# cmake ../Step4 -DSUPPORT_RDMA=OFF - -cmake -DCMAKE_INSTALL_PREFIX=/home/wzq/tmp2 -DSUPPORT_RDMA=OFF .. + +# -DCMAKE_BUILD_TYPE=Debug | Release +# -DBUILD_SHARED_LIBS=ON +# -DCMAKE_INSTALL_PREFIX=$(pwd/../dest) +# -DQCA_MAN_INSTALL_DIR:PATH=/usr/share/man +cmake -DCMAKE_INSTALL_PREFIX="$(pwd)/../dest" -DCMAKE_BUILD_TYPE=Debug -DSUPPORT_RDMA=OFF .. cmake --build . cmake --build . --target install - \ No newline at end of file + diff --git a/src/logger_factory.h b/src/logger_factory.h index 2fb648a..6bbaef0 100644 --- a/src/logger_factory.h +++ b/src/logger_factory.h @@ -14,7 +14,7 @@ return logger; LoggerConfig config; - config.level = Logger::INFO; + config.level = Logger::DEBUG; config.logFile = "softbus.log"; config.console = 1; logger = new Logger(config); diff --git a/src/px_sem_util.cpp b/src/px_sem_util.cpp new file mode 100644 index 0000000..80008fb --- /dev/null +++ b/src/px_sem_util.cpp @@ -0,0 +1,13 @@ +#include "px_sem_util.h" + +struct timespec PXSemUtil::calc_sem_timeout(const struct timespec *ts) { + int tmp_sec; + struct timespec timeout; + if (clock_gettime(CLOCK_REALTIME, &timeout) == -1) + err_exit(errno, "clock_gettime"); + timeout.tv_nsec += ts->tv_nsec; + tmp_sec = timeout.tv_nsec / 10e9; + timeout.tv_nsec = timeout.tv_nsec - tmp_sec * 10e9; + timeout.tv_sec += ts->tv_sec + tmp_sec; + return timeout; +} \ No newline at end of file diff --git a/src/px_sem_util.h b/src/px_sem_util.h new file mode 100644 index 0000000..60ec978 --- /dev/null +++ b/src/px_sem_util.h @@ -0,0 +1,13 @@ +#ifndef PX_SEM_UTIL_H +#define PX_SEM_UTIL_H + +#include "usg_common.h" +#include "usg_typedef.h" + +namespace PXSemUtil { + + struct timespec calc_sem_timeout(const struct timespec *ts); + +} + +#endif diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h index 84c885c..6a610dc 100644 --- a/src/queue/lock_free_queue.h +++ b/src/queue/lock_free_queue.h @@ -7,6 +7,7 @@ #include "sem_util.h" #include "logger_factory.h" #include "shm_allocator.h" +#include "px_sem_util.h" // default Queue size #define LOCK_FREE_Q_DEFAULT_SIZE 16 @@ -75,11 +76,13 @@ { private: - int slots; - int items; + sem_t slots; + sem_t items; + + public: - int mutex; + sem_t mutex; LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); /// @brief destructor of the class. @@ -150,9 +153,14 @@ LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) { // std::cout << "LockFreeQueue init reference=" << reference << std::endl; - slots = SemUtil::get(IPC_PRIVATE, qsize); - items = SemUtil::get(IPC_PRIVATE, 0); - mutex = SemUtil::get(IPC_PRIVATE, 1); + if (sem_init(&slots, 1, qsize) == -1) + err_exit(errno, "LockFreeQueue sem_init"); + if (sem_init(&items, 1, 0) == -1) + err_exit(errno, "LockFreeQueue sem_init"); + if (sem_init(&mutex, 1, 1) == -1) + err_exit(errno, "LockFreeQueue sem_init"); + + } template < @@ -162,9 +170,15 @@ LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() { // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy"); - SemUtil::remove(slots); - SemUtil::remove(items); - SemUtil::remove(mutex); + if(sem_destroy(&slots) == -1) { + err_exit(errno, "LockFreeQueue sem_destroy"); + } + if(sem_destroy(&items) == -1) { + err_exit(errno, "LockFreeQueue sem_destroy"); + } + if(sem_destroy(&mutex) == -1) { + err_exit(errno, "LockFreeQueue sem_destroy"); + } } template < @@ -201,16 +215,15 @@ template <typename T, typename AT> class Q_TYPE> bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data) { -LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n"); - if (SemUtil::dec(slots) == -1) { +// LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n"); + if (sem_wait(&slots) == -1) { err_msg(errno, "LockFreeQueue push"); return false; } - + if ( m_qImpl.push(a_data) ) { - - SemUtil::inc(items); -LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n"); + sem_post(&items); +// LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n"); return true; } return false; @@ -223,7 +236,7 @@ template <typename T, typename AT> class Q_TYPE> bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data) { - if (SemUtil::dec_nowait(slots) == -1) { + if (sem_trywait(&slots) == -1) { if (errno == EAGAIN) return false; else { @@ -234,7 +247,7 @@ } if ( m_qImpl.push(a_data)) { - SemUtil::inc(items); + sem_post(&items); return true; } return false; @@ -245,22 +258,38 @@ typename ELEM_T, typename Allocator, template <typename T, typename AT> class Q_TYPE> -bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * timeout) +bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts) { + // int tmp_sec; + // struct timespec timeout; + // if (clock_gettime(CLOCK_REALTIME, &timeout) == -1) + // err_exit(errno, "clock_gettime"); + // timeout.tv_nsec += ts->tv_nsec; + // tmp_sec = timeout.tv_nsec / 10e9; + // timeout.tv_nsec = timeout.tv_nsec - tmp_sec * 10e9; + // timeout.tv_sec += ts->tv_sec + tmp_sec; -LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n"); - if (SemUtil::dec_timeout(slots, timeout) == -1) { - if (errno == EAGAIN) + struct timespec timeout = PXSemUtil::calc_sem_timeout(ts); + // LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld", + // timeout.tv_sec, timeout.tv_nsec); + + while (sem_timedwait(&slots, &timeout) == -1) { + // LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld, ETIMEDOUT=%d, errno=%d\n", + // timeout.tv_sec, timeout.tv_nsec, ETIMEDOUT, errno); + + if(errno == ETIMEDOUT) return false; + else if(errno == EINTR) + continue; else { - err_msg(errno, "LockFreeQueue push_timeout"); + LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout"); return false; } } if (m_qImpl.push(a_data)){ - SemUtil::inc(items); -LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n"); + sem_post(&items); +// LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n"); return true; } return false; @@ -277,19 +306,18 @@ bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data) { -LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n"); - if (SemUtil::dec(items) == -1) { - err_msg(errno, "LockFreeQueue pop"); +// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n"); + if (sem_wait(&items) == -1) { + LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop"); return false; } if (m_qImpl.pop(a_data)) { - SemUtil::inc(slots); -LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n"); + sem_post(&slots); +// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n"); return true; } return false; - } template < @@ -298,21 +326,20 @@ template <typename T, typename AT> class Q_TYPE> bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data) { - if (SemUtil::dec_nowait(items) == -1) { + if (sem_trywait(&items) == -1) { if (errno == EAGAIN) return false; else { - err_msg(errno, "LockFreeQueue pop_nowait"); + LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_nowait"); return false; } } if (m_qImpl.pop(a_data)) { - SemUtil::inc(slots); + sem_post(&slots); return true; } return false; - } @@ -320,21 +347,26 @@ typename ELEM_T, typename Allocator, template <typename T, typename AT> class Q_TYPE> -bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout) +bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts) { -LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n"); - if (SemUtil::dec_timeout(items, timeout) == -1) { +// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n"); + + struct timespec timeout = PXSemUtil::calc_sem_timeout(ts); + + while (sem_timedwait(&items, &timeout) == -1) { if (errno == EAGAIN) return false; + else if(errno == EINTR) + continue; else { - // err_msg(errno, "LockFreeQueue pop_timeout"); + LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_timeout"); return false; } } if (m_qImpl.pop(a_data)) { - SemUtil::inc(slots); -LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n"); + sem_post(&slots); +// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n"); return true; } return false; diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h index 7c7b89b..8a23da1 100644 --- a/src/queue/shm_queue.h +++ b/src/queue/shm_queue.h @@ -124,7 +124,7 @@ return; } - SemUtil::dec(queue->mutex); + sem_wait(&(queue->mutex)); queue->reference--; // LoggerFactory::getLogger()->debug("SHMQueue destructor reference===%d", if (queue->reference.load() == 0) { @@ -132,10 +132,10 @@ queue = NULL; hashtable_t *hashtable = mm_get_hashtable(); hashtable_remove(hashtable, KEY); - // 姝ゆ椂queue宸茬粡閿�姣侊紝鏃犻渶 SemUtil::inc(queue->mutex) + // 姝ゆ椂queue宸茬粡閿�姣侊紝鏃犻渶 sem_post(&(queue->mutex)) // printf("SHMQueue destructor delete queue\n"); } else { - SemUtil::inc(queue->mutex); + sem_post(&(queue->mutex)); } } diff --git a/test/Makefile b/test/Makefile index f719401..c490f5e 100755 --- a/test/Makefile +++ b/test/Makefile @@ -3,27 +3,25 @@ PLATFORM=$(shell $(ROOT)/systype.sh) include $(ROOT)/Make.defines.$(PLATFORM) -#RPATH += -Wl,-rpath=${ROOT}/lib +RPATH += -Wl,-rpath=$(ROOT)/lib:$(DEST)/lib # 寮�婧愬伐鍏峰寘璺緞 -LDDIR += -L${DEST}/lib +LDDIR += -L$(DEST)/lib +#-lusgcommon # 寮�婧愬伐鍏峰寘 -LDLIBS += -lshm_queue -lusgcommon -lpthread +LDLIBS += -lshm_queue -lpthread -lusgcommon -INCLUDES += -I${DEST}/include/shmqueue -I$(ROOT)/include/usgcommon +#-I$(ROOT)/include/usgcommon +INCLUDES += -I${ROOT}/src -I${ROOT}/src/shm -I${ROOT}/src/queue -I${ROOT}/src/socket -I${ROOT}/include/usgcommon - -PROGS = ${DEST}/test +PROGS = ${DEST}/test_sem DEPENDENCES = $(patsubst %, %.d, $(PROGS)) #LIBCOMMON=${ROOT}/lib/libusgcommon.a -build: $(PROGS) - - -# class -#$(DEST)/kucker : kucker.c +build: $(PROGS) + clean: diff --git a/test/test2 b/test/test2 deleted file mode 100755 index 0711a8d..0000000 --- a/test/test2 +++ /dev/null Binary files differ diff --git a/test/test_sem.c b/test/test_sem.c new file mode 100644 index 0000000..ce2a9d0 --- /dev/null +++ b/test/test_sem.c @@ -0,0 +1,78 @@ +#include <unistd.h> +#include <stdio.h> +#include <stdlib.h> +#include <semaphore.h> +#include <time.h> +#include <assert.h> +#include <errno.h> +#include <signal.h> + +sem_t sem; + +#define handle_error(msg) \ + do { perror(msg); exit(EXIT_FAILURE); } while (0) + +static void +handler(int sig) +{ + write(STDOUT_FILENO, "sem_post() from handler\n", 24); + if (sem_post(&sem) == -1) + { + write(STDERR_FILENO, "sem_post() failed\n", 18); + _exit(EXIT_FAILURE); + } +} + +int +main(int argc, char *argv[]) +{ + struct sigaction sa; + struct timespec ts; + int s; + + if (argc != 3) + { + fprintf(stderr, "Usage: %s <alarm-secs> <wait-secs>\n", + argv[0]); + exit(EXIT_FAILURE); + } + + if (sem_init(&sem, 0, 0) == -1) + handle_error("sem_init"); + + /* Establish SIGALRM handler; set alarm timer using argv[1] */ + + sa.sa_handler = handler; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + if (sigaction(SIGALRM, &sa, NULL) == -1) + handle_error("sigaction"); + + alarm(atoi(argv[1])); + + /* Calculate relative interval as current time plus + number of seconds given argv[2] */ + + if (clock_gettime(CLOCK_REALTIME, &ts) == -1) + handle_error("clock_gettime"); + + ts.tv_sec += atoi(argv[2]); + + printf("main() about to call sem_timedwait()\n"); + while ((s = sem_timedwait(&sem, &ts)) == -1 && errno == EINTR) + continue; /* Restart if interrupted by handler */ + + /* Check what happened */ + + if (s == -1) + { + if (errno == ETIMEDOUT) + printf("sem_timedwait() timed out\n"); + else + perror("sem_timedwait"); + } + else + printf("sem_timedwait() succeeded\n"); + + exit((s == 0) ? EXIT_SUCCESS : EXIT_FAILURE); +} diff --git a/test_net_socket/Makefile b/test_net_socket/Makefile index adbcef3..94734be 100644 --- a/test_net_socket/Makefile +++ b/test_net_socket/Makefile @@ -22,6 +22,7 @@ build: $(PROGS) cp -a net_mod_socket.sh ${DEST} + cp -a heart_beat.sh ${DEST} clean: diff --git a/test_net_socket/heart_beat.cpp b/test_net_socket/heart_beat.cpp index d7f5ed3..939afc6 100644 --- a/test_net_socket/heart_beat.cpp +++ b/test_net_socket/heart_beat.cpp @@ -50,8 +50,10 @@ net_mod_recv_msg_t *recv_arr; while (true) { sprintf(sendbuf, "%d", i); - printf("SEND HEART:%s\n", sendbuf); - rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL, 1000); + rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL, 2000); + //rv = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL); + printf("SEND HEART:%s, suc nodes = %d\n", sendbuf, rv); + // sleep(1); i++; } @@ -63,10 +65,10 @@ signal(SIGINT, sigint_handler); Targ *targ = (Targ *)arg; int port = targ->port; - void *socket = net_mod_socket_open(); + void *client = net_mod_socket_open(); int size; char sendbuf[512]; - long scale = 10; + long scale = 100000; long i = 0; net_node_t node_arr[] = {"", 0, 100}; int node_arr_size = 1; @@ -77,19 +79,19 @@ while (i < scale) { sprintf(sendbuf, "%d", i); printf("%d SEND HEART:%s\n", targ->id, sendbuf); - net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port); - sleep(1); + net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL); + // net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port); i++; } - net_mod_socket_close(socket); + net_mod_socket_close(client); return (void *)i; } -void startClients(int port) { +void mclient(int port) { - int status, i = 0, processors = 100; + int status, i = 0, processors = 4; void *res[processors]; Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); pthread_t tids[processors]; @@ -130,12 +132,12 @@ port = atoi(argv[2]); - if (strcmp("server", argv[1]) == 0) { + if (strcmp("server", argv[1]) == 0) server(port); - } - - if (strcmp("client", argv[1]) == 0) + else if (strcmp("client", argv[1]) == 0) client(port); + else if (strcmp("mclient", argv[1]) == 0) + mclient(port); shm_mm_wrapper_destroy(); return 0; diff --git a/test_socket/test_survey.sh b/test_net_socket/heart_beat.sh similarity index 72% rename from test_socket/test_survey.sh rename to test_net_socket/heart_beat.sh index 9b167c7..2c32237 100755 --- a/test_socket/test_survey.sh +++ b/test_net_socket/heart_beat.sh @@ -1,12 +1,14 @@ +#! /bin/bash + PROCESSES=100 function clean() { - ps -ef | grep "dgram_mod_survey" | awk '{print $2}' | xargs -i kill -9 {} + ps -ef | grep "heart_beat" | awk '{print $2}' | xargs -i kill -9 {} ipcrm -a } function start_server() { clean - ./dgram_mod_survey server 8 & server_pid=$! + ./heart_beat server 8 & server_pid=$! echo "start server pid ${server_pid}" } @@ -14,7 +16,7 @@ for (( i=0; i<$PROCESSES; i++ )) do # pid_arr[$i]=$i - ./dgram_mod_survey client 8 & pid_arr[$i]=$! + ./heart_beat client 8 & pid_arr[$i]=$! echo "start ${pid_arr[$i]}" done } @@ -24,7 +26,7 @@ do echo "kill ${pid_arr[$i]}" kill -9 ${pid_arr[$i]} - #./dgram_mod_survey client 8 & ${pid_arr[$i]}=$! + #./heart_beat client 8 & ${pid_arr[$i]}=$! done } diff --git a/test_queue/multiple_queue_consumer b/test_queue/multiple_queue_consumer deleted file mode 100755 index aa870b4..0000000 --- a/test_queue/multiple_queue_consumer +++ /dev/null Binary files differ diff --git a/test_queue/multiple_queue_productor b/test_queue/multiple_queue_productor deleted file mode 100755 index 9d4855c..0000000 --- a/test_queue/multiple_queue_productor +++ /dev/null Binary files differ diff --git a/test_queue/single_consumer b/test_queue/single_consumer deleted file mode 100755 index d99582b..0000000 --- a/test_queue/single_consumer +++ /dev/null Binary files differ diff --git a/test_queue/single_productor b/test_queue/single_productor deleted file mode 100755 index fb7dc24..0000000 --- a/test_queue/single_productor +++ /dev/null Binary files differ diff --git a/test_queue/test b/test_queue/test deleted file mode 100755 index bdad69e..0000000 --- a/test_queue/test +++ /dev/null Binary files differ diff --git a/test_queue/test_lockfree_queue b/test_queue/test_lockfree_queue deleted file mode 100755 index 1e4e62f..0000000 --- a/test_queue/test_lockfree_queue +++ /dev/null Binary files differ diff --git a/test_queue/test_lostdata b/test_queue/test_lostdata deleted file mode 100755 index c16a113..0000000 --- a/test_queue/test_lostdata +++ /dev/null Binary files differ diff --git a/test_queue/test_queue b/test_queue/test_queue deleted file mode 100755 index e80c123..0000000 --- a/test_queue/test_queue +++ /dev/null Binary files differ diff --git a/test_queue/test_timeout b/test_queue/test_timeout deleted file mode 100755 index d354f12..0000000 --- a/test_queue/test_timeout +++ /dev/null Binary files differ diff --git a/test_socket/client b/test_socket/client deleted file mode 100755 index 0b861e8..0000000 --- a/test_socket/client +++ /dev/null Binary files differ diff --git a/test_socket/dgram_socket_test b/test_socket/dgram_socket_test deleted file mode 100755 index f61bf91..0000000 --- a/test_socket/dgram_socket_test +++ /dev/null Binary files differ diff --git a/test_socket/server b/test_socket/server deleted file mode 100755 index 80fe19d..0000000 --- a/test_socket/server +++ /dev/null Binary files differ -- Gitblit v1.8.0