From 16935f4aebffdd1b6580b844391a0aa0f4f3012b Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期一, 22 四月 2024 10:29:12 +0800 Subject: [PATCH] bug fixed --- src/nng_wrap.cpp | 63 +++++++++++++++++++++++-------- 1 files changed, 46 insertions(+), 17 deletions(-) diff --git a/src/nng_wrap.cpp b/src/nng_wrap.cpp index 0a836fc..3138e2f 100644 --- a/src/nng_wrap.cpp +++ b/src/nng_wrap.cpp @@ -4,7 +4,6 @@ #include <vector> #include "common.h" -using namespace std; #include <nng/protocol/reqrep0/rep.h> #include <nng/supplemental/util/platform.h> @@ -12,6 +11,8 @@ #include "nng/compat/nanomsg/reqrep.h" #include "nng/compat/nanomsg/pubsub.h" #include "nng/compat/nanomsg/survey.h" + +using namespace std; namespace nng_wrap { @@ -216,6 +217,9 @@ sub->socket_ = sock; sub->t_ = get_thread([](const auto sub){ while (!sub->t_quit_.load()) { + // for(auto&& i : sub->topics_) { + // printf("======>> sub topic %s\n", i.c_str()); + // } char* m{}; // int m_len = nn_recv(sub->socket_, &m, NN_MSG, NN_DONTWAIT); int m_len = nn_recv(sub->socket_, &m, NN_MSG, 0); @@ -237,7 +241,7 @@ } } } - // printf("======>> subscribe recv topic %s msg length %lu\n", topic, msg.length()); + // printf("======>> subscribe recv topic %s msg length %lu\n", topic, msgl); if (found_topic){ lock_guard<mutex> l(sub->mtx_msg_); sub->msg_.emplace_back(string(topic), string(msg, msgl)); @@ -245,6 +249,7 @@ } nn_freemsg(m); }else { + if (!sub->failed_topics_.empty()) { lock_guard<mutex> l{sub->mtx_failed_topics_}; if (!sub->failed_topics_.empty()){ @@ -257,7 +262,7 @@ } } } - this_thread::sleep_for(chrono::milliseconds{6}); + // this_thread::sleep_for(chrono::milliseconds{6}); // printf("======>> subscribe nn_recv failed %s\n", nn_strerror(nn_errno())); } } @@ -309,17 +314,29 @@ _ps_sub* sub = (_ps_sub*)arg; if (!sub) sub = singleton<_ps_sub>(); + if (!topic && !msg) { + lock_guard<mutex> l{sub->mtx_msg_}; + for (int i = 0; i < 2; i++) { + if (!sub->msg_.empty()) + return 0; + this_thread::sleep_for(chrono::milliseconds(to_ms)); + } + return -1; + } + TAG; int tm = to_ms > 0 ? to_ms : 30; unique_lock<mutex> l(sub->mtx_msg_); - auto status = sub->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [sub]{ - return !sub->msg_.empty(); - }); - if (!status){ - PRNTVITAG("subscribe_read timeout"); - return -1; + if (sub->msg_.empty()) { + auto status = sub->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [sub]{ + return !sub->msg_.empty(); + }); + if (!status){ + PRNTVITAG("subscribe_read timeout"); + return -1; + } } auto& tmp = sub->msg_.front(); *topic = std::move(tmp.topic_); @@ -536,7 +553,7 @@ if (!rep->t_unblock_){ rep->t_unblock_.reset(new thread(get_thread([](const auto rep){ - constexpr int idle = 10; + constexpr int idle = 216; const auto data = rr_unblocking_msg_.data(); const auto data_size = rr_unblocking_msg_.size(); constexpr int life_span = timeout_req_rep*10; @@ -555,7 +572,7 @@ return tmp; }; while (!rep->t_quit_.load()) { - this_thread::sleep_for(chrono::milliseconds{10}); + this_thread::sleep_for(chrono::milliseconds{idle}); vector<struct work*> tmp = f(); for(auto && w : tmp){ aio_unblock(w, data, data_size); @@ -575,18 +592,30 @@ if (start_reply(rep->url_, get<1>(get<1>(rep->socks_))) != 0) return -1; + if (!src && !msg) { + lock_guard<mutex> l{rep->mtx_msg_}; + for (int i = 0; i < 2; i++) { + if (!rep->msg_.empty()) + return 0; + this_thread::sleep_for(chrono::milliseconds(to_ms)); + } + return -1; + } + int tm = to_ms > 0 ? to_ms : 30; uint64_t key{}; work* w{}; { unique_lock<mutex> l(rep->mtx_msg_); - auto status = rep->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [rep]{ - return !rep->msg_.empty(); - }); - if (!status){ - PRNTVITAG("read_request timeout"); - return -1; + if (rep->msg_.empty()) { + auto status = rep->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [rep]{ + return !rep->msg_.empty(); + }); + if (!status){ + PRNTVITAG("read_request timeout"); + return -1; + } } auto iter = rep->msg_.begin(); key = iter->first; -- Gitblit v1.8.0