wangzhengquan
2021-01-29 b9b8088b1f5e7ca29d108f1c87b75855d6735d1e
update
1个文件已添加
6个文件已修改
252 ■■■■ 已修改文件
src/logger_factory.cpp 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/lock_free_queue.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.cpp 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/net_mod_socket.sh 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/one_sendto_many.cpp 107 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_net_mod_socket.cpp 88 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/logger_factory.cpp
@@ -10,14 +10,15 @@
        return logger;
     
    LoggerConfig config;
    config.level = Logger::DEBUG;
    config.logFile =  "/tmp/bhome_bus.log";
#ifdef BUILD_Debug
    config.level = Logger::DEBUG;
#else
    config.level = Logger::INFO;
#endif
    config.logFile =  "bhome_bus.log";
    config.console = 1;
#else
    config.console = 0;
#endif
    logger = new Logger(config);
    return logger;
}
src/queue/lock_free_queue.h
@@ -234,7 +234,7 @@
  if (m_qImpl.push(a_data)) {
    psem_post(&items);
    // sigprocmask(SIG_SETMASK, &pre, NULL);
    LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
    // LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
    return 0;
  }
src/socket/shm_mod_socket.cpp
@@ -79,7 +79,15 @@
*/
int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key, 
    void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
    int rv = shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
    if(rv == 0) {
      logger->debug("ShmModSocket::sendandrecv:  sendandrecv to %d success.\n", send_key);
      return 0;
  }
  logger->debug("ShmModSocket::sendandrecv : sendandrecv to %d failed %s",  send_key, bus_strerror(rv));
    return rv;
}
 
// // 超时返回。 @sec 秒 , @nsec 纳秒
src/socket/shm_socket.cpp
@@ -552,29 +552,7 @@
  return rv;
 
}
// int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf,
//                     const int send_size, const int send_key, void **recv_buf,
//                     int *recv_size,  const struct timespec *timeout,  int flags) {
//   if (socket->socket_type != SHM_SOCKET_DGRAM) {
//     logger->error( "shm_socket.shm_sendandrecv_unsafe : Can't invoke shm_sendandrecv method in a %d type socket  "
//                 "which is not a SHM_SOCKET_DGRAM socket ",
//              socket->socket_type);
//     exit(1);
//   }
//   int recv_key;
//   int rv;
 
//   if ((rv = shm_sendto(socket, send_buf, send_size, send_key, timeout, flags)) == 0) {
//     rv = shm_recvfrom(socket, recv_buf, recv_size, &recv_key, timeout, flags);
//     return rv;
//   } else {
//     return rv;
//   }
//   return -1;
// }
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf,
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  const struct timespec *timeout,  int flags) {
test_net_socket/net_mod_socket.sh
@@ -7,6 +7,8 @@
    # 打开请求应答测试的接受端
    ./test_net_mod_socket --fun="start_reply" --key=100 & server_pid=$! &&  echo "pid: ${server_pid}" 
    ./test_net_mod_socket --fun="start_reply" --key=101 & server_pid=$! &&  echo "pid: ${server_pid}"
    ./test_net_mod_socket --fun="start_reply" --key=102 & server_pid=$! &&  echo "pid: ${server_pid}"
}
# 交互式客户端
@@ -25,9 +27,9 @@
}
# 无限循环send
function send() {
    ./test_net_mod_socket --fun="test_net_sendandrecv" \
     --sendlist="localhost:5000:100,  localhost:5000:100"
function one_to_many() {
    ./test_net_mod_socket --fun="one_sendto_many" \
     --sendlist=" :5000:100, :5000:101, :5000:102"
     
}
# 多线程send
@@ -67,8 +69,8 @@
  "client")
     client
  ;;
  "msend")
    msend
  "one_to_many")
    one_to_many
  ;;
  "send")
    send
