From aac0fe50f0ae9d13ff8ff7db2288a877b2fb2c53 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期五, 17 十二月 2021 14:13:55 +0800 Subject: [PATCH] bug fixed --- src/req_rep.cpp | 44 +++++++++++++++++++++----------------------- 1 files changed, 21 insertions(+), 23 deletions(-) diff --git a/src/req_rep.cpp b/src/req_rep.cpp index 8ac45be..967d586 100644 --- a/src/req_rep.cpp +++ b/src/req_rep.cpp @@ -88,10 +88,6 @@ lock_guard<mutex> l{rep->mtx_msg_}; rep->works_.emplace(rep->work_index_, w); rep->msg_.emplace(rep->work_index_, move(msg)); - // rep->works_.insert({rep->work_index_, w}); - // rep->msg_.insert({rep->work_index_, msg}); - // rep->works_[rep->work_index_] = w; - // rep->msg_[rep->work_index_] = msg; rep->work_index_++; rep->cv_msg_.notify_all(); } @@ -166,40 +162,42 @@ ipc = url; } rep->url_ = ipc; - if(create_server(&rep->sock_local_, ipc, 62, rep) != 0) return -1; + if(create_server(&get<0>(rep->socks_), ipc, 62, rep) != 0) return -1; if (port > 0){ - rep->port_ = port; + get<1>(get<1>(rep->socks_)) = port; ipc = "tcp://0.0.0.0:" + to_string(port); - if(create_server(&rep->sock_remote_, ipc, 62, rep) != 0) return -1; + if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep) != 0) return -1; }else { - rep->sock_remote_.id = numeric_limits<int32_t>::max(); + get<0>(get<1>(rep->socks_)).id = numeric_limits<int32_t>::max(); } if (!rep->t_unblock_){ - rep->t_unblock_.reset(new thread([rep]{ + rep->t_unblock_.reset(new thread(get_thread([](const auto rep){ constexpr int idle = 10; const auto data = rr_unblocking_msg_.data(); const auto data_size = rr_unblocking_msg_.size(); - while (!rep->t_quit_.load()) { - this_thread::sleep_for(chrono::milliseconds{10}); + auto f = [rep]{ vector<struct work*> tmp{}; - { - lock_guard<mutex> l{rep->mtx_msg_}; - for(auto iter = rep->works_.begin(); iter != rep->works_.end();){ - if ((iter->second+=idle) > timeout_req_rep){ - tmp.push_back(iter->second.w_); - iter = rep->works_.erase(iter); - }else { - ++iter; - } + lock_guard<mutex> l{rep->mtx_msg_}; + for(auto iter = rep->works_.begin(); iter != rep->works_.end();){ + if ((iter->second+=idle) > timeout_req_rep){ + tmp.push_back(iter->second.w_); + iter = rep->works_.erase(iter); + }else { + ++iter; } } + return tmp; + }; + while (!rep->t_quit_.load()) { + this_thread::sleep_for(chrono::milliseconds{10}); + vector<struct work*> tmp = f(); for(auto && w : tmp){ aio_unblock(w, data, data_size); } } - })); + }, rep))); } return 0; @@ -209,8 +207,8 @@ _rr* rep = (_rr*)arg; if (!rep) rep = singleton<_rr>(); - if (rep->sock_local_.id == 0 || rep->sock_remote_.id == 0) - if (start_reply(rep->url_, rep->port_) != 0) + if (get<0>(rep->socks_).id == 0 || get<0>(get<1>(rep->socks_)).id == 0) + if (start_reply(rep->url_, get<1>(get<1>(rep->socks_))) != 0) return -1; int tm = to_ms > 0 ? to_ms : 30; -- Gitblit v1.8.0