zhangmeng
2021-12-10 5990bac28af438a914165441b3c33a370b320d16
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
 
#include <vector>
#include <thread>
#include <chrono>
using namespace std;
 
#include "src/bn_api.h"
 
static void test_rr(){
 
  thread([]{
    string base_cont("test_req_rep==");
 
    vector<thread> v_t;
    for (int i = 0; i < 200; i++){
      v_t.emplace_back([&base_cont, i]{
        int64_t index = 0;
        while (true) {
          // printf("start request\n");
          // auto s = chrono::steady_clock::now();
          auto msg("[Thread("+to_string(i)+")]->"+base_cont+to_string(index++));
          TestRequest(0, msg.c_str(), msg.length());
          this_thread::sleep_for(chrono::milliseconds(10));
          // auto e = chrono::steady_clock::now();
          // printf("======>>thread %d TestRequest time %ld ms\n", i, chrono::duration_cast<chrono::milliseconds>(e-s).count());
        }
      });
    }
 
    int64_t index = 0;
    while (true) {
      // printf("start request\n");
      // auto s = chrono::steady_clock::now();
      auto msg(base_cont+to_string(index++));
      TestRequest(0, msg.c_str(), msg.length());
      this_thread::sleep_for(chrono::milliseconds(10));
      // auto e = chrono::steady_clock::now();
      // printf("TestRequest time %ld ms\n", chrono::duration_cast<chrono::milliseconds>(e-s).count());
    }
  }).detach();
 
    while(true){
      TestReply(0, -1);
    }
 
}
 
static void test_ps(){
 
  const string t("topics_");
  vector<string> topics;
  for(int i = 0; i < 3; i++){
    topics.emplace_back(t + to_string(i+1));
  }
 
  string base_cont("test_pub_sub==");
  // while (base_cont.size() < 12662) {
  //   base_cont += base_cont;
  // }
  thread([&]{
    this_thread::sleep_for(chrono::seconds(3));
    while (true) {
      for(auto && i : topics){
        auto msg = base_cont + "test_ps pub message "+i+"-->msg";
        TestPub(i.c_str(), i.length(), msg.c_str(), msg.length());
        this_thread::sleep_for(chrono::milliseconds{126});
      }
    }
  }).detach();
 
  for(auto && i : topics){
    TestSub(i.c_str(), i.length(), 0, 0);
  }
  // this_thread::sleep_for(chrono::seconds(3));
  while (true) {
    char *msg;
    int msg_len;
    TestSub(NULL, 0, (void**)&msg, &msg_len);
    this_thread::sleep_for(chrono::seconds{1});
  }
}
 
vector<thread> v_t;
template<class F>
void run_test(F&& f){
  v_t.emplace_back([f]{
    f();
  });
}
 
int main(int argc, char const *argv[])
{
  // run_test([&]{test_rr();});
  test_rr();
  test_ps();
 
  return 0;
}