From d90b3c3779d9c6bd2884540621d107e4ac10e930 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期四, 04 八月 2022 09:10:58 +0800
Subject: [PATCH] tcp remote reduce timeout in readrequest
---
src/nng_wrap.cpp | 23 ++++-
main.cpp | 99 ++++++++++++++++++++++++
src/common.h | 2
src/interface_bus_api.cpp | 25 +++--
CMakeLists.txt | 10 +
src/bn_api.cpp | 23 +++--
6 files changed, 149 insertions(+), 33 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 385b7b6..e3bffd5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -34,6 +34,10 @@
add_subdirectory(${PROJECT_SOURCE_DIR}/src)
include_directories(${PROJECT_SOURCE_DIR}/src)
-add_executable(${Target} main.cpp)
-# target_link_libraries(${Target} nng bhome_msg)
-target_link_libraries(${Target} bus_nng)
+# add_executable(${Target} main.cpp)
+# target_link_libraries(${Target} bus_nng)
+
+file(GLOB cxx_sources "${PROJECT_SOURCE_DIR}/proto/${ARCH}/*.cc")
+include_directories(${PROJECT_SOURCE_DIR}/proto/${ARCH}/ ${PROTO_LIB_PATH}/include)
+add_executable(${Target} main.cpp ${cxx_sources})
+target_link_libraries(${Target} bus_nng bhome_msg pthread rt)
diff --git a/main.cpp b/main.cpp
index ecbaa5a..e016ba9 100644
--- a/main.cpp
+++ b/main.cpp
@@ -93,11 +93,106 @@
});
}
+
+#include "bhome_msg.pb.h"
+#include "bhome_msg_api.pb.h"
+using namespace bhome_msg;
+
int main(int argc, char const *argv[])
{
// run_test([&]{test_rr();});
- test_rr();
- test_ps();
+
+ // test_rr();
+ // test_ps();
+ // return 0;
+
+ int reply = 1;
+
+ string id("hello-reply");
+ if (argc > 1) {
+ printf("this is request\n");
+ id = "hello-request";
+ reply = 0;
+ }else{
+ printf("this is reply\n");
+ }
+
+ ProcInfo pi;
+ pi.set_proc_id(id);
+ pi.set_name("works");
+
+ string out;
+ pi.SerializeToString(&out);
+
+ void* rep;
+ int repl = 0;
+ BHRegister(out.data(), out.size(), &rep, &repl, 500);
+
+ if (reply){
+ while (true) {
+ void* pid;
+ int pidl;
+ void* req;
+ int reql;
+ void* src;
+
+ if (BHReadRequest(&pid, &pidl, &req, &reql, &src, 500)){
+
+ bhome_msg::MsgRequestTopic msg;
+ msg.ParseFromArray(req, reql);
+ printf("recv request %d msg data %s\n", reql, msg.data().c_str());
+
+ bhome_msg::MsgRequestTopicReply rep;
+ rep.set_data(msg.data() + "-reply");
+ string srep;
+ rep.SerializeToString(&srep);
+
+ // auto s = chrono::steady_clock::now();
+ int ret = BHSendReply(src, srep.data(), srep.size());
+ // auto e = chrono::steady_clock::now();
+ // printf("reply time %lu ms\n", chrono::duration_cast<chrono::milliseconds>(e-s).count());
+
+ }else{
+ // usleep(50000);
+ // printf("BHReadRequest no data\n");
+ }
+ }
+ }else{
+ printf("start request %d\n", atoi(argv[1]));
+ bhome_msg::BHAddress addr;
+ addr.set_ip("192.168.20.108");
+ addr.set_port(atoi(argv[1]));
+
+ string node;
+ addr.SerializeToString(&node);
+
+ void* pid;
+ int pidl;
+ void* rep;
+ int repl;
+ unsigned idx = 0;
+ while(1){
+
+ bhome_msg::MsgRequestTopic msg;
+ msg.set_topic("hello-reply");
+ msg.set_data("hell-world-" + to_string(getpid()) + "-" + to_string(idx++));
+ string smsg;
+ msg.SerializeToString(&smsg);
+
+ // auto s = chrono::steady_clock::now();
+ int ret = BHRequest(node.data(), node.size(), smsg.data(), smsg.size(),
+ &pid, &pidl, &rep, &repl, 5000);
+ // auto e = chrono::steady_clock::now();
+ // printf("request time %lu ms\n", chrono::duration_cast<chrono::milliseconds>(e-s).count());
+
+ bhome_msg::MsgRequestTopicReply prep;
+ prep.ParseFromArray(rep, repl);
+
+ printf("pid %d BHRequest rep data %s size %d ret %d\n", getpid(), prep.data().c_str(),
+ repl, ret);
+
+ }
+ }
return 0;
}
\ No newline at end of file
diff --git a/src/bn_api.cpp b/src/bn_api.cpp
index d63e96c..39866a8 100644
--- a/src/bn_api.cpp
+++ b/src/bn_api.cpp
@@ -42,7 +42,7 @@
unsigned short port = *(unsigned short*)(out);
copy_memory(reply, reply_len, (char*)out + port_size, out_len - port_size);
BHFree(out, out_len);
- // printf("======>> recv port %d\n", port);
+ printf("======>> recv port %d\n", port);
/////////////////////////////////////////////////////////////////////////
const auto& url_pub_proxy = get_url(URLPubProxy);
@@ -351,11 +351,7 @@
string url{};
// BHQueryTopicAddress鑾峰彇proc_id
string procid{};
- if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){
- // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
- // return false;
- url = "ipc:///tmp/" + procid;
- }
+
if (remote && remote_len > 0){
BHAddress addr;
if (addr.ParseFromArray(remote, remote_len)){
@@ -364,7 +360,12 @@
// printf("======>> BHRequest use remote address %s\n", url.c_str());
}
}
+ } else if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){
+ // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
+ // return false;
+ url = "ipc:///tmp/" + procid;
}
+
if (url.empty()) {
set_last_error("BHRequest url empty");
return false;
@@ -387,11 +388,13 @@
{
string msg;
auto ret = read_request(src, &msg, timeout_ms);
- if (ret != 0) return false;
+ if (ret < 0) return false;
- string procid{};
- if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){
- copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
+ if (ret == REPLY_IPC){
+ string procid{};
+ if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){
+ copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
+ }
}
copy_memory(request, request_len, msg.data(), msg.size());
diff --git a/src/common.h b/src/common.h
index 253d124..819c09a 100644
--- a/src/common.h
+++ b/src/common.h
@@ -179,6 +179,7 @@
};
enum { INIT, RECV, WAIT, SEND };
+enum { REPLY_IPC, REPLY_TCP };
struct work {
int state{-1};
nng_aio *aio{};
@@ -186,6 +187,7 @@
nng_ctx ctx{};
void(*cb_recv)(work*){};
void* user_data{};
+ int mode{-1};
};
static const std::string rr_unblocking_msg_{"~!@#$%^&*()-=<<UNBLOCKING>>=-()*&^%$#@!~"};
diff --git a/src/interface_bus_api.cpp b/src/interface_bus_api.cpp
index 638f31a..ccb75c8 100644
--- a/src/interface_bus_api.cpp
+++ b/src/interface_bus_api.cpp
@@ -341,11 +341,7 @@
string url{};
// BHQueryTopicAddress鑾峰彇proc_id
string procid{};
- if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){
- // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
- // return false;
- url = "ipc:///tmp/" + procid;
- }
+
if (remote && remote_len > 0){
BHAddress addr;
if (addr.ParseFromArray(remote, remote_len)){
@@ -354,7 +350,12 @@
// printf("======>> BHRequest use remote address %s\n", url.c_str());
}
}
+ } else if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){
+ // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
+ // return false;
+ url = "ipc:///tmp/" + procid;
}
+
if (url.empty()) {
set_last_error("bus_request url empty");
return false;
@@ -383,13 +384,15 @@
string msg;
auto ret = read_request(src, &msg, timeout_ms, &get<6>(*b));
// printf("bus_recv_request read_request ret %d msg %s\n", ret, msg.c_str());
- if (ret != 0) return false;
+ if (ret < 0) return false;
- string procid{};
- if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){
- // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
- // return false;
- copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
+ if (ret == REPLY_IPC){
+ string procid{};
+ if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){
+ // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
+ // return false;
+ copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
+ }
}
copy_memory(request, request_len, msg.data(), msg.size());
diff --git a/src/nng_wrap.cpp b/src/nng_wrap.cpp
index 785ef1d..7b5559e 100644
--- a/src/nng_wrap.cpp
+++ b/src/nng_wrap.cpp
@@ -423,7 +423,7 @@
static void cb_recv_for_aio(work* w){
nng_msg *om = w->msg;
- if (!om) return;
+ if (!om) {nng_sleep_aio(0, w->aio); return;}
_rr* rep = (_rr*)w->user_data;
@@ -438,7 +438,7 @@
rep->cv_msg_.notify_all();
}
-static struct work *alloc_work(nng_socket sock, _rr* rep)
+static struct work *alloc_work(nng_socket sock, _rr* rep, const int mode)
{
struct work *w;
int rv;
@@ -458,10 +458,12 @@
return NULL;
}
w->state = INIT;
+ w->mode = mode;
+
return (w);
}
-static int create_server(nng_socket* sock, const string& url, const int count, _rr* rep){
+static int create_server(nng_socket* sock, const string& url, const int count, _rr* rep, const int mode){
TAG;
if (sock->id > 0) return 0;
@@ -474,7 +476,7 @@
work** works = (work**)malloc(sizeof(void*) * count);
for (int i = 0; i < count; i++) {
- works[i] = alloc_work(*sock, rep);
+ works[i] = alloc_work(*sock, rep, mode);
}
remove_exist(url);
@@ -511,12 +513,12 @@
ipc = url;
}
rep->url_ = ipc;
- if(create_server(&get<0>(rep->socks_), ipc, 62, rep) != 0) return -1;
+ if(create_server(&get<0>(rep->socks_), ipc, 62, rep, REPLY_IPC) != 0) return -1;
if (port > 0){
get<1>(get<1>(rep->socks_)) = port;
ipc = "tcp://0.0.0.0:" + to_string(port);
- if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep) != 0) return -1;
+ if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep, REPLY_TCP) != 0) return -1;
// printf("======>> create server for remote port %d\n", port);
}else {
get<0>(get<1>(rep->socks_)).id = numeric_limits<int32_t>::max();
@@ -566,6 +568,7 @@
int tm = to_ms > 0 ? to_ms : 30;
uint64_t key{};
+ work* w{};
{
unique_lock<mutex> l(rep->mtx_msg_);
auto status = rep->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [rep]{
@@ -579,12 +582,18 @@
key = iter->first;
*msg = move(iter->second);
rep->msg_.erase(iter);
+ auto witer = rep->works_.find(key);
+ if (witer != rep->works_.end()){
+ w = witer->second;
+ }
}
+
+ if (!w) return -1;
*src = malloc(sizeof(uint64_t));
*(uint64_t*)(*src) = key;
- return 0;
+ return w->mode;
}
int send_reply(const void* src, const void* msg, const int msg_len, void* arg/*=NULL*/){
--
Gitblit v1.8.0