#include #include #include #include #include #include #include #include using namespace std; #include "src/bn_api.h" static void test_rr(){ thread([]{ string base_cont("test_req_rep=="); atomic index{0}; vector v_t; for (int i = 0; i < 621; i++){ v_t.emplace_back([&base_cont, i, &index]{ while (true) { // printf("start request\n"); // auto s = chrono::steady_clock::now(); auto msg("[Thread("+to_string(i)+")]->"+base_cont+to_string(index++)); TestRequest(0, msg.c_str(), msg.length()); this_thread::sleep_for(chrono::milliseconds(10)); // auto e = chrono::steady_clock::now(); // printf("======>>thread %d TestRequest time %ld ms\n", i, chrono::duration_cast(e-s).count()); } }); } while (true) { // printf("start request\n"); // auto s = chrono::steady_clock::now(); auto msg(base_cont+to_string(index++)); TestRequest(0, msg.c_str(), msg.length()); this_thread::sleep_for(chrono::milliseconds(10)); // auto e = chrono::steady_clock::now(); // printf("TestRequest time %ld ms\n", chrono::duration_cast(e-s).count()); } }).detach(); while(true){ TestReply(0, -1); } } static void test_ps(){ const string t("topics_"); vector topics; for(int i = 0; i < 3; i++){ topics.emplace_back(t + to_string(i+1)); } string base_cont("test_pub_sub=="); // while (base_cont.size() < 12662) { // base_cont += base_cont; // } thread([&]{ this_thread::sleep_for(chrono::seconds(3)); while (true) { for(auto && i : topics){ auto msg = base_cont + "test_ps pub message "+i+"-->msg"; TestPub(i.c_str(), i.length(), msg.c_str(), msg.length()); this_thread::sleep_for(chrono::milliseconds{126}); } } }).detach(); for(auto && i : topics){ TestSub(i.c_str(), i.length(), 0, 0); } // this_thread::sleep_for(chrono::seconds(3)); while (true) { char *msg; int msg_len; TestSub(NULL, 0, (void**)&msg, &msg_len); this_thread::sleep_for(chrono::seconds{1}); } } vector v_t; template void run_test(F&& f){ v_t.emplace_back([f]{ f(); }); } #include "bhome_msg.pb.h" #include "bhome_msg_api.pb.h" using namespace bhome_msg; int main(int argc, char const *argv[]) { // run_test([&]{test_rr();}); // test_rr(); // test_ps(); // return 0; int reply = 1; string id("hello-reply"); if (argc > 1) { printf("this is request\n"); id = "hello-request"; reply = 0; }else{ printf("this is reply\n"); } ProcInfo pi; pi.set_proc_id(id); pi.set_name("works"); string out; pi.SerializeToString(&out); void* rep; int repl = 0; BHRegister(out.data(), out.size(), &rep, &repl, 500); if (reply){ while (true) { void* pid; int pidl; void* req; int reql; void* src; if (BHReadRequest(&pid, &pidl, &req, &reql, &src, 500)){ bhome_msg::MsgRequestTopic msg; msg.ParseFromArray(req, reql); printf("recv request %d msg data %s\n", reql, msg.data().c_str()); bhome_msg::MsgRequestTopicReply rep; rep.set_data(msg.data() + "-reply"); string srep; rep.SerializeToString(&srep); // auto s = chrono::steady_clock::now(); int ret = BHSendReply(src, srep.data(), srep.size()); // auto e = chrono::steady_clock::now(); // printf("reply time %lu ms\n", chrono::duration_cast(e-s).count()); }else{ // usleep(50000); // printf("BHReadRequest no data\n"); } } }else{ printf("start request %d\n", atoi(argv[1])); bhome_msg::BHAddress addr; addr.set_ip("192.168.20.108"); addr.set_port(atoi(argv[1])); string node; addr.SerializeToString(&node); void* pid; int pidl; void* rep; int repl; unsigned idx = 0; while(1){ bhome_msg::MsgRequestTopic msg; msg.set_topic("hello-reply"); msg.set_data("hell-world-" + to_string(getpid()) + "-" + to_string(idx++)); string smsg; msg.SerializeToString(&smsg); // auto s = chrono::steady_clock::now(); int ret = BHRequest(node.data(), node.size(), smsg.data(), smsg.size(), &pid, &pidl, &rep, &repl, 5000); // auto e = chrono::steady_clock::now(); // printf("request time %lu ms\n", chrono::duration_cast(e-s).count()); bhome_msg::MsgRequestTopicReply prep; prep.ParseFromArray(rep, repl); printf("pid %d BHRequest rep data %s size %d ret %d\n", getpid(), prep.data().c_str(), repl, ret); } } return 0; }