zhangmeng
2022-08-04 d90b3c3779d9c6bd2884540621d107e4ac10e930
tcp remote reduce timeout in readrequest
6个文件已修改
182 ■■■■ 已修改文件
CMakeLists.txt 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.cpp 99 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bn_api.cpp 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/common.h 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/interface_bus_api.cpp 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/nng_wrap.cpp 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
CMakeLists.txt
@@ -34,6 +34,10 @@
add_subdirectory(${PROJECT_SOURCE_DIR}/src)
include_directories(${PROJECT_SOURCE_DIR}/src)
add_executable(${Target} main.cpp)
# target_link_libraries(${Target} nng bhome_msg)
target_link_libraries(${Target} bus_nng)
# add_executable(${Target} main.cpp)
# target_link_libraries(${Target} bus_nng)
file(GLOB cxx_sources "${PROJECT_SOURCE_DIR}/proto/${ARCH}/*.cc")
include_directories(${PROJECT_SOURCE_DIR}/proto/${ARCH}/ ${PROTO_LIB_PATH}/include)
add_executable(${Target} main.cpp ${cxx_sources})
target_link_libraries(${Target} bus_nng bhome_msg pthread rt)
main.cpp
@@ -93,11 +93,106 @@
  });
}
#include "bhome_msg.pb.h"
#include "bhome_msg_api.pb.h"
using namespace bhome_msg;
int main(int argc, char const *argv[])
{
  // run_test([&]{test_rr();});
  test_rr();
  test_ps();
  // test_rr();
  // test_ps();
  // return 0;
  int reply = 1;
  string id("hello-reply");
  if (argc > 1) {
    printf("this is request\n");
    id = "hello-request";
    reply = 0;
  }else{
    printf("this is reply\n");
  }
  ProcInfo pi;
  pi.set_proc_id(id);
  pi.set_name("works");
  string out;
  pi.SerializeToString(&out);
  void* rep;
  int repl = 0;
  BHRegister(out.data(), out.size(), &rep, &repl, 500);
  if (reply){
    while (true) {
      void* pid;
      int pidl;
      void* req;
      int reql;
      void* src;
      if (BHReadRequest(&pid, &pidl, &req, &reql, &src, 500)){
        bhome_msg::MsgRequestTopic msg;
        msg.ParseFromArray(req, reql);
        printf("recv request %d msg data %s\n", reql, msg.data().c_str());
        bhome_msg::MsgRequestTopicReply rep;
        rep.set_data(msg.data() + "-reply");
        string srep;
        rep.SerializeToString(&srep);
        // auto s = chrono::steady_clock::now();
        int ret = BHSendReply(src, srep.data(), srep.size());
        // auto e = chrono::steady_clock::now();
        // printf("reply time %lu ms\n", chrono::duration_cast<chrono::milliseconds>(e-s).count());
      }else{
        // usleep(50000);
        // printf("BHReadRequest no data\n");
      }
    }
  }else{
    printf("start request %d\n", atoi(argv[1]));
    bhome_msg::BHAddress addr;
    addr.set_ip("192.168.20.108");
    addr.set_port(atoi(argv[1]));
    string node;
    addr.SerializeToString(&node);
    void* pid;
    int pidl;
    void* rep;
    int repl;
    unsigned idx = 0;
    while(1){
      bhome_msg::MsgRequestTopic msg;
      msg.set_topic("hello-reply");
      msg.set_data("hell-world-" + to_string(getpid()) + "-" + to_string(idx++));
      string smsg;
      msg.SerializeToString(&smsg);
      // auto s = chrono::steady_clock::now();
      int ret = BHRequest(node.data(), node.size(), smsg.data(), smsg.size(),
        &pid, &pidl, &rep, &repl, 5000);
      // auto e = chrono::steady_clock::now();
      // printf("request time %lu ms\n", chrono::duration_cast<chrono::milliseconds>(e-s).count());
      bhome_msg::MsgRequestTopicReply prep;
      prep.ParseFromArray(rep, repl);
      printf("pid %d BHRequest rep data %s size %d ret %d\n", getpid(), prep.data().c_str(),
        repl, ret);
    }
  }
  return 0;
}
src/bn_api.cpp
@@ -42,7 +42,7 @@
        unsigned short port = *(unsigned short*)(out);
        copy_memory(reply, reply_len, (char*)out + port_size, out_len - port_size);
        BHFree(out, out_len);
        // printf("======>> recv port %d\n", port);
        printf("======>> recv port %d\n", port);
/////////////////////////////////////////////////////////////////////////
        const auto& url_pub_proxy = get_url(URLPubProxy);
