#include <stdio.h>
|
#include <stdlib.h>
|
#include <string.h>
|
#include <unistd.h>
|
|
#include <vector>
|
#include <thread>
|
#include <chrono>
|
#include <atomic>
|
using namespace std;
|
|
#include "src/bn_api.h"
|
|
static void test_rr(){
|
|
thread([]{
|
string base_cont("test_req_rep==");
|
|
atomic<uint64_t> index{0};
|
vector<thread> v_t;
|
for (int i = 0; i < 621; i++){
|
v_t.emplace_back([&base_cont, i, &index]{
|
|
while (true) {
|
// printf("start request\n");
|
// auto s = chrono::steady_clock::now();
|
auto msg("[Thread("+to_string(i)+")]->"+base_cont+to_string(index++));
|
TestRequest(0, msg.c_str(), msg.length());
|
this_thread::sleep_for(chrono::milliseconds(10));
|
// auto e = chrono::steady_clock::now();
|
// printf("======>>thread %d TestRequest time %ld ms\n", i, chrono::duration_cast<chrono::milliseconds>(e-s).count());
|
}
|
});
|
}
|
|
while (true) {
|
// printf("start request\n");
|
// auto s = chrono::steady_clock::now();
|
auto msg(base_cont+to_string(index++));
|
TestRequest(0, msg.c_str(), msg.length());
|
this_thread::sleep_for(chrono::milliseconds(10));
|
// auto e = chrono::steady_clock::now();
|
// printf("TestRequest time %ld ms\n", chrono::duration_cast<chrono::milliseconds>(e-s).count());
|
}
|
}).detach();
|
|
while(true){
|
TestReply(0, -1);
|
}
|
|
}
|
|
static void test_ps(){
|
|
const string t("topics_");
|
vector<string> topics;
|
for(int i = 0; i < 3; i++){
|
topics.emplace_back(t + to_string(i+1));
|
}
|
|
string base_cont("test_pub_sub==");
|
// while (base_cont.size() < 12662) {
|
// base_cont += base_cont;
|
// }
|
thread([&]{
|
this_thread::sleep_for(chrono::seconds(3));
|
while (true) {
|
for(auto && i : topics){
|
auto msg = base_cont + "test_ps pub message "+i+"-->msg";
|
TestPub(i.c_str(), i.length(), msg.c_str(), msg.length());
|
this_thread::sleep_for(chrono::milliseconds{126});
|
}
|
}
|
}).detach();
|
|
for(auto && i : topics){
|
TestSub(i.c_str(), i.length(), 0, 0);
|
}
|
// this_thread::sleep_for(chrono::seconds(3));
|
while (true) {
|
char *msg;
|
int msg_len;
|
TestSub(NULL, 0, (void**)&msg, &msg_len);
|
this_thread::sleep_for(chrono::seconds{1});
|
}
|
}
|
|
vector<thread> v_t;
|
template<class F>
|
void run_test(F&& f){
|
v_t.emplace_back([f]{
|
f();
|
});
|
}
|
|
|
#include "bhome_msg.pb.h"
|
#include "bhome_msg_api.pb.h"
|
using namespace bhome_msg;
|
|
int main(int argc, char const *argv[])
|
{
|
// run_test([&]{test_rr();});
|
|
// test_rr();
|
// test_ps();
|
// return 0;
|
|
int reply = 1;
|
|
string id("hello-reply");
|
if (argc > 1) {
|
printf("this is request\n");
|
id = "hello-request";
|
reply = 0;
|
}else{
|
printf("this is reply\n");
|
}
|
|
ProcInfo pi;
|
pi.set_proc_id(id);
|
pi.set_name("works");
|
|
string out;
|
pi.SerializeToString(&out);
|
|
void* rep;
|
int repl = 0;
|
BHRegister(out.data(), out.size(), &rep, &repl, 500);
|
|
if (reply){
|
while (true) {
|
void* pid;
|
int pidl;
|
void* req;
|
int reql;
|
void* src;
|
|
if (BHReadRequest(&pid, &pidl, &req, &reql, &src, 500)){
|
|
bhome_msg::MsgRequestTopic msg;
|
msg.ParseFromArray(req, reql);
|
printf("recv request %d msg data %s\n", reql, msg.data().c_str());
|
|
bhome_msg::MsgRequestTopicReply rep;
|
rep.set_data(msg.data() + "-reply");
|
string srep;
|
rep.SerializeToString(&srep);
|
|
// auto s = chrono::steady_clock::now();
|
int ret = BHSendReply(src, srep.data(), srep.size());
|
// auto e = chrono::steady_clock::now();
|
// printf("reply time %lu ms\n", chrono::duration_cast<chrono::milliseconds>(e-s).count());
|
|
}else{
|
// usleep(50000);
|
// printf("BHReadRequest no data\n");
|
}
|
}
|
}else{
|
printf("start request %d\n", atoi(argv[1]));
|
bhome_msg::BHAddress addr;
|
addr.set_ip("192.168.20.108");
|
addr.set_port(atoi(argv[1]));
|
|
string node;
|
addr.SerializeToString(&node);
|
|
void* pid;
|
int pidl;
|
void* rep;
|
int repl;
|
unsigned idx = 0;
|
while(1){
|
|
bhome_msg::MsgRequestTopic msg;
|
msg.set_topic("hello-reply");
|
msg.set_data("hell-world-" + to_string(getpid()) + "-" + to_string(idx++));
|
string smsg;
|
msg.SerializeToString(&smsg);
|
|
// auto s = chrono::steady_clock::now();
|
int ret = BHRequest(node.data(), node.size(), smsg.data(), smsg.size(),
|
&pid, &pidl, &rep, &repl, 5000);
|
// auto e = chrono::steady_clock::now();
|
// printf("request time %lu ms\n", chrono::duration_cast<chrono::milliseconds>(e-s).count());
|
|
bhome_msg::MsgRequestTopicReply prep;
|
prep.ParseFromArray(rep, repl);
|
|
printf("pid %d BHRequest rep data %s size %d ret %d\n", getpid(), prep.data().c_str(),
|
repl, ret);
|
|
}
|
}
|
|
return 0;
|
}
|