wangzhengquan
2021-01-06 7285f5762bd3f1be94884730d9a28dd406f98fc5
update
13个文件已删除
3个文件已添加
1 文件已重命名
9个文件已修改
298 ■■■■ 已修改文件
.gitignore 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
Make.defines.linux 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
build.sh 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/logger_factory.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/px_sem_util.cpp 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/px_sem_util.h 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/lock_free_queue.h 112 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/shm_queue.h 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/Makefile 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test2 补丁 | 查看 | 原始文档 | blame | 历史
test/test_sem.c 78 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/Makefile 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/heart_beat.cpp 28 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/heart_beat.sh 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_queue/multiple_queue_consumer 补丁 | 查看 | 原始文档 | blame | 历史
test_queue/multiple_queue_productor 补丁 | 查看 | 原始文档 | blame | 历史
test_queue/single_consumer 补丁 | 查看 | 原始文档 | blame | 历史
test_queue/single_productor 补丁 | 查看 | 原始文档 | blame | 历史
test_queue/test 补丁 | 查看 | 原始文档 | blame | 历史
test_queue/test_lockfree_queue 补丁 | 查看 | 原始文档 | blame | 历史
test_queue/test_lostdata 补丁 | 查看 | 原始文档 | blame | 历史
test_queue/test_queue 补丁 | 查看 | 原始文档 | blame | 历史
test_queue/test_timeout 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/client 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/dgram_socket_test 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/server 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -42,5 +42,6 @@
.idea/
build/
dest/
*.tmp
core
Make.defines.linux
@@ -36,7 +36,7 @@
# preprocessor options 
CPPFLAGS += $(INCLUDES) -std=c++11
# compilation options
CFLAGS += $(DEBUGFLAGS) -Wall -DLINUX -D_GNU_SOURCE $(EXTRA)
CFLAGS += $(DEBUGFLAGS) -Wall -DLINUX -D_GNU_SOURCE  -D_POSIX_C_SOURCE=200112L $(EXTRA)
# linked options
LDFLAGS +=
build.sh
@@ -3,12 +3,14 @@
[ -d build ] || mkdir build
rm -rf build/*
cd build
# cmake ../Step2
# cmake ../Step4 -DSUPPORT_RDMA=OFF
cmake -DCMAKE_INSTALL_PREFIX=/home/wzq/tmp2  -DSUPPORT_RDMA=OFF ..
# -DCMAKE_BUILD_TYPE=Debug  | Release
# -DBUILD_SHARED_LIBS=ON
# -DCMAKE_INSTALL_PREFIX=$(pwd/../dest)
# -DQCA_MAN_INSTALL_DIR:PATH=/usr/share/man
cmake -DCMAKE_INSTALL_PREFIX="$(pwd)/../dest" -DCMAKE_BUILD_TYPE=Debug -DSUPPORT_RDMA=OFF ..
cmake --build .
cmake --build . --target install
src/logger_factory.h
@@ -14,7 +14,7 @@
            return logger;
         
        LoggerConfig config;
        config.level = Logger::INFO;
        config.level = Logger::DEBUG;
        config.logFile =  "softbus.log";
        config.console = 1;
        logger = new Logger(config);
src/px_sem_util.cpp
New file
@@ -0,0 +1,13 @@
#include "px_sem_util.h"
struct timespec PXSemUtil::calc_sem_timeout(const struct timespec *ts) {
    int tmp_sec;
  struct timespec timeout;
  if (clock_gettime(CLOCK_REALTIME, &timeout) == -1)
      err_exit(errno, "clock_gettime");
  timeout.tv_nsec += ts->tv_nsec;
  tmp_sec =  timeout.tv_nsec / 10e9;
  timeout.tv_nsec =  timeout.tv_nsec - tmp_sec * 10e9;
  timeout.tv_sec += ts->tv_sec + tmp_sec;
  return timeout;
}
src/px_sem_util.h
New file
@@ -0,0 +1,13 @@
#ifndef PX_SEM_UTIL_H
#define PX_SEM_UTIL_H
#include "usg_common.h"
#include "usg_typedef.h"
namespace PXSemUtil {
    struct timespec calc_sem_timeout(const struct timespec *ts);
}
#endif
src/queue/lock_free_queue.h
@@ -7,6 +7,7 @@
#include "sem_util.h"
#include "logger_factory.h"
#include "shm_allocator.h"
#include "px_sem_util.h"
// default Queue size
#define LOCK_FREE_Q_DEFAULT_SIZE 16
@@ -75,11 +76,13 @@
{
private:
    int slots;
    int items;
    sem_t slots;
    sem_t items;
   
public:
    int mutex;
    sem_t mutex;
    LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    
    /// @brief destructor of the class. 
@@ -150,9 +153,14 @@
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize)
{
// std::cout << "LockFreeQueue init reference=" << reference << std::endl;
    slots = SemUtil::get(IPC_PRIVATE, qsize);
    items = SemUtil::get(IPC_PRIVATE, 0);
    mutex = SemUtil::get(IPC_PRIVATE, 1);
    if (sem_init(&slots, 1, qsize) == -1)
        err_exit(errno, "LockFreeQueue sem_init");
    if (sem_init(&items, 1, 0) == -1)
        err_exit(errno, "LockFreeQueue sem_init");
    if (sem_init(&mutex, 1, 1) == -1)
        err_exit(errno, "LockFreeQueue sem_init");
}
template <
@@ -162,9 +170,15 @@
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue()
{
    // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy");
    SemUtil::remove(slots);
    SemUtil::remove(items);
    SemUtil::remove(mutex);
    if(sem_destroy(&slots) == -1) {
        err_exit(errno, "LockFreeQueue sem_destroy");
    }
    if(sem_destroy(&items) == -1) {
        err_exit(errno, "LockFreeQueue sem_destroy");
    }
    if(sem_destroy(&mutex) == -1) {
        err_exit(errno, "LockFreeQueue sem_destroy");
    }
}
template <
@@ -201,16 +215,15 @@
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
{
LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
    if (SemUtil::dec(slots) == -1) {
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
    if (sem_wait(&slots) == -1) {
        err_msg(errno, "LockFreeQueue push");
        return false;
    }
    if ( m_qImpl.push(a_data) ) {
        SemUtil::inc(items);
LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
        sem_post(&items);
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
        return true;
    }
    return false;
@@ -223,7 +236,7 @@
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
{
    if (SemUtil::dec_nowait(slots) == -1) {
    if (sem_trywait(&slots) == -1) {
        if (errno == EAGAIN)
            return false;
        else {
@@ -234,7 +247,7 @@
    }
    if ( m_qImpl.push(a_data)) {
        SemUtil::inc(items);
        sem_post(&items);
        return true;
    }
    return false;
@@ -245,22 +258,38 @@
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * timeout)
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts)
{
    // int tmp_sec;
    // struct timespec timeout;
    // if (clock_gettime(CLOCK_REALTIME, &timeout) == -1)
    //     err_exit(errno, "clock_gettime");
    // timeout.tv_nsec += ts->tv_nsec;
    // tmp_sec =  timeout.tv_nsec / 10e9;
    // timeout.tv_nsec =  timeout.tv_nsec - tmp_sec * 10e9;
    // timeout.tv_sec += ts->tv_sec + tmp_sec;
LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n");
    if (SemUtil::dec_timeout(slots, timeout) == -1) {
        if (errno == EAGAIN)
    struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
  // LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld",
  //   timeout.tv_sec, timeout.tv_nsec);
    while (sem_timedwait(&slots, &timeout) == -1) {
    //     LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld, ETIMEDOUT=%d, errno=%d\n",
    // timeout.tv_sec, timeout.tv_nsec, ETIMEDOUT, errno);
        if(errno == ETIMEDOUT)
            return false;
        else if(errno == EINTR)
            continue;
        else {
            err_msg(errno, "LockFreeQueue push_timeout");
            LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
            return false;
        }
    }
    if (m_qImpl.push(a_data)){
        SemUtil::inc(items);
LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n");
        sem_post(&items);
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n");
        return true;
    }
    return false;
@@ -277,19 +306,18 @@
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
{
LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
    if (SemUtil::dec(items) == -1) {
        err_msg(errno, "LockFreeQueue pop");
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
    if (sem_wait(&items) == -1) {
        LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop");
        return false;
    }
    if (m_qImpl.pop(a_data)) {
        SemUtil::inc(slots);
LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
        sem_post(&slots);
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
        return true;
    }
    return false;
}
template <
@@ -298,21 +326,20 @@
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
{
    if (SemUtil::dec_nowait(items) == -1) {
    if (sem_trywait(&items) == -1) {
        if (errno == EAGAIN)
            return false;
        else {
            err_msg(errno, "LockFreeQueue pop_nowait");
            LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_nowait");
            return false;
        }
    }
    if (m_qImpl.pop(a_data)) {
        SemUtil::inc(slots);
        sem_post(&slots);
        return true;
    }
    return false;
}
 
@@ -320,21 +347,26 @@
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout)
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
{
LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n");
    if (SemUtil::dec_timeout(items, timeout) == -1) {
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n");
     struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
    while (sem_timedwait(&items, &timeout) == -1) {
        if (errno == EAGAIN)
            return false;
        else if(errno == EINTR)
            continue;
        else {
            // err_msg(errno, "LockFreeQueue pop_timeout");
            LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_timeout");
            return false;
        }
    }
    if (m_qImpl.pop(a_data)) {
        SemUtil::inc(slots);
LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n");
        sem_post(&slots);
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n");
        return true;
    }
    return false;
src/queue/shm_queue.h
@@ -124,7 +124,7 @@
    return;
  }
  SemUtil::dec(queue->mutex);
  sem_wait(&(queue->mutex));
  queue->reference--;
  // LoggerFactory::getLogger()->debug("SHMQueue destructor  reference===%d",
  if (queue->reference.load() == 0) {
@@ -132,10 +132,10 @@
      queue = NULL;
      hashtable_t *hashtable = mm_get_hashtable();
      hashtable_remove(hashtable, KEY);
      // 此时queue已经销毁,无需 SemUtil::inc(queue->mutex)
      // 此时queue已经销毁,无需  sem_post(&(queue->mutex))
      // printf("SHMQueue destructor delete queue\n");
  } else {
      SemUtil::inc(queue->mutex);
      sem_post(&(queue->mutex));
  }
  
}
test/Makefile
@@ -3,27 +3,25 @@
PLATFORM=$(shell $(ROOT)/systype.sh)
include $(ROOT)/Make.defines.$(PLATFORM)
#RPATH += -Wl,-rpath=${ROOT}/lib
RPATH += -Wl,-rpath=$(ROOT)/lib:$(DEST)/lib
# 开源工具包路径
LDDIR += -L${DEST}/lib
LDDIR += -L$(DEST)/lib
#-lusgcommon
# 开源工具包
LDLIBS += -lshm_queue -lusgcommon -lpthread
LDLIBS += -lshm_queue -lpthread -lusgcommon
INCLUDES += -I${DEST}/include/shmqueue -I$(ROOT)/include/usgcommon
#-I$(ROOT)/include/usgcommon
INCLUDES += -I${ROOT}/src -I${ROOT}/src/shm -I${ROOT}/src/queue -I${ROOT}/src/socket  -I${ROOT}/include/usgcommon
PROGS = ${DEST}/test
PROGS = ${DEST}/test_sem
DEPENDENCES = $(patsubst %, %.d, $(PROGS)) 
#LIBCOMMON=${ROOT}/lib/libusgcommon.a
build:     $(PROGS)
# class
#$(DEST)/kucker : kucker.c
build: $(PROGS)
clean:
test/test2
Binary files differ
test/test_sem.c
New file
@@ -0,0 +1,78 @@
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <time.h>
#include <assert.h>
#include <errno.h>
#include <signal.h>
sem_t sem;
#define handle_error(msg) \
           do { perror(msg); exit(EXIT_FAILURE); } while (0)
static void
handler(int sig)
{
  write(STDOUT_FILENO, "sem_post() from handler\n", 24);
  if (sem_post(&sem) == -1)
  {
    write(STDERR_FILENO, "sem_post() failed\n", 18);
    _exit(EXIT_FAILURE);
  }
}
int
main(int argc, char *argv[])
{
  struct sigaction sa;
  struct timespec ts;
  int s;
  if (argc != 3)
  {
    fprintf(stderr, "Usage: %s <alarm-secs> <wait-secs>\n",
            argv[0]);
    exit(EXIT_FAILURE);
  }
  if (sem_init(&sem, 0, 0) == -1)
    handle_error("sem_init");
  /* Establish SIGALRM handler; set alarm timer using argv[1] */
  sa.sa_handler = handler;
  sigemptyset(&sa.sa_mask);
  sa.sa_flags = 0;
  if (sigaction(SIGALRM, &sa, NULL) == -1)
    handle_error("sigaction");
  alarm(atoi(argv[1]));
  /* Calculate relative interval as current time plus
     number of seconds given argv[2] */
  if (clock_gettime(CLOCK_REALTIME, &ts) == -1)
    handle_error("clock_gettime");
  ts.tv_sec += atoi(argv[2]);
  printf("main() about to call sem_timedwait()\n");
  while ((s = sem_timedwait(&sem, &ts)) == -1 && errno == EINTR)
    continue;       /* Restart if interrupted by handler */
  /* Check what happened */
  if (s == -1)
  {
    if (errno == ETIMEDOUT)
      printf("sem_timedwait() timed out\n");
    else
      perror("sem_timedwait");
  }
  else
    printf("sem_timedwait() succeeded\n");
  exit((s == 0) ? EXIT_SUCCESS : EXIT_FAILURE);
}
test_net_socket/Makefile
@@ -22,6 +22,7 @@
build: $(PROGS)
    cp -a net_mod_socket.sh ${DEST}
    cp -a heart_beat.sh ${DEST}