@@ -351,11 +351,7 @@
    string url{};
    // BHQueryTopicAddress获取proc_id
    string procid{};
    if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){
        // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
        // return false;
        url = "ipc:///tmp/" + procid;
    }
    if (remote && remote_len > 0){
        BHAddress addr;
        if (addr.ParseFromArray(remote, remote_len)){
@@ -364,7 +360,12 @@
                // printf("======>> BHRequest use remote address %s\n", url.c_str());
            }
        }
    } else if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){
        // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
        // return false;
        url = "ipc:///tmp/" + procid;
    }
    if (url.empty()) {
        set_last_error("BHRequest url empty");
        return false;
@@ -387,11 +388,13 @@
{
    string msg;
    auto ret = read_request(src, &msg, timeout_ms);
    if (ret != 0) return false;
    if (ret < 0) return false;
    string procid{};
    if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){
        copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
    if (ret == REPLY_IPC){
        string procid{};
        if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){
            copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
        }
    }
    copy_memory(request, request_len, msg.data(), msg.size());
src/common.h
@@ -179,6 +179,7 @@
};
enum { INIT, RECV, WAIT, SEND };
enum { REPLY_IPC, REPLY_TCP };
struct work {
    int state{-1};
    nng_aio *aio{};
@@ -186,6 +187,7 @@
    nng_ctx  ctx{};
    void(*cb_recv)(work*){};
    void* user_data{};
    int mode{-1};
};
static const std::string rr_unblocking_msg_{"~!@#$%^&*()-=<<UNBLOCKING>>=-()*&^%$#@!~"};
src/interface_bus_api.cpp
@@ -341,11 +341,7 @@
    string url{};
    // BHQueryTopicAddress获取proc_id
    string procid{};
    if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){
        // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
        // return false;
        url = "ipc:///tmp/" + procid;
    }
    if (remote && remote_len > 0){
        BHAddress addr;
        if (addr.ParseFromArray(remote, remote_len)){
@@ -354,7 +350,12 @@
                // printf("======>> BHRequest use remote address %s\n", url.c_str());
            }
        }
    } else if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){
        // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
        // return false;
        url = "ipc:///tmp/" + procid;
    }
    if (url.empty()) {
        set_last_error("bus_request url empty");
        return false;
@@ -383,13 +384,15 @@
    string msg;
    auto ret = read_request(src, &msg, timeout_ms, &get<6>(*b));
    // printf("bus_recv_request read_request ret %d msg %s\n", ret, msg.c_str());
    if (ret != 0) return false;
    if (ret < 0) return false;
    string procid{};
    if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){
        // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
        // return false;
        copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
    if (ret == REPLY_IPC){
        string procid{};
        if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){
            // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
            // return false;
            copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
        }
    }
    copy_memory(request, request_len, msg.data(), msg.size());
src/nng_wrap.cpp
@@ -423,7 +423,7 @@
static void cb_recv_for_aio(work* w){
    nng_msg *om = w->msg;
    if (!om) return;
    if (!om) {nng_sleep_aio(0, w->aio); return;}
    _rr* rep = (_rr*)w->user_data;
@@ -438,7 +438,7 @@
    rep->cv_msg_.notify_all();
}
static struct work *alloc_work(nng_socket sock, _rr* rep)
static struct work *alloc_work(nng_socket sock, _rr* rep, const int mode)
{
    struct work *w;
    int          rv;
@@ -458,10 +458,12 @@
        return NULL;
    }
    w->state = INIT;
    w->mode = mode;
    return (w);
}
static int create_server(nng_socket* sock, const string& url, const int count, _rr* rep){
static int create_server(nng_socket* sock, const string& url, const int count, _rr* rep, const int mode){
    TAG;
    if (sock->id > 0) return 0;
@@ -474,7 +476,7 @@
    work** works = (work**)malloc(sizeof(void*) * count);
    for (int i = 0; i < count; i++) {
        works[i] = alloc_work(*sock, rep);
        works[i] = alloc_work(*sock, rep, mode);
    }
    remove_exist(url);
@@ -511,12 +513,12 @@
        ipc = url;
    }
    rep->url_ = ipc;
    if(create_server(&get<0>(rep->socks_), ipc, 62, rep) != 0) return -1;
    if(create_server(&get<0>(rep->socks_), ipc, 62, rep, REPLY_IPC) != 0) return -1;
    if (port > 0){
        get<1>(get<1>(rep->socks_)) = port;
        ipc = "tcp://0.0.0.0:" + to_string(port);
        if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep) != 0) return -1;
        if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep, REPLY_TCP) != 0) return -1;
        // printf("======>> create server for remote port %d\n", port);
    }else {
        get<0>(get<1>(rep->socks_)).id = numeric_limits<int32_t>::max();
@@ -566,6 +568,7 @@
    int tm = to_ms > 0 ? to_ms : 30;
    uint64_t key{};
    work* w{};
    {
        unique_lock<mutex> l(rep->mtx_msg_);
        auto status = rep->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [rep]{
@@ -579,12 +582,18 @@
        key = iter->first;
        *msg = move(iter->second);
        rep->msg_.erase(iter);
        auto witer = rep->works_.find(key);
        if (witer != rep->works_.end()){
            w = witer->second;
        }
    }
    if (!w) return -1;
    *src = malloc(sizeof(uint64_t));
    *(uint64_t*)(*src) = key;
    return 0;
    return w->mode;
}
int send_reply(const void* src, const void* msg, const int msg_len, void* arg/*=NULL*/){