From d90b3c3779d9c6bd2884540621d107e4ac10e930 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期四, 04 八月 2022 09:10:58 +0800 Subject: [PATCH] tcp remote reduce timeout in readrequest --- src/nng_wrap.cpp | 23 ++++- main.cpp | 99 ++++++++++++++++++++++++ src/common.h | 2 src/interface_bus_api.cpp | 25 +++-- CMakeLists.txt | 10 + src/bn_api.cpp | 23 +++-- 6 files changed, 149 insertions(+), 33 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 385b7b6..e3bffd5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,6 +34,10 @@ add_subdirectory(${PROJECT_SOURCE_DIR}/src) include_directories(${PROJECT_SOURCE_DIR}/src) -add_executable(${Target} main.cpp) -# target_link_libraries(${Target} nng bhome_msg) -target_link_libraries(${Target} bus_nng) +# add_executable(${Target} main.cpp) +# target_link_libraries(${Target} bus_nng) + +file(GLOB cxx_sources "${PROJECT_SOURCE_DIR}/proto/${ARCH}/*.cc") +include_directories(${PROJECT_SOURCE_DIR}/proto/${ARCH}/ ${PROTO_LIB_PATH}/include) +add_executable(${Target} main.cpp ${cxx_sources}) +target_link_libraries(${Target} bus_nng bhome_msg pthread rt) diff --git a/main.cpp b/main.cpp index ecbaa5a..e016ba9 100644 --- a/main.cpp +++ b/main.cpp @@ -93,11 +93,106 @@ }); } + +#include "bhome_msg.pb.h" +#include "bhome_msg_api.pb.h" +using namespace bhome_msg; + int main(int argc, char const *argv[]) { // run_test([&]{test_rr();}); - test_rr(); - test_ps(); + + // test_rr(); + // test_ps(); + // return 0; + + int reply = 1; + + string id("hello-reply"); + if (argc > 1) { + printf("this is request\n"); + id = "hello-request"; + reply = 0; + }else{ + printf("this is reply\n"); + } + + ProcInfo pi; + pi.set_proc_id(id); + pi.set_name("works"); + + string out; + pi.SerializeToString(&out); + + void* rep; + int repl = 0; + BHRegister(out.data(), out.size(), &rep, &repl, 500); + + if (reply){ + while (true) { + void* pid; + int pidl; + void* req; + int reql; + void* src; + + if (BHReadRequest(&pid, &pidl, &req, &reql, &src, 500)){ + + bhome_msg::MsgRequestTopic msg; + msg.ParseFromArray(req, reql); + printf("recv request %d msg data %s\n", reql, msg.data().c_str()); + + bhome_msg::MsgRequestTopicReply rep; + rep.set_data(msg.data() + "-reply"); + string srep; + rep.SerializeToString(&srep); + + // auto s = chrono::steady_clock::now(); + int ret = BHSendReply(src, srep.data(), srep.size()); + // auto e = chrono::steady_clock::now(); + // printf("reply time %lu ms\n", chrono::duration_cast<chrono::milliseconds>(e-s).count()); + + }else{ + // usleep(50000); + // printf("BHReadRequest no data\n"); + } + } + }else{ + printf("start request %d\n", atoi(argv[1])); + bhome_msg::BHAddress addr; + addr.set_ip("192.168.20.108"); + addr.set_port(atoi(argv[1])); + + string node; + addr.SerializeToString(&node); + + void* pid; + int pidl; + void* rep; + int repl; + unsigned idx = 0; + while(1){ + + bhome_msg::MsgRequestTopic msg; + msg.set_topic("hello-reply"); + msg.set_data("hell-world-" + to_string(getpid()) + "-" + to_string(idx++)); + string smsg; + msg.SerializeToString(&smsg); + + // auto s = chrono::steady_clock::now(); + int ret = BHRequest(node.data(), node.size(), smsg.data(), smsg.size(), + &pid, &pidl, &rep, &repl, 5000); + // auto e = chrono::steady_clock::now(); + // printf("request time %lu ms\n", chrono::duration_cast<chrono::milliseconds>(e-s).count()); + + bhome_msg::MsgRequestTopicReply prep; + prep.ParseFromArray(rep, repl); + + printf("pid %d BHRequest rep data %s size %d ret %d\n", getpid(), prep.data().c_str(), + repl, ret); + + } + } return 0; } \ No newline at end of file diff --git a/src/bn_api.cpp b/src/bn_api.cpp index d63e96c..39866a8 100644 --- a/src/bn_api.cpp +++ b/src/bn_api.cpp @@ -42,7 +42,7 @@ unsigned short port = *(unsigned short*)(out); copy_memory(reply, reply_len, (char*)out + port_size, out_len - port_size); BHFree(out, out_len); - // printf("======>> recv port %d\n", port); + printf("======>> recv port %d\n", port); ///////////////////////////////////////////////////////////////////////// const auto& url_pub_proxy = get_url(URLPubProxy); @@ -351,11 +351,7 @@ string url{}; // BHQueryTopicAddress鑾峰彇proc_id string procid{}; - if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){ - // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed"); - // return false; - url = "ipc:///tmp/" + procid; - } + if (remote && remote_len > 0){ BHAddress addr; if (addr.ParseFromArray(remote, remote_len)){ @@ -364,7 +360,12 @@ // printf("======>> BHRequest use remote address %s\n", url.c_str()); } } + } else if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){ + // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed"); + // return false; + url = "ipc:///tmp/" + procid; } + if (url.empty()) { set_last_error("BHRequest url empty"); return false; @@ -387,11 +388,13 @@ { string msg; auto ret = read_request(src, &msg, timeout_ms); - if (ret != 0) return false; + if (ret < 0) return false; - string procid{}; - if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){ - copy_memory(proc_id, proc_id_len, procid.data(), procid.size()); + if (ret == REPLY_IPC){ + string procid{}; + if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){ + copy_memory(proc_id, proc_id_len, procid.data(), procid.size()); + } } copy_memory(request, request_len, msg.data(), msg.size()); diff --git a/src/common.h b/src/common.h index 253d124..819c09a 100644 --- a/src/common.h +++ b/src/common.h @@ -179,6 +179,7 @@ }; enum { INIT, RECV, WAIT, SEND }; +enum { REPLY_IPC, REPLY_TCP }; struct work { int state{-1}; nng_aio *aio{}; @@ -186,6 +187,7 @@ nng_ctx ctx{}; void(*cb_recv)(work*){}; void* user_data{}; + int mode{-1}; }; static const std::string rr_unblocking_msg_{"~!@#$%^&*()-=<<UNBLOCKING>>=-()*&^%$#@!~"}; diff --git a/src/interface_bus_api.cpp b/src/interface_bus_api.cpp index 638f31a..ccb75c8 100644 --- a/src/interface_bus_api.cpp +++ b/src/interface_bus_api.cpp @@ -341,11 +341,7 @@ string url{}; // BHQueryTopicAddress鑾峰彇proc_id string procid{}; - if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){ - // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed"); - // return false; - url = "ipc:///tmp/" + procid; - } + if (remote && remote_len > 0){ BHAddress addr; if (addr.ParseFromArray(remote, remote_len)){ @@ -354,7 +350,12 @@ // printf("======>> BHRequest use remote address %s\n", url.c_str()); } } + } else if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){ + // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed"); + // return false; + url = "ipc:///tmp/" + procid; } + if (url.empty()) { set_last_error("bus_request url empty"); return false; @@ -383,13 +384,15 @@ string msg; auto ret = read_request(src, &msg, timeout_ms, &get<6>(*b)); // printf("bus_recv_request read_request ret %d msg %s\n", ret, msg.c_str()); - if (ret != 0) return false; + if (ret < 0) return false; - string procid{}; - if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){ - // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed"); - // return false; - copy_memory(proc_id, proc_id_len, procid.data(), procid.size()); + if (ret == REPLY_IPC){ + string procid{}; + if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){ + // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed"); + // return false; + copy_memory(proc_id, proc_id_len, procid.data(), procid.size()); + } } copy_memory(request, request_len, msg.data(), msg.size()); diff --git a/src/nng_wrap.cpp b/src/nng_wrap.cpp index 785ef1d..7b5559e 100644 --- a/src/nng_wrap.cpp +++ b/src/nng_wrap.cpp @@ -423,7 +423,7 @@ static void cb_recv_for_aio(work* w){ nng_msg *om = w->msg; - if (!om) return; + if (!om) {nng_sleep_aio(0, w->aio); return;} _rr* rep = (_rr*)w->user_data; @@ -438,7 +438,7 @@ rep->cv_msg_.notify_all(); } -static struct work *alloc_work(nng_socket sock, _rr* rep) +static struct work *alloc_work(nng_socket sock, _rr* rep, const int mode) { struct work *w; int rv; @@ -458,10 +458,12 @@ return NULL; } w->state = INIT; + w->mode = mode; + return (w); } -static int create_server(nng_socket* sock, const string& url, const int count, _rr* rep){ +static int create_server(nng_socket* sock, const string& url, const int count, _rr* rep, const int mode){ TAG; if (sock->id > 0) return 0; @@ -474,7 +476,7 @@ work** works = (work**)malloc(sizeof(void*) * count); for (int i = 0; i < count; i++) { - works[i] = alloc_work(*sock, rep); + works[i] = alloc_work(*sock, rep, mode); } remove_exist(url); @@ -511,12 +513,12 @@ ipc = url; } rep->url_ = ipc; - if(create_server(&get<0>(rep->socks_), ipc, 62, rep) != 0) return -1; + if(create_server(&get<0>(rep->socks_), ipc, 62, rep, REPLY_IPC) != 0) return -1; if (port > 0){ get<1>(get<1>(rep->socks_)) = port; ipc = "tcp://0.0.0.0:" + to_string(port); - if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep) != 0) return -1; + if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep, REPLY_TCP) != 0) return -1; // printf("======>> create server for remote port %d\n", port); }else { get<0>(get<1>(rep->socks_)).id = numeric_limits<int32_t>::max(); @@ -566,6 +568,7 @@ int tm = to_ms > 0 ? to_ms : 30; uint64_t key{}; + work* w{}; { unique_lock<mutex> l(rep->mtx_msg_); auto status = rep->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [rep]{ @@ -579,12 +582,18 @@ key = iter->first; *msg = move(iter->second); rep->msg_.erase(iter); + auto witer = rep->works_.find(key); + if (witer != rep->works_.end()){ + w = witer->second; + } } + + if (!w) return -1; *src = malloc(sizeof(uint64_t)); *(uint64_t*)(*src) = key; - return 0; + return w->mode; } int send_reply(const void* src, const void* msg, const int msg_len, void* arg/*=NULL*/){ -- Gitblit v1.8.0