zhangmeng
2023-12-15 ab42172c747112e7306efb7aebdc853c3c45bd7a
remove log
2个文件已修改
29 ■■■■■ 已修改文件
main.cpp 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/nng_wrap.cpp 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.cpp
@@ -68,6 +68,7 @@
      for(auto && i : topics){
        auto msg = base_cont + "test_ps pub message "+i+"-->msg";
        TestPub(i.c_str(), i.length(), msg.c_str(), msg.length());
        // printf("TestPub %s\n", msg.c_str());
        this_thread::sleep_for(chrono::milliseconds{126});
      }
    }
@@ -160,8 +161,10 @@
  }else{
    printf("start request %d\n", atoi(argv[1]));
    bhome_msg::BHAddress addr;
    addr.set_ip("192.168.20.108");
    addr.set_port(atoi(argv[1]));
    // addr.set_ip("192.168.20.108");
    // addr.set_port(atoi(argv[1]));
    // addr.set_ip("192.168.20.108");
    // addr.set_port(atoi(argv[1]));
    string node;
    addr.SerializeToString(&node);
@@ -180,7 +183,8 @@
      msg.SerializeToString(&smsg);
      // auto s = chrono::steady_clock::now();
      int ret = BHRequest(node.data(), node.size(), smsg.data(), smsg.size(),
      // int ret = BHRequest(node.data(), node.size(), smsg.data(), smsg.size(),
      int ret = BHRequest(NULL, 0, smsg.data(), smsg.size(),
        &pid, &pidl, &rep, &repl, 5000);
      // auto e = chrono::steady_clock::now();
      // printf("request time %lu ms\n", chrono::duration_cast<chrono::milliseconds>(e-s).count());
@@ -188,8 +192,8 @@
      bhome_msg::MsgRequestTopicReply prep;
      prep.ParseFromArray(rep, repl);
      printf("pid %d BHRequest rep data %s size %d ret %d\n", getpid(), prep.data().c_str(),
        repl, ret);
      // printf("pid %d BHRequest rep data %s size %d ret %d\n", getpid(), prep.data().c_str(),
      //   repl, ret);
    }
  }
src/nng_wrap.cpp
@@ -4,7 +4,6 @@
#include <vector>
#include "common.h"
using namespace std;
#include <nng/protocol/reqrep0/rep.h>
#include <nng/supplemental/util/platform.h>
@@ -12,6 +11,8 @@
#include "nng/compat/nanomsg/reqrep.h"
#include "nng/compat/nanomsg/pubsub.h"
#include "nng/compat/nanomsg/survey.h"
using namespace std;
namespace nng_wrap {
@@ -216,6 +217,9 @@
    sub->socket_ = sock;
    sub->t_ = get_thread([](const auto sub){
        while (!sub->t_quit_.load()) {
            // for(auto&& i : sub->topics_) {
            //     printf("======>> sub topic %s\n", i.c_str());
            // }
            char* m{};
            // int m_len = nn_recv(sub->socket_, &m, NN_MSG, NN_DONTWAIT);
            int m_len = nn_recv(sub->socket_, &m, NN_MSG, 0);
@@ -237,7 +241,7 @@
                        }
                    }
                }
                // printf("======>> subscribe recv topic %s msg length %lu\n", topic, msg.length());
                // printf("======>> subscribe recv topic %s msg length %lu\n", topic, msgl);
                if (found_topic){
                    lock_guard<mutex> l(sub->mtx_msg_);
                    sub->msg_.emplace_back(string(topic), string(msg, msgl));
@@ -245,6 +249,7 @@
                }
                nn_freemsg(m);
            }else {
                if (!sub->failed_topics_.empty())
                {
                    lock_guard<mutex> l{sub->mtx_failed_topics_};
                    if (!sub->failed_topics_.empty()){
@@ -257,7 +262,7 @@
                        }
                    }
                }
                this_thread::sleep_for(chrono::milliseconds{6});
                // this_thread::sleep_for(chrono::milliseconds{6});
                // printf("======>> subscribe nn_recv failed %s\n", nn_strerror(nn_errno()));
            }
        }
@@ -536,7 +541,7 @@
    if (!rep->t_unblock_){
        rep->t_unblock_.reset(new thread(get_thread([](const auto rep){
            constexpr int idle = 10;
            constexpr int idle = 216;
            const auto data = rr_unblocking_msg_.data();
            const auto data_size = rr_unblocking_msg_.size();
            constexpr int life_span = timeout_req_rep*10;
@@ -555,7 +560,7 @@
                return tmp;
            };
            while (!rep->t_quit_.load()) {
                this_thread::sleep_for(chrono::milliseconds{10});
                this_thread::sleep_for(chrono::milliseconds{idle});
                vector<struct work*> tmp = f();
                for(auto && w : tmp){
                    aio_unblock(w, data, data_size);