test_net_socket/one_sendto_many.cpp
New file
@@ -0,0 +1,107 @@
#include <assert.h>
#include "net_mod_server_socket_wrapper.h"
#include "net_mod_socket_wrapper.h"
#include "bus_server_socket_wrapper.h"
#include "shm_mm_wrapper.h"
#include "usg_common.h"
#include <getopt.h>
#include "logger_factory.h"
void *_run_sendandrecv_(void *arg) {
  Targ *targ = (Targ *)arg;
  char sendbuf[128];
  int j, n;
  int recv_arr_size;
  net_mod_recv_msg_t *recv_arr;
  int total = 0;
  net_node_t *node_arr;
  int node_arr_size = parse_node_list(targ->nodelist, &node_arr);
  long rtid;
  unsigned int l = 0 , rl;
  const char *hello_format = "%ld say Hello %d";
    char filename[512];
    sprintf(filename, "test%d.tmp", targ->id);
    FILE *fp = NULL;
    fp = fopen(filename, "w+");
    // fp = stdout;
    int recvsize;
    void *recvbuf;
  for (l = 0; l < SCALE; l++) {
    sprintf(sendbuf, hello_format, targ->id, l);
    // fprintf(fp, "requst:%s\n", sendbuf);
    // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
    n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1000);
    printf("%d: send %d nodes\n", l, n);
    for(j=0; j < recv_arr_size; j++) {
      fprintf(fp, "%ld send '%s'. received '%s' from (host:%s, port: %d, key:%d) \n",
        targ->id,
        sendbuf,
        recv_arr[j].content,
        recv_arr[j].host,
        recv_arr[j].port,
        recv_arr[j].key
      );
      assert(sscanf((const char *)recv_arr[j].content, hello_format, &rtid, &rl) == 2);
      assert(rtid == targ->id);
      assert(rl == l);
    }
        // 使用完后,不要忘记释放掉
        net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
    total += n;
  }
  // fclose(fp);
  // net_mod_socket_close(client);
  return (void *)total;
}
//多线程send
void test_net_sendandrecv_threads(char *nodelist) {
  int status, i = 0, processors = 4;
  void *res[processors];
  // Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
  Targ targs[processors];
  pthread_t tids[processors];
  char sendbuf[512];
  struct timeval start, end;
  long total = 0;
  client = net_mod_socket_open();
  printf("开始测试...\n");
  gettimeofday(&start, NULL);
  for (i = 0; i < processors; i++) {
    targs[i].nodelist = nodelist;
    targs[i].id = i;
    pthread_create(&tids[i], NULL, _run_sendandrecv_, (void *)&targs[i]);
  }
  for (i = 0; i < processors; i++) {
    if (pthread_join(tids[i], &res[i]) != 0) {
      perror("multyThreadClient pthread_join");
    } else {
        total += (long)res[i];
      //fprintf(stderr, "client(%d) 写入 %ld 条数据\n", i, (long)res[i]);
    }
  }
  gettimeofday(&end, NULL);
  double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec);
  long diffsec = (long) (difftime/1000000);
  long diffusec = difftime - diffsec*1000000;
  fprintf(stderr,"发送数目: %ld, 用时: (%ld sec %ld usec), 平均: %f\n", total, diffsec, diffusec, difftime/total );
  // fflush(stdout);
}
test_net_socket/test_net_mod_socket.cpp
@@ -8,9 +8,10 @@
#include <getopt.h>
#include "logger_factory.h"
#define  SCALE  100000
#define  SCALE  1000000
typedef struct Targ {
  net_node_t *node;
    char *nodelist;
    long id;
@@ -134,17 +135,17 @@
void start_reply(int key) {
  printf("start reply\n");
  void *client = net_mod_socket_open();
  net_mod_socket_bind(client, key);
  void *ser = net_mod_socket_open();
  net_mod_socket_bind(ser, key);
  int size;
  void *recvbuf;
  char sendbuf[512];
  int rv;
  int remote_port;
  while ( (rv = net_mod_socket_recvfrom(client, &recvbuf, &size, &remote_port) ) == 0) {
  while ( (rv = net_mod_socket_recvfrom(ser, &recvbuf, &size, &remote_port) ) == 0) {
   // printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf);
    sprintf(sendbuf, "%s", recvbuf);
    net_mod_socket_sendto(client, sendbuf, strlen(sendbuf) + 1, remote_port);
    sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), recvbuf);
    net_mod_socket_sendto(ser, sendbuf, strlen(sendbuf) + 1, remote_port);
    free(recvbuf);
  }
}
@@ -245,7 +246,7 @@
  
}
void *_run_sendandrecv_(void *arg) {
void *_run_one_sendto_many_(void *arg) {
  Targ *targ = (Targ *)arg;
  char sendbuf[128];
 
@@ -254,17 +255,13 @@
  net_mod_recv_msg_t *recv_arr;
  int total = 0;
 
  net_node_t *node_arr;
  int node_arr_size = parse_node_list(targ->nodelist, &node_arr);
  long rtid;
  int rkey, lkey;
  unsigned int l = 0 , rl;
  const char *hello_format = "%ld say Hello %d";
  const char *hello_format = "%d say Hello %d";
  const char *reply_format = "%d RECEIVED %d say Hello %d";
    char filename[512];
    sprintf(filename, "test%d.tmp", targ->id);
    char filename[128];
    sprintf(filename, "test%d.tmp", targ->node->key);
    FILE *fp = NULL;
    fp = fopen(filename, "w+");
    // fp = stdout;
@@ -272,59 +269,68 @@
    int recvsize;
    void *recvbuf;
  for (l = 0; l < SCALE; l++) {
    sprintf(sendbuf, hello_format, targ->id, l);
    sprintf(sendbuf, hello_format, net_mod_socket_get_key(client), l);
    // fprintf(fp, "requst:%s\n", sendbuf);
    // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
    n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1000);
    n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1000);
    printf("%d: send %d nodes\n", l, n);
    for(j=0; j < recv_arr_size; j++) {
      fprintf(fp, "%ld send '%s'. received '%s' from (host:%s, port: %d, key:%d) \n",
        targ->id,
      fprintf(fp, "%d send '%s' to %d. received  from (host=%s, port= %d, key=%d) '%s'\n",
        net_mod_socket_get_key(client),
        sendbuf,
        recv_arr[j].content,
        targ->node->key,
        recv_arr[j].host,
        recv_arr[j].port,
        recv_arr[j].key
        recv_arr[j].key,
        recv_arr[j].content
      );
      assert(sscanf((const char *)recv_arr[j].content, hello_format, &rtid, &rl) == 2);
      assert(rtid == targ->id);
      printf("key == %d\n", net_mod_socket_get_key(client));
      assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3);
      assert(targ->node->key == rkey);
      assert(net_mod_socket_get_key(client) == lkey);
      assert(rl == l);
    }
        // 使用完后,不要忘记释放掉
        net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
    total += n;
  }
  fclose(fp);
  if(fp != NULL)
    fclose(fp);
  // net_mod_socket_close(client);
  return (void *)total;
}
//多线程send
void test_net_sendandrecv_threads(char *nodelist) {
void one_sendto_many(char *nodelist) {
  int status, i = 0, processors = 4;
  void *res[processors];
  int status, i = 0;
  // Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
  Targ targs[processors];
  pthread_t tids[processors];
  char sendbuf[512];
  struct timeval start, end;
  long total = 0;
  
  client = net_mod_socket_open();
  net_mod_socket_bind(client, shm_mm_wrapper_alloc_key());
 
  net_node_t *node_arr;
  int node_arr_size = parse_node_list(nodelist, &node_arr);
  Targ targs[node_arr_size];
  pthread_t tids[node_arr_size];
  void *res[node_arr_size];
  printf("开始测试...\n");  
  gettimeofday(&start, NULL);
  for (i = 0; i < processors; i++) {
    targs[i].nodelist = nodelist;
  for (i = 0; i < node_arr_size; i++) {
    targs[i].node = node_arr + i;
    targs[i].id = i;
    pthread_create(&tids[i], NULL, _run_sendandrecv_, (void *)&targs[i]);
    pthread_create(&tids[i], NULL, _run_one_sendto_many_, (void *)&targs[i]);
  }
  for (i = 0; i < processors; i++) {
  for (i = 0; i < node_arr_size; i++) {
    if (pthread_join(tids[i], &res[i]) != 0) {
      perror("multyThreadClient pthread_join");
    } else {
@@ -355,7 +361,7 @@
  char buf[128];
  pid_t pid, rpid ;
  unsigned int l , rl;
  const char *hello_format = "%ld say Hello %u";
  const char *hello_format = "%ld say Hello %u ";
  pid = getpid();
  l = 0;
@@ -378,9 +384,9 @@
      );
      assert(sscanf((const char *)recv_arr[j].content, hello_format, &rpid, &rl) == 2);
      assert(rpid == pid);
      assert(rl == l);
      // assert(sscanf((const char *)recv_arr[j].content, hello_format, &rpid, &rl) == 2);
      // assert(rpid == pid);
      // assert(rl == l);
    }
    
    // 使用完后,不要忘记释放掉
@@ -532,14 +538,14 @@
    }
    start_net_client(opt.sendlist, opt.publist);
  }
  else if (strcmp("test_net_sendandrecv_threads", opt.fun) == 0) {
  else if (strcmp("one_sendto_many", opt.fun) == 0) {
    if(opt.sendlist == 0) {
      fprintf(stderr, "Missing sendlist .\n");
      usage(argv[0]);
      exit(1);
    }
     
    test_net_sendandrecv_threads(opt.sendlist);
    one_sendto_many(opt.sendlist);
  }
  else if (strcmp("test_net_sendandrecv", opt.fun) == 0) {
    if(opt.sendlist == 0) {