wangzhengquan
2021-01-29 b9b8088b1f5e7ca29d108f1c87b75855d6735d1e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#include <assert.h>
#include "net_mod_server_socket_wrapper.h"
#include "net_mod_socket_wrapper.h"
#include "bus_server_socket_wrapper.h"
 
#include "shm_mm_wrapper.h"
#include "usg_common.h"
#include <getopt.h>
#include "logger_factory.h"
 
void *_run_sendandrecv_(void *arg) {
  Targ *targ = (Targ *)arg;
  char sendbuf[128];
 
  int j, n;
  int recv_arr_size;
  net_mod_recv_msg_t *recv_arr;
  int total = 0;
 
  
  net_node_t *node_arr;
  int node_arr_size = parse_node_list(targ->nodelist, &node_arr);
 
  long rtid;
  unsigned int l = 0 , rl;
  const char *hello_format = "%ld say Hello %d";
 
 
    char filename[512];
    sprintf(filename, "test%d.tmp", targ->id);
    FILE *fp = NULL;
    fp = fopen(filename, "w+");
    // fp = stdout;
 
    int recvsize;
    void *recvbuf;
  for (l = 0; l < SCALE; l++) {
    sprintf(sendbuf, hello_format, targ->id, l);
    // fprintf(fp, "requst:%s\n", sendbuf);
    // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
    n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1000);
    printf("%d: send %d nodes\n", l, n);
    for(j=0; j < recv_arr_size; j++) {
 
      fprintf(fp, "%ld send '%s'. received '%s' from (host:%s, port: %d, key:%d) \n",
        targ->id,
        sendbuf,
        recv_arr[j].content,
        recv_arr[j].host,
        recv_arr[j].port,
        recv_arr[j].key
 
      );
 
      assert(sscanf((const char *)recv_arr[j].content, hello_format, &rtid, &rl) == 2);
      assert(rtid == targ->id);
      assert(rl == l);
    }
        // 使用完后,不要忘记释放掉
        net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
    total += n;
  }
  // fclose(fp);
  // net_mod_socket_close(client);
  return (void *)total;
}
 
//多线程send
void test_net_sendandrecv_threads(char *nodelist) {
 
  int status, i = 0, processors = 4;
  void *res[processors];
  // Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
  Targ targs[processors];
  pthread_t tids[processors];
  char sendbuf[512];
  struct timeval start, end;
  long total = 0;
  
  client = net_mod_socket_open();
 
  printf("开始测试...\n");  
  gettimeofday(&start, NULL);
  for (i = 0; i < processors; i++) {
    targs[i].nodelist = nodelist;
    targs[i].id = i;
    pthread_create(&tids[i], NULL, _run_sendandrecv_, (void *)&targs[i]);
  }
 
  for (i = 0; i < processors; i++) {
    if (pthread_join(tids[i], &res[i]) != 0) {
      perror("multyThreadClient pthread_join");
    } else {
        total += (long)res[i];
      //fprintf(stderr, "client(%d) 写入 %ld 条数据\n", i, (long)res[i]);
    }
  }
 
  gettimeofday(&end, NULL);
 
  double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec);
  long diffsec = (long) (difftime/1000000);
  long diffusec = difftime - diffsec*1000000;
  fprintf(stderr,"发送数目: %ld, 用时: (%ld sec %ld usec), 平均: %f\n", total, diffsec, diffusec, difftime/total );
  // fflush(stdout);
 
}