clean:
test_net_socket/heart_beat.cpp
@@ -50,8 +50,10 @@
  net_mod_recv_msg_t *recv_arr;
  while (true) {
    sprintf(sendbuf, "%d", i);
    printf("SEND HEART:%s\n", sendbuf);
    rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL, 1000);
    rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL, 2000);
    //rv = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL);
    printf("SEND HEART:%s, suc nodes = %d\n", sendbuf, rv);
   // sleep(1);
    i++;
  }
@@ -63,10 +65,10 @@
  signal(SIGINT,  sigint_handler);
  Targ *targ = (Targ *)arg;
  int port = targ->port;
  void *socket = net_mod_socket_open();
  void *client = net_mod_socket_open();
  int size;
  char sendbuf[512];
  long scale = 10;
  long scale = 100000;
  long i = 0;
  net_node_t node_arr[] = {"", 0, 100};
  int node_arr_size = 1;
@@ -77,19 +79,19 @@
  while (i < scale) {
    sprintf(sendbuf, "%d", i);
    printf("%d SEND HEART:%s\n", targ->id, sendbuf);
    net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
    sleep(1);
    net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL);
    // net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
    i++;
  }
  
   net_mod_socket_close(socket);
   net_mod_socket_close(client);
  return (void *)i;
}
 
void startClients(int port) {
void mclient(int port) {
  int status, i = 0, processors = 100;
  int status, i = 0, processors = 4;
  void *res[processors];
  Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
  pthread_t tids[processors];
@@ -130,12 +132,12 @@
  port = atoi(argv[2]);
  if (strcmp("server", argv[1]) == 0) {
  if (strcmp("server", argv[1]) == 0)
    server(port);
  }
  if (strcmp("client", argv[1]) == 0)
  else if (strcmp("client", argv[1]) == 0)
    client(port);
  else if (strcmp("mclient", argv[1]) == 0)
    mclient(port);
  shm_mm_wrapper_destroy();
  return 0;
test_net_socket/heart_beat.sh
File was renamed from test_socket/test_survey.sh
@@ -1,12 +1,14 @@
#! /bin/bash
PROCESSES=100
function clean() {
     ps -ef | grep "dgram_mod_survey" | awk  '{print $2}' | xargs -i kill -9 {}
     ps -ef | grep "heart_beat" | awk  '{print $2}' | xargs -i kill -9 {}
     ipcrm -a
}
function start_server() {
    clean
    ./dgram_mod_survey server 8 & server_pid=$!
    ./heart_beat server 8 & server_pid=$!
    echo "start server pid ${server_pid}"
}
@@ -14,7 +16,7 @@
    for (( i=0; i<$PROCESSES; i++ ))
    do
        # pid_arr[$i]=$i
        ./dgram_mod_survey client 8 & pid_arr[$i]=$!
        ./heart_beat client 8 & pid_arr[$i]=$!
        echo "start ${pid_arr[$i]}" 
    done
}
@@ -24,7 +26,7 @@
    do
        echo "kill ${pid_arr[$i]}" 
        kill -9 ${pid_arr[$i]}
        #./dgram_mod_survey client 8 & ${pid_arr[$i]}=$!
        #./heart_beat client 8 & ${pid_arr[$i]}=$!
    done
}
test_queue/multiple_queue_consumer
Binary files differ
test_queue/multiple_queue_productor
Binary files differ
test_queue/single_consumer
Binary files differ
test_queue/single_productor
Binary files differ
test_queue/test
Binary files differ
test_queue/test_lockfree_queue
Binary files differ
test_queue/test_lostdata
Binary files differ
test_queue/test_queue
Binary files differ
test_queue/test_timeout
Binary files differ
test_socket/client
Binary files differ
test_socket/dgram_socket_test
Binary files differ
test_socket/server
Binary files differ