.gitignore
@@ -42,5 +42,6 @@ .idea/ build/ dest/ *.tmp core Make.defines.linux
@@ -36,7 +36,7 @@ # preprocessor options CPPFLAGS += $(INCLUDES) -std=c++11 # compilation options CFLAGS += $(DEBUGFLAGS) -Wall -DLINUX -D_GNU_SOURCE $(EXTRA) CFLAGS += $(DEBUGFLAGS) -Wall -DLINUX -D_GNU_SOURCE -D_POSIX_C_SOURCE=200112L $(EXTRA) # linked options LDFLAGS += build.sh
@@ -3,12 +3,14 @@ [ -d build ] || mkdir build rm -rf build/* cd build # cmake ../Step2 # cmake ../Step4 -DSUPPORT_RDMA=OFF cmake -DCMAKE_INSTALL_PREFIX=/home/wzq/tmp2 -DSUPPORT_RDMA=OFF .. # -DCMAKE_BUILD_TYPE=Debug | Release # -DBUILD_SHARED_LIBS=ON # -DCMAKE_INSTALL_PREFIX=$(pwd/../dest) # -DQCA_MAN_INSTALL_DIR:PATH=/usr/share/man cmake -DCMAKE_INSTALL_PREFIX="$(pwd)/../dest" -DCMAKE_BUILD_TYPE=Debug -DSUPPORT_RDMA=OFF .. cmake --build . cmake --build . --target install src/logger_factory.h
@@ -14,7 +14,7 @@ return logger; LoggerConfig config; config.level = Logger::INFO; config.level = Logger::DEBUG; config.logFile = "softbus.log"; config.console = 1; logger = new Logger(config); src/px_sem_util.cpp
New file @@ -0,0 +1,13 @@ #include "px_sem_util.h" struct timespec PXSemUtil::calc_sem_timeout(const struct timespec *ts) { int tmp_sec; struct timespec timeout; if (clock_gettime(CLOCK_REALTIME, &timeout) == -1) err_exit(errno, "clock_gettime"); timeout.tv_nsec += ts->tv_nsec; tmp_sec = timeout.tv_nsec / 10e9; timeout.tv_nsec = timeout.tv_nsec - tmp_sec * 10e9; timeout.tv_sec += ts->tv_sec + tmp_sec; return timeout; } src/px_sem_util.h
New file @@ -0,0 +1,13 @@
// Timeout helpers used together with the sem_timedwait() calls in the queue code.
#ifndef PX_SEM_UTIL_H
#define PX_SEM_UTIL_H
#include "usg_common.h"
#include "usg_typedef.h"

namespace PXSemUtil {

// Returns the current CLOCK_REALTIME time plus the interval pointed to by ts.
// ts is dereferenced without a null check, so callers must pass a valid pointer.
struct timespec calc_sem_timeout(const struct timespec *ts);

}

#endif
src/queue/lock_free_queue.h
@@ -7,6 +7,7 @@ #include "sem_util.h" #include "logger_factory.h" #include "shm_allocator.h" #include "px_sem_util.h" // default Queue size #define LOCK_FREE_Q_DEFAULT_SIZE 16 @@ -75,11 +76,13 @@ { private: int slots; int items; sem_t slots; sem_t items; public: int mutex; sem_t mutex; LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); /// @brief destructor of the class. @@ -150,9 +153,14 @@ LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) { // std::cout << "LockFreeQueue init reference=" << reference << std::endl; slots = SemUtil::get(IPC_PRIVATE, qsize); items = SemUtil::get(IPC_PRIVATE, 0); mutex = SemUtil::get(IPC_PRIVATE, 1); if (sem_init(&slots, 1, qsize) == -1) err_exit(errno, "LockFreeQueue sem_init"); if (sem_init(&items, 1, 0) == -1) err_exit(errno, "LockFreeQueue sem_init"); if (sem_init(&mutex, 1, 1) == -1) err_exit(errno, "LockFreeQueue sem_init"); } template < @@ -162,9 +170,15 @@ LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() { // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy"); SemUtil::remove(slots); SemUtil::remove(items); SemUtil::remove(mutex); if(sem_destroy(&slots) == -1) { err_exit(errno, "LockFreeQueue sem_destroy"); } if(sem_destroy(&items) == -1) { err_exit(errno, "LockFreeQueue sem_destroy"); } if(sem_destroy(&mutex) == -1) { err_exit(errno, "LockFreeQueue sem_destroy"); } } template < @@ -201,16 +215,15 @@ template <typename T, typename AT> class Q_TYPE> bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data) { LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n"); if (SemUtil::dec(slots) == -1) { // LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n"); if (sem_wait(&slots) == -1) { err_msg(errno, "LockFreeQueue push"); return false; } if ( m_qImpl.push(a_data) ) { SemUtil::inc(items); LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n"); 
sem_post(&items); // LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n"); return true; } return false; @@ -223,7 +236,7 @@ template <typename T, typename AT> class Q_TYPE> bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data) { if (SemUtil::dec_nowait(slots) == -1) { if (sem_trywait(&slots) == -1) { if (errno == EAGAIN) return false; else { @@ -234,7 +247,7 @@ } if ( m_qImpl.push(a_data)) { SemUtil::inc(items); sem_post(&items); return true; } return false; @@ -245,22 +258,38 @@ typename ELEM_T, typename Allocator, template <typename T, typename AT> class Q_TYPE> bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * timeout) bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts) { // int tmp_sec; // struct timespec timeout; // if (clock_gettime(CLOCK_REALTIME, &timeout) == -1) // err_exit(errno, "clock_gettime"); // timeout.tv_nsec += ts->tv_nsec; // tmp_sec = timeout.tv_nsec / 10e9; // timeout.tv_nsec = timeout.tv_nsec - tmp_sec * 10e9; // timeout.tv_sec += ts->tv_sec + tmp_sec; LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n"); if (SemUtil::dec_timeout(slots, timeout) == -1) { if (errno == EAGAIN) struct timespec timeout = PXSemUtil::calc_sem_timeout(ts); // LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld", // timeout.tv_sec, timeout.tv_nsec); while (sem_timedwait(&slots, &timeout) == -1) { // LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld, ETIMEDOUT=%d, errno=%d\n", // timeout.tv_sec, timeout.tv_nsec, ETIMEDOUT, errno); if(errno == ETIMEDOUT) return false; else if(errno == EINTR) continue; else { err_msg(errno, "LockFreeQueue push_timeout"); LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout"); return false; } } if 
(m_qImpl.push(a_data)){ SemUtil::inc(items); LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n"); sem_post(&items); // LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n"); return true; } return false; @@ -277,19 +306,18 @@ bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data) { LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n"); if (SemUtil::dec(items) == -1) { err_msg(errno, "LockFreeQueue pop"); // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n"); if (sem_wait(&items) == -1) { LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop"); return false; } if (m_qImpl.pop(a_data)) { SemUtil::inc(slots); LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n"); sem_post(&slots); // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n"); return true; } return false; } template < @@ -298,21 +326,20 @@ template <typename T, typename AT> class Q_TYPE> bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data) { if (SemUtil::dec_nowait(items) == -1) { if (sem_trywait(&items) == -1) { if (errno == EAGAIN) return false; else { err_msg(errno, "LockFreeQueue pop_nowait"); LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_nowait"); return false; } } if (m_qImpl.pop(a_data)) { SemUtil::inc(slots); sem_post(&slots); return true; } return false; } @@ -320,21 +347,26 @@ typename ELEM_T, typename Allocator, template <typename T, typename AT> class Q_TYPE> bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout) bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts) { LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n"); if (SemUtil::dec_timeout(items, timeout) == -1) { // 
LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n"); struct timespec timeout = PXSemUtil::calc_sem_timeout(ts); while (sem_timedwait(&items, &timeout) == -1) { if (errno == EAGAIN) return false; else if(errno == EINTR) continue; else { // err_msg(errno, "LockFreeQueue pop_timeout"); LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_timeout"); return false; } } if (m_qImpl.pop(a_data)) { SemUtil::inc(slots); LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n"); sem_post(&slots); // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n"); return true; } return false; src/queue/shm_queue.h
@@ -124,7 +124,7 @@ return; } SemUtil::dec(queue->mutex); sem_wait(&(queue->mutex)); queue->reference--; // LoggerFactory::getLogger()->debug("SHMQueue destructor reference===%d", if (queue->reference.load() == 0) { @@ -132,10 +132,10 @@ queue = NULL; hashtable_t *hashtable = mm_get_hashtable(); hashtable_remove(hashtable, KEY); // 此时queue已经销毁,无需 SemUtil::inc(queue->mutex) // 此时queue已经销毁,无需 sem_post(&(queue->mutex)) // printf("SHMQueue destructor delete queue\n"); } else { SemUtil::inc(queue->mutex); sem_post(&(queue->mutex)); } } test/Makefile
@@ -3,27 +3,25 @@ PLATFORM=$(shell $(ROOT)/systype.sh) include $(ROOT)/Make.defines.$(PLATFORM) #RPATH += -Wl,-rpath=${ROOT}/lib RPATH += -Wl,-rpath=$(ROOT)/lib:$(DEST)/lib # 开源工具包路径 LDDIR += -L${DEST}/lib LDDIR += -L$(DEST)/lib #-lusgcommon # 开源工具包 LDLIBS += -lshm_queue -lusgcommon -lpthread LDLIBS += -lshm_queue -lpthread -lusgcommon INCLUDES += -I${DEST}/include/shmqueue -I$(ROOT)/include/usgcommon #-I$(ROOT)/include/usgcommon INCLUDES += -I${ROOT}/src -I${ROOT}/src/shm -I${ROOT}/src/queue -I${ROOT}/src/socket -I${ROOT}/include/usgcommon PROGS = ${DEST}/test PROGS = ${DEST}/test_sem DEPENDENCES = $(patsubst %, %.d, $(PROGS)) #LIBCOMMON=${ROOT}/lib/libusgcommon.a build: $(PROGS) # class #$(DEST)/kucker : kucker.c build: $(PROGS) clean: test/test2Binary files differ
test/test_sem.c
New file @@ -0,0 +1,78 @@ #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <semaphore.h> #include <time.h> #include <assert.h> #include <errno.h> #include <signal.h> sem_t sem; #define handle_error(msg) \ do { perror(msg); exit(EXIT_FAILURE); } while (0) static void handler(int sig) { write(STDOUT_FILENO, "sem_post() from handler\n", 24); if (sem_post(&sem) == -1) { write(STDERR_FILENO, "sem_post() failed\n", 18); _exit(EXIT_FAILURE); } } int main(int argc, char *argv[]) { struct sigaction sa; struct timespec ts; int s; if (argc != 3) { fprintf(stderr, "Usage: %s <alarm-secs> <wait-secs>\n", argv[0]); exit(EXIT_FAILURE); } if (sem_init(&sem, 0, 0) == -1) handle_error("sem_init"); /* Establish SIGALRM handler; set alarm timer using argv[1] */ sa.sa_handler = handler; sigemptyset(&sa.sa_mask); sa.sa_flags = 0; if (sigaction(SIGALRM, &sa, NULL) == -1) handle_error("sigaction"); alarm(atoi(argv[1])); /* Calculate relative interval as current time plus number of seconds given argv[2] */ if (clock_gettime(CLOCK_REALTIME, &ts) == -1) handle_error("clock_gettime"); ts.tv_sec += atoi(argv[2]); printf("main() about to call sem_timedwait()\n"); while ((s = sem_timedwait(&sem, &ts)) == -1 && errno == EINTR) continue; /* Restart if interrupted by handler */ /* Check what happened */ if (s == -1) { if (errno == ETIMEDOUT) printf("sem_timedwait() timed out\n"); else perror("sem_timedwait"); } else printf("sem_timedwait() succeeded\n"); exit((s == 0) ? EXIT_SUCCESS : EXIT_FAILURE); } test_net_socket/Makefile
@@ -22,6 +22,7 @@ build: $(PROGS) cp -a net_mod_socket.sh ${DEST} cp -a heart_beat.sh ${DEST} clean: test_net_socket/heart_beat.cpp
@@ -50,8 +50,10 @@ net_mod_recv_msg_t *recv_arr; while (true) { sprintf(sendbuf, "%d", i); printf("SEND HEART:%s\n", sendbuf); rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL, 1000); rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL, 2000); //rv = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL); printf("SEND HEART:%s, suc nodes = %d\n", sendbuf, rv); // sleep(1); i++; } @@ -63,10 +65,10 @@ signal(SIGINT, sigint_handler); Targ *targ = (Targ *)arg; int port = targ->port; void *socket = net_mod_socket_open(); void *client = net_mod_socket_open(); int size; char sendbuf[512]; long scale = 10; long scale = 100000; long i = 0; net_node_t node_arr[] = {"", 0, 100}; int node_arr_size = 1; @@ -77,19 +79,19 @@ while (i < scale) { sprintf(sendbuf, "%d", i); printf("%d SEND HEART:%s\n", targ->id, sendbuf); net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port); sleep(1); net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL); // net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port); i++; } net_mod_socket_close(socket); net_mod_socket_close(client); return (void *)i; } void startClients(int port) { void mclient(int port) { int status, i = 0, processors = 100; int status, i = 0, processors = 4; void *res[processors]; Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); pthread_t tids[processors]; @@ -130,12 +132,12 @@ port = atoi(argv[2]); if (strcmp("server", argv[1]) == 0) { if (strcmp("server", argv[1]) == 0) server(port); } if (strcmp("client", argv[1]) == 0) else if (strcmp("client", argv[1]) == 0) client(port); else if (strcmp("mclient", argv[1]) == 0) mclient(port); shm_mm_wrapper_destroy(); return 0; test_net_socket/heart_beat.sh
File was renamed from test_socket/test_survey.sh @@ -1,12 +1,14 @@ #! /bin/bash PROCESSES=100 function clean() { ps -ef | grep "dgram_mod_survey" | awk '{print $2}' | xargs -i kill -9 {} ps -ef | grep "heart_beat" | awk '{print $2}' | xargs -i kill -9 {} ipcrm -a } function start_server() { clean ./dgram_mod_survey server 8 & server_pid=$! ./heart_beat server 8 & server_pid=$! echo "start server pid ${server_pid}" } @@ -14,7 +16,7 @@ for (( i=0; i<$PROCESSES; i++ )) do # pid_arr[$i]=$i ./dgram_mod_survey client 8 & pid_arr[$i]=$! ./heart_beat client 8 & pid_arr[$i]=$! echo "start ${pid_arr[$i]}" done } @@ -24,7 +26,7 @@ do echo "kill ${pid_arr[$i]}" kill -9 ${pid_arr[$i]} #./dgram_mod_survey client 8 & ${pid_arr[$i]}=$! #./heart_beat client 8 & ${pid_arr[$i]}=$! done } test_queue/multiple_queue_consumerBinary files differ
test_queue/multiple_queue_productorBinary files differ
test_queue/single_consumerBinary files differ
test_queue/single_productorBinary files differ
test_queue/testBinary files differ
test_queue/test_lockfree_queueBinary files differ
test_queue/test_lostdataBinary files differ
test_queue/test_queueBinary files differ
test_queue/test_timeoutBinary files differ
test_socket/clientBinary files differ
test_socket/dgram_socket_testBinary files differ
test_socket/serverBinary files differ