From d90b3c3779d9c6bd2884540621d107e4ac10e930 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期四, 04 八月 2022 09:10:58 +0800
Subject: [PATCH] tcp remote reduce timeout in readrequest

---
 src/nng_wrap.cpp          |   23 ++++-
 main.cpp                  |   99 ++++++++++++++++++++++++
 src/common.h              |    2 
 src/interface_bus_api.cpp |   25 +++--
 CMakeLists.txt            |   10 +
 src/bn_api.cpp            |   23 +++--
 6 files changed, 149 insertions(+), 33 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 385b7b6..e3bffd5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -34,6 +34,10 @@
 add_subdirectory(${PROJECT_SOURCE_DIR}/src)
 include_directories(${PROJECT_SOURCE_DIR}/src)
 
-add_executable(${Target} main.cpp)
-# target_link_libraries(${Target} nng bhome_msg)
-target_link_libraries(${Target} bus_nng)
+# add_executable(${Target} main.cpp)
+# target_link_libraries(${Target} bus_nng)
+
+file(GLOB cxx_sources "${PROJECT_SOURCE_DIR}/proto/${ARCH}/*.cc")
+include_directories(${PROJECT_SOURCE_DIR}/proto/${ARCH}/ ${PROTO_LIB_PATH}/include)
+add_executable(${Target} main.cpp ${cxx_sources})
+target_link_libraries(${Target} bus_nng bhome_msg pthread rt)
diff --git a/main.cpp b/main.cpp
index ecbaa5a..e016ba9 100644
--- a/main.cpp
+++ b/main.cpp
@@ -93,11 +93,106 @@
   });
 }
 
+
+#include "bhome_msg.pb.h"
+#include "bhome_msg_api.pb.h"
+using namespace bhome_msg;
+
 int main(int argc, char const *argv[])
 {
   // run_test([&]{test_rr();});
-  test_rr();
-  test_ps();
+
+  // test_rr();
+  // test_ps();
+  // return 0;
+
+  int reply = 1;
+
+  string id("hello-reply");
+  if (argc > 1) {
+    printf("this is request\n");
+    id = "hello-request";
+    reply = 0;
+  }else{
+    printf("this is reply\n");
+  }
+
+  ProcInfo pi;
+  pi.set_proc_id(id);
+  pi.set_name("works");
+
+  string out;
+  pi.SerializeToString(&out);
+
+  void* rep;
+  int repl = 0;
+  BHRegister(out.data(), out.size(), &rep, &repl, 500);
+
+  if (reply){
+    while (true) {
+      void* pid;
+      int pidl;
+      void* req;
+      int reql;
+      void* src;
+
+      if (BHReadRequest(&pid, &pidl, &req, &reql, &src, 500)){
+
+        bhome_msg::MsgRequestTopic msg;
+        msg.ParseFromArray(req, reql);
+        printf("recv request %d msg data %s\n", reql, msg.data().c_str());
+
+        bhome_msg::MsgRequestTopicReply rep;
+        rep.set_data(msg.data() + "-reply");
+        string srep;
+        rep.SerializeToString(&srep);
+
+        // auto s = chrono::steady_clock::now();
+        int ret = BHSendReply(src, srep.data(), srep.size());
+        // auto e = chrono::steady_clock::now();
+        // printf("reply time %lu ms\n", chrono::duration_cast<chrono::milliseconds>(e-s).count());
+
+      }else{
+        // usleep(50000);
+        // printf("BHReadRequest no data\n");
+      }
+    }
+  }else{
+    printf("start request %d\n", atoi(argv[1]));
+    bhome_msg::BHAddress addr;
+    addr.set_ip("192.168.20.108");
+    addr.set_port(atoi(argv[1]));
+
+    string node;
+    addr.SerializeToString(&node);
+
+    void* pid;
+    int pidl;
+    void* rep;
+    int repl;
+    unsigned idx = 0;
+    while(1){
+
+      bhome_msg::MsgRequestTopic msg;
+      msg.set_topic("hello-reply");
+      msg.set_data("hell-world-" + to_string(getpid()) + "-" + to_string(idx++));
+      string smsg;
+      msg.SerializeToString(&smsg);
+
+      // auto s = chrono::steady_clock::now();
+      int ret = BHRequest(node.data(), node.size(), smsg.data(), smsg.size(),
+        &pid, &pidl, &rep, &repl, 5000);
+      // auto e = chrono::steady_clock::now();
+      // printf("request time %lu ms\n", chrono::duration_cast<chrono::milliseconds>(e-s).count());
+
+      bhome_msg::MsgRequestTopicReply prep;
+      prep.ParseFromArray(rep, repl);
+
+      printf("pid %d BHRequest rep data %s size %d ret %d\n", getpid(), prep.data().c_str(),
+        repl, ret);
+
+    }
+  }
 
   return 0;
 }
\ No newline at end of file
diff --git a/src/bn_api.cpp b/src/bn_api.cpp
index d63e96c..39866a8 100644
--- a/src/bn_api.cpp
+++ b/src/bn_api.cpp
@@ -42,7 +42,7 @@
         unsigned short port = *(unsigned short*)(out);
         copy_memory(reply, reply_len, (char*)out + port_size, out_len - port_size);
         BHFree(out, out_len);
-        // printf("======>> recv port %d\n", port);
+        printf("======>> recv port %d\n", port);
 /////////////////////////////////////////////////////////////////////////
 
         const auto& url_pub_proxy = get_url(URLPubProxy);
@@ -351,11 +351,7 @@
     string url{};
     // BHQueryTopicAddress鑾峰彇proc_id
     string procid{};
-    if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){
-        // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
-        // return false;
-        url = "ipc:///tmp/" + procid;
-    }
+
     if (remote && remote_len > 0){
         BHAddress addr;
         if (addr.ParseFromArray(remote, remote_len)){
@@ -364,7 +360,12 @@
                 // printf("======>> BHRequest use remote address %s\n", url.c_str());
             }
         }
+    } else if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){
+        // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
+        // return false;
+        url = "ipc:///tmp/" + procid;
     }
+
     if (url.empty()) {
         set_last_error("BHRequest url empty");
         return false;
@@ -387,11 +388,13 @@
 {
     string msg;
     auto ret = read_request(src, &msg, timeout_ms);
-    if (ret != 0) return false;
+    if (ret < 0) return false;
 
-    string procid{};
-    if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){
-        copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
+    if (ret == REPLY_IPC){
+        string procid{};
+        if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){
+            copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
+        }
     }
 
     copy_memory(request, request_len, msg.data(), msg.size());
diff --git a/src/common.h b/src/common.h
index 253d124..819c09a 100644
--- a/src/common.h
+++ b/src/common.h
@@ -179,6 +179,7 @@
 };
 
 enum { INIT, RECV, WAIT, SEND };
+enum { REPLY_IPC, REPLY_TCP };
 struct work {
     int state{-1};
     nng_aio *aio{};
@@ -186,6 +187,7 @@
     nng_ctx  ctx{};
     void(*cb_recv)(work*){};
     void* user_data{};
+    int mode{-1};
 };
 
 static const std::string rr_unblocking_msg_{"~!@#$%^&*()-=<<UNBLOCKING>>=-()*&^%$#@!~"};
diff --git a/src/interface_bus_api.cpp b/src/interface_bus_api.cpp
index 638f31a..ccb75c8 100644
--- a/src/interface_bus_api.cpp
+++ b/src/interface_bus_api.cpp
@@ -341,11 +341,7 @@
     string url{};
     // BHQueryTopicAddress鑾峰彇proc_id
     string procid{};
-    if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){
-        // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
-        // return false;
-        url = "ipc:///tmp/" + procid;
-    }
+
     if (remote && remote_len > 0){
         BHAddress addr;
         if (addr.ParseFromArray(remote, remote_len)){
@@ -354,7 +350,12 @@
                 // printf("======>> BHRequest use remote address %s\n", url.c_str());
             }
         }
+    } else if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){
+        // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
+        // return false;
+        url = "ipc:///tmp/" + procid;
     }
+
     if (url.empty()) {
         set_last_error("bus_request url empty");
         return false;
@@ -383,13 +384,15 @@
     string msg;
     auto ret = read_request(src, &msg, timeout_ms, &get<6>(*b));
     // printf("bus_recv_request read_request ret %d msg %s\n", ret, msg.c_str());
-    if (ret != 0) return false;
+    if (ret < 0) return false;
 
-    string procid{};
-    if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){
-        // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
-        // return false;
-        copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
+    if (ret == REPLY_IPC){
+        string procid{};
+        if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){
+            // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
+            // return false;
+            copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
+        }
     }
 
     copy_memory(request, request_len, msg.data(), msg.size());
diff --git a/src/nng_wrap.cpp b/src/nng_wrap.cpp
index 785ef1d..7b5559e 100644
--- a/src/nng_wrap.cpp
+++ b/src/nng_wrap.cpp
@@ -423,7 +423,7 @@
 
 static void cb_recv_for_aio(work* w){
     nng_msg *om = w->msg;
-    if (!om) return;
+    if (!om) {nng_sleep_aio(0, w->aio); return;}
 
     _rr* rep = (_rr*)w->user_data;
 
@@ -438,7 +438,7 @@
     rep->cv_msg_.notify_all();
 }
 
-static struct work *alloc_work(nng_socket sock, _rr* rep)
+static struct work *alloc_work(nng_socket sock, _rr* rep, const int mode)
 {
     struct work *w;
     int          rv;
@@ -458,10 +458,12 @@
         return NULL;
     }
     w->state = INIT;
+    w->mode = mode;
+
     return (w);
 }
 
-static int create_server(nng_socket* sock, const string& url, const int count, _rr* rep){
+static int create_server(nng_socket* sock, const string& url, const int count, _rr* rep, const int mode){
     TAG;
     if (sock->id > 0) return 0;
 
@@ -474,7 +476,7 @@
 
     work** works = (work**)malloc(sizeof(void*) * count);
     for (int i = 0; i < count; i++) {
-        works[i] = alloc_work(*sock, rep);
+        works[i] = alloc_work(*sock, rep, mode);
     }
 
     remove_exist(url);
@@ -511,12 +513,12 @@
         ipc = url;
     }
     rep->url_ = ipc;
-    if(create_server(&get<0>(rep->socks_), ipc, 62, rep) != 0) return -1;
+    if(create_server(&get<0>(rep->socks_), ipc, 62, rep, REPLY_IPC) != 0) return -1;
 
     if (port > 0){
         get<1>(get<1>(rep->socks_)) = port;
         ipc = "tcp://0.0.0.0:" + to_string(port);
-        if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep) != 0) return -1;
+        if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep, REPLY_TCP) != 0) return -1;
         // printf("======>> create server for remote port %d\n", port);
     }else {
         get<0>(get<1>(rep->socks_)).id = numeric_limits<int32_t>::max();
@@ -566,6 +568,7 @@
     int tm = to_ms > 0 ? to_ms : 30;
 
     uint64_t key{};
+    work* w{};
     {
         unique_lock<mutex> l(rep->mtx_msg_);
         auto status = rep->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [rep]{
@@ -579,12 +582,18 @@
         key = iter->first;
         *msg = move(iter->second);
         rep->msg_.erase(iter);
+        auto witer = rep->works_.find(key);
+        if (witer != rep->works_.end()){
+            w = witer->second;
+        }
     }
+
+    if (!w) return -1;
 
     *src = malloc(sizeof(uint64_t));
     *(uint64_t*)(*src) = key;
 
-    return 0;
+    return w->mode;
 }
 
 int send_reply(const void* src, const void* msg, const int msg_len, void* arg/*=NULL*/){

--
Gitblit v1.8.0