8df1ff06b931b0e414ed435a033f508867b345b7..36e6a90a33983154633c99f7ac95d09dd68f7bcb
2021-02-24 wangzhengquan
update
36e6a9 对比 | 目录
2021-02-24 wangzhengquan
update
fcdbbd 对比 | 目录
1 文件已重命名
8个文件已修改
278 ■■■■■ 已修改文件
CMakeLists.txt 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/CMakeLists.txt 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/logger_factory.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.cpp 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/CMakeLists.txt 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/net_mod_socket.sh 31 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/shm_util.cpp 164 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
CMakeLists.txt
@@ -30,5 +30,4 @@
    add_subdirectory(${PROJECT_SOURCE_DIR}/test)
    add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket)
    add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket)
    add_subdirectory(${PROJECT_SOURCE_DIR}/shm_util)
endif()
src/CMakeLists.txt
@@ -32,7 +32,7 @@
if (BUILD_SHARED_LIBS)
  add_library(shm_queue SHARED ${_SOURCES_})
else()
 add_library(shm_queue SHARED ${_SOURCES_})
 add_library(shm_queue STATIC ${_SOURCES_})
endif()
# STATIC SHARED
src/logger_factory.cpp
@@ -18,7 +18,7 @@
    config.logFile = logFile;
#ifdef BUILD_Debug
    config.level = Logger::INFO;
    config.level = Logger::DEBUG;
    config.console = 1;
#else
    config.level = Logger::ERROR;
src/net/net_mod_socket.cpp
@@ -15,37 +15,14 @@
NetModSocket::NetModSocket() 
{
  int s;
  if (Signal(SIGPIPE, SIG_IGN) == SIG_ERR)
    logger->error(errno, "NetModSocket::NetModSocket signal");
  // gpool = new NetConnPool();
  // pthread_mutexattr_t mtxAttr;
  // s = pthread_mutexattr_init(&mtxAttr);
  // if (s != 0)
  //   err_exit(s, "pthread_mutexattr_init");
  // s = pthread_mutexattr_settype(&mtxAttr, PTHREAD_MUTEX_ERRORCHECK);
  // if (s != 0)
  //   err_exit(s, "pthread_mutexattr_settype");
  // s = pthread_mutex_init(&sendMutex, &mtxAttr);
  // if (s != 0)
  //   err_exit(s, "pthread_mutex_init");
  // s = pthread_mutexattr_destroy(&mtxAttr);
  // if (s != 0)
  //   err_exit(s, "pthread_mutexattr_destroy");
}
NetModSocket::~NetModSocket() {
  // int s;
  // delete gpool;
  // s =  pthread_mutex_destroy(&sendMutex);
  // if(s != 0) {
  //   err_exit(s, "shm_socket_close");
  // }
}
@@ -193,7 +170,7 @@
        err_arr[n_recv_err].key = node->key;
        err_arr[n_recv_err].code = ret;
        n_recv_err++;
        logger->error("NetModSocket:: %d _sendandrecv_ to key %d failed, %s", get_key(), node->key, bus_strerror(ret));
        logger->error("NetModSocket::  _sendandrecv_ to key %d failed. %s",  node->key, bus_strerror(ret));
      }
     
src/net/net_mod_socket.h
@@ -256,10 +256,6 @@
  int  pub_nowait( char *topic, int topic_size, void *content, int content_size, int key);
  /**
   * 获取soket key
   */
src/socket/shm_socket.cpp
@@ -17,10 +17,10 @@
}
static pthread_once_t _once_ = PTHREAD_ONCE_INIT;
static pthread_key_t _perthread_socket_key_;
static pthread_key_t _localthread_socket_key_;
static void _destrory_socket_perthread(void *tmp_socket);
static void _create_socket_key_perthread(void);
static void _destrory_threadlocal_socket_(void *tmp_socket);
static void _create_threadlocal_socket_key_(void);
static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak ,  const struct timespec *timeout,  int flag);
@@ -136,6 +136,11 @@
int shm_socket_close(shm_socket_t *sockt) {
  shm_socket_t * threadlocal_socket = (shm_socket_t *)pthread_getspecific(_localthread_socket_key_);
  if(threadlocal_socket != NULL) {
    _destrory_threadlocal_socket_(threadlocal_socket);
  }
  return _shm_socket_close_(sockt);
}
@@ -283,15 +288,17 @@
// =================================================================================================
 /* Free thread-specific data buffer */
static void _destrory_socket_perthread(void *tmp_socket)
static void _destrory_threadlocal_socket_(void *tmp_socket)
{
  int rv;
  logger->debug("%d destroy threadlocal socket\n", pthread_self());
  if(tmp_socket == NULL)
    return;
  logger->debug("%d destroy tmp socket\n", pthread_self());
  _shm_socket_close_((shm_socket_t *)tmp_socket);
  rv =  pthread_setspecific(_perthread_socket_key_, NULL);
  rv =  pthread_setspecific(_localthread_socket_key_, NULL);
  if ( rv != 0) {
    logger->error(rv, "shm_sendandrecv : pthread_setspecific");
    exit(1);
@@ -299,14 +306,14 @@
}
/* One-time key creation function */
static void _create_socket_key_perthread(void)
static void _create_threadlocal_socket_key_(void)
{
  int s;
  /* Allocate a unique thread-specific data key and save the address
     of the destructor for thread-specific data buffers */
  s = pthread_key_create(&_perthread_socket_key_, _destrory_socket_perthread);
  //s = pthread_key_create(&_perthread_socket_key_, NULL);
  s = pthread_key_create(&_localthread_socket_key_, _destrory_threadlocal_socket_);
  //s = pthread_key_create(&_localthread_socket_key_, NULL);
  if (s != 0) {
     logger->error(s, "pthread_key_create");
     exit(1);
@@ -403,22 +410,22 @@
  shm_packet_t recvpak;
  std::map<int, shm_packet_t>::iterator recvbufIter;
  // 用thread local 保证每个线程用一个独占的socket接受对方返回的信息
  shm_socket_t *tmp_socket;
  shm_socket_t *tmp_socket = NULL;
 
  rv = pthread_once(&_once_, _create_socket_key_perthread);
  rv = pthread_once(&_once_, _create_threadlocal_socket_key_);
  if (rv != 0) {
    logger->error(rv, "shm_sendandrecv pthread_once");
    exit(1);
  }
  tmp_socket = (shm_socket_t *)pthread_getspecific(_perthread_socket_key_);
  tmp_socket = (shm_socket_t *)pthread_getspecific(_localthread_socket_key_);
  if (tmp_socket == NULL)
  {
    /* If first call from this thread, allocate buffer for thread, and save its location */
    logger->debug("%ld create tmp socket\n", (long)pthread_self() );
    logger->debug("%lu create threadlocal socket\n", (long)pthread_self() );
    tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM);
    rv =  pthread_setspecific(_perthread_socket_key_, tmp_socket);
    rv =  pthread_setspecific(_localthread_socket_key_, tmp_socket);
    if ( rv != 0) {
      logger->error(rv, "shm_sendandrecv : pthread_setspecific");
      exit(1);
@@ -502,7 +509,7 @@
      tryn++;
      rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
      if(rv != 0) {
        logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv));
        logger->error("_shm_sendandrecv_alloc_new : %s\n", bus_strerror(rv));
        return rv;
      }
     
test_net_socket/CMakeLists.txt
@@ -7,9 +7,9 @@
# add the executable
add_executable(test_net_mod_socket test_net_mod_socket.cpp  ${PROJECT_BINARY_DIR}/bin/net_mod_socket.sh)
target_link_libraries(test_net_mod_socket PRIVATE shm_queue  ${EXTRA_LIBS} )
target_include_directories(test_net_mod_socket PRIVATE
add_executable(shm_util shm_util.cpp  ${PROJECT_BINARY_DIR}/bin/net_mod_socket.sh)
target_link_libraries(shm_util PRIVATE shm_queue  ${EXTRA_LIBS} )
target_include_directories(shm_util PRIVATE
                            "${PROJECT_BINARY_DIR}"
                             ${EXTRA_INCLUDES}
                            )
@@ -31,4 +31,4 @@
# add the install targets
install(TARGETS test_net_mod_socket DESTINATION bin)
install(TARGETS shm_util DESTINATION bin)
test_net_socket/net_mod_socket.sh
@@ -1,29 +1,28 @@
function server() {
    
    # 开启bus 
 ./test_net_mod_socket --fun="start_bus_server"  & server_pid=$! &&  echo "pid: ${server_pid}"
 ./shm_util start_bus_server  & server_pid=$! &&  echo "pid: ${server_pid}"
    # 开启网络转发代理
    ./test_net_mod_socket  --fun="start_net_proxy" --port=5000 & server_pid=$! && echo "pid: ${server_pid}"
    ./shm_util  start_net_proxy --port=5000 & server_pid=$! && echo "pid: ${server_pid}"
    # 打开请求应答测试的接受端
    ./test_net_mod_socket --fun="start_reply" --key=100 & server_pid=$! &&  echo "pid: ${server_pid}"
    ./test_net_mod_socket --fun="start_reply" --key=101 & server_pid=$! &&  echo "pid: ${server_pid}"
    ./test_net_mod_socket --fun="start_reply" --key=102 & server_pid=$! &&  echo "pid: ${server_pid}"
    ./shm_util start_reply --key=100 & server_pid=$! &&  echo "pid: ${server_pid}"
    ./shm_util start_reply --key=101 & server_pid=$! &&  echo "pid: ${server_pid}"
    ./shm_util start_reply --key=102 & server_pid=$! &&  echo "pid: ${server_pid}"
    # 打开回队列收进程
    ./test_net_mod_socket --fun="start_resycle" & server_pid=$! &&  echo "pid: ${server_pid}"
    ./shm_util start_resycle & server_pid=$! &&  echo "pid: ${server_pid}"
}
# 交互式客户端
function client() {
    # ./test_net_mod_socket --fun="start_net_client" \
    # ./shm_util start_net_client \
    #  --sendlist="192.168.5.10:5000:11, 192.168.5.22:5000:11, 192.168.20.104:5000:11" \
    #  --publist="192.168.5.10:5000:8, 192.168.5.22:5000:8, 192.168.20.104:5000:8"
    ./test_net_mod_socket --fun="start_net_client" \
     --sendlist="localhost:5000:100" \
    ./shm_util start_net_client \
     --sendlist=" :5000:100" \
     --publist="localhost:5000"  
     
@@ -31,38 +30,38 @@
# one_to_many send
function one_to_many() {
    ./test_net_mod_socket --fun="one_sendto_many" \
    ./shm_util one_sendto_many \
     --sendlist=" :5000:100, :5000:101, :5000:102"
     
}
#  
function send() {
    ./test_net_mod_socket --fun="test_net_sendandrecv" \
    ./shm_util test_net_sendandrecv \
     --sendlist=" :5000:100, :5000:101, :5000:102"
     
}
 
# 无限循环 pub
function pub() {
    ./test_net_mod_socket --fun="test_net_pub" \
    ./shm_util test_net_pub \
     --publist="localhost:5000, localhost:5000"
     
}
# 多线程pub
function mpub() {
    ./test_net_mod_socket --fun="test_net_pub_threads" \
    ./shm_util test_net_pub_threads \
     --publist="localhost:5000, localhost:5000"
     
}
function stop() {
    ps -ef | grep -e "test_net_mod_socket" -e "heart_beat"| awk  '{print $2}' | xargs -i kill -15 {}
    ps -ef | grep -e "shm_util" -e "heart_beat"| awk  '{print $2}' | xargs -i kill -15 {}
    
}
function close() {
    ps -ef | grep -e "test_net_mod_socket" -e "heart_beat"| awk  '{print $2}' | xargs -i kill -9 {}
    ps -ef | grep -e "shm_util" -e "heart_beat"| awk  '{print $2}' | xargs -i kill -9 {}
    ipcrm -a
}
test_net_socket/shm_util.cpp
File was renamed from test_net_socket/test_net_mod_socket.cpp
@@ -21,7 +21,6 @@
struct argument_t {
  bool interactive;
  char *fun;
  int port;
  int key;
  char *sendlist;
@@ -526,19 +525,77 @@
  net_mod_socket_close(client);
}
void list () {
  hashtable_t *hashtable = mm_get_hashtable();
  hashtable_foreach(hashtable, [&](int key, void * value){
    printf("%d\n", key);
  });
}
void remove(int key) {
  hashtable_t *hashtable = mm_get_hashtable();
  LockFreeQueue<shm_packet_t> * mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, key);
  if(mqueue != NULL) {
    delete mqueue;
    hashtable_remove(hashtable, key);
  }
}
void do_sendandrecv(int key, char *sendbuf) {
  int n, j;
  int recv_arr_size;
  net_mod_recv_msg_t *recv_arr;
  net_node_t node_arr[] = {NULL, 0, key};
  void * client = net_mod_socket_open();
  n = net_mod_socket_sendandrecv_timeout(client, node_arr, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 5);
  if(n == 0) {
    printf("send failed\n");
    return;
  }
  printf(" %d nodes reply\n", n);
  for(j=0; j < recv_arr_size; j++) {
    fprintf(stdout, "%d send '%s' to %d. received  from (host=%s, port= %d, key=%d) '%s'\n\n",
      net_mod_socket_get_key(client),
      sendbuf,
      key,
      recv_arr[j].host,
      recv_arr[j].port,
      recv_arr[j].key,
      (char *)recv_arr[j].content
    );
  }
  net_mod_socket_close(client);
}
void usage(char *name)
{
  fprintf(stderr, "Usage: %s  [OPTIONS]  [ARG...]\n\n", name);
  fprintf(stderr, "Test net mod socket\n\n");
  fprintf(stderr, "Options:\n\n");
  #define fpe(str) fprintf(stderr, "  %s", str);
  fpe("-f, --funciton                       Function name\n");
  fprintf(stderr, "Usage: %s {function} [OPTIONS]  [ARG...]\n\n", name);
  fprintf(stderr, "Test shmsocket\n\n");
  fprintf(stderr, "Options:\n\n");
  fpe("-p, --port                           TCP/IP Port\n");
  fpe("-k, --key                            SHM Key\n");
  fpe("--sendlist                           format:--sendlist=\"192.168.5.10:5000:11, 192.168.5.22:5000:11, 192.168.20.104:5000:11\"\n");
  fpe("--publist                            format: --publist=\"192.168.5.10:5000:8, 192.168.5.22:5000:8, 192.168.20.104:5000:8\"\n");
  fpe("\n");
  fprintf(stderr, "Examples:\n\n");
  fpe("# sendandrecv to socket which has key 100\n");
  fpe("./shm_util sendandrecv 100 \"hello\"\n");
  fpe("# list all key\n");
  fpe("./shm_util list\n");
  fpe("# remove  key 1001\n");
  fpe("./shm_util rm 1001\n");
  fpe("\n");
}
@@ -553,10 +610,7 @@
    exit(1);
  }
  if(argc == 2 && strcmp(argv[1], "--help") == 0) {
    usage(argv[0]);
    exit(0);
  }
  
  argument_t mopt = {};
  
@@ -570,7 +624,6 @@
  {
    /* These options set a flag. */
     
    {"fun",  required_argument, 0, 'f'},
    {"key",  required_argument, 0, 'k'},
    {"port",  required_argument, 0, 'p'},
    {"interactive",  no_argument, 0, 'i'},
@@ -612,10 +665,6 @@
    
      break;
     
    case 'f':
      mopt.fun = optarg;
      break;
    case 'k':
      mopt.key = atoi(optarg);
      break;
@@ -708,37 +757,78 @@
int main(int argc, char *argv[]) {
  shm_mm_wrapper_init(512);
  argument_t opt = parse_args(argc, argv);
  int i;
  char *prog;
  char * fun;
  argument_t opt;
 // port = atoi(argv[2]);
  if(opt.fun == NULL) {
  shm_mm_wrapper_init(512);
  if(argc < 2) {
    usage(argv[0]);
    exit(1);
  }
  prog = argv[0];
  fun = argv[1];
  argc--;
  argv++;
  if (strcmp("start_net_proxy", opt.fun) == 0 ) {
  if (strcmp("help", fun) == 0 ) {
    usage(prog);
  }
  else if (strcmp("list", fun) == 0 ) {
    list();
  }
  else if (strcmp("rm", fun) == 0 ) {
    if(argc < 2) {
      usage(prog);
      exit(1);
    }
    for(i = 1; i < argc; i++) {
      int key = atoi(argv[i]);
      remove(key);
    }
  }
  else if (strcmp("sendandrecv", fun) == 0 ) {
    if(argc < 3) {
      usage(prog);
      exit(1);
    }
    int key = atoi(argv[1]);
    char *content = argv[2];
    do_sendandrecv(key, content);
  }
  else if (strcmp("start_bus_server", fun) == 0) {
    start_bus_server(opt);
  }
  else if (strcmp("start_resycle", fun) == 0) {
    start_resycle();
  }
  else if (strcmp("start_net_proxy", fun) == 0 ) {
    opt =  parse_args(argc, argv);
    if(opt.port == 0) {
      usage(argv[0]);
      usage(prog);
      exit(1);
    }
    start_net_proxy(opt);
    
  }
  else if (strcmp("start_bus_server", opt.fun) == 0) {
    start_bus_server(opt);
  }
  else if (strcmp("start_reply", opt.fun) == 0) {
  else if (strcmp("start_reply", fun) == 0) {
    opt =  parse_args(argc, argv);
    opt =  parse_args(argc, argv);
    if(opt.key == 0) {
      usage(argv[0]);
      exit(1);
    }
    start_reply(opt.key);
  }
  else if (strcmp("start_net_client", opt.fun) == 0) {
  else if (strcmp("start_net_client", fun) == 0) {
    opt =  parse_args(argc, argv);
    if(opt.sendlist == 0) {
      fprintf(stderr, "Missing sendlist .\n");
      usage(argv[0]);
@@ -751,7 +841,8 @@
    }
    start_net_client(opt.sendlist, opt.publist);
  }
  else if (strcmp("one_sendto_many", opt.fun) == 0) {
  else if (strcmp("one_sendto_many", fun) == 0) {
    opt =  parse_args(argc, argv);
    if(opt.sendlist == 0) {
      fprintf(stderr, "Missing sendlist .\n");
      usage(argv[0]);
@@ -760,7 +851,8 @@
     
    one_sendto_many(opt.sendlist);
  }
  else if (strcmp("test_net_sendandrecv", opt.fun) == 0) {
  else if (strcmp("test_net_sendandrecv", fun) == 0) {
    opt =  parse_args(argc, argv);
    if(opt.sendlist == 0) {
      fprintf(stderr, "Missing sendlist .\n");
      usage(argv[0]);
@@ -769,7 +861,8 @@
     
    test_net_sendandrecv(opt.sendlist);
  }
  else if (strcmp("test_net_pub_threads", opt.fun) == 0) {
  else if (strcmp("test_net_pub_threads", fun) == 0) {
    opt =  parse_args(argc, argv);
    if(opt.publist == 0) {
      fprintf(stderr, "Missing publist .\n");
      usage(argv[0]);
@@ -778,7 +871,8 @@
     
    test_net_pub_threads(opt.publist);
  }
  else if (strcmp("test_net_pub", opt.fun) == 0) {
  else if (strcmp("test_net_pub", fun) == 0) {
    opt =  parse_args(argc, argv);
    if(opt.publist == 0) {
      fprintf(stderr, "Missing publist .\n");
      usage(argv[0]);
@@ -787,11 +881,9 @@
     
    test_net_pub(opt.publist);
  }
  else if (strcmp("start_resycle", opt.fun) == 0) {
    start_resycle();
  }
  else {
    printf("%Invalid funciton name\n");
    usage(argv[0]);
    exit(1);