| | |
| | | lock_guard<mutex> l{rep->mtx_msg_}; |
| | | rep->works_.emplace(rep->work_index_, w); |
| | | rep->msg_.emplace(rep->work_index_, move(msg)); |
| | | // rep->works_.insert({rep->work_index_, w}); |
| | | // rep->msg_.insert({rep->work_index_, msg}); |
| | | // rep->works_[rep->work_index_] = w; |
| | | // rep->msg_[rep->work_index_] = msg; |
| | | rep->work_index_++; |
| | | rep->cv_msg_.notify_all(); |
| | | } |
| | |
| | | ipc = url; |
| | | } |
| | | rep->url_ = ipc; |
| | | if(create_server(&rep->sock_local_, ipc, 62, rep) != 0) return -1; |
| | | if(create_server(&get<0>(rep->socks_), ipc, 62, rep) != 0) return -1; |
| | | |
| | | if (port > 0){ |
| | | rep->port_ = port; |
| | | get<1>(get<1>(rep->socks_)) = port; |
| | | ipc = "tcp://0.0.0.0:" + to_string(port); |
| | | if(create_server(&rep->sock_remote_, ipc, 62, rep) != 0) return -1; |
| | | if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep) != 0) return -1; |
| | | }else { |
| | | rep->sock_remote_.id = numeric_limits<int32_t>::max(); |
| | | get<0>(get<1>(rep->socks_)).id = numeric_limits<int32_t>::max(); |
| | | } |
| | | |
| | | if (!rep->t_unblock_){ |
| | | rep->t_unblock_.reset(new thread([rep]{ |
| | | rep->t_unblock_.reset(new thread(get_thread([](const auto rep){ |
| | | constexpr int idle = 10; |
| | | const auto data = rr_unblocking_msg_.data(); |
| | | const auto data_size = rr_unblocking_msg_.size(); |
| | | while (!rep->t_quit_.load()) { |
| | | this_thread::sleep_for(chrono::milliseconds{10}); |
| | | auto f = [rep]{ |
| | | vector<struct work*> tmp{}; |
| | | { |
| | | lock_guard<mutex> l{rep->mtx_msg_}; |
| | | for(auto iter = rep->works_.begin(); iter != rep->works_.end();){ |
| | | if ((iter->second+=idle) > timeout_req_rep){ |
| | | tmp.push_back(iter->second.w_); |
| | | iter = rep->works_.erase(iter); |
| | | }else { |
| | | ++iter; |
| | | } |
| | | lock_guard<mutex> l{rep->mtx_msg_}; |
| | | for(auto iter = rep->works_.begin(); iter != rep->works_.end();){ |
| | | if ((iter->second+=idle) > timeout_req_rep){ |
| | | tmp.push_back(iter->second.w_); |
| | | iter = rep->works_.erase(iter); |
| | | }else { |
| | | ++iter; |
| | | } |
| | | } |
| | | return tmp; |
| | | }; |
| | | while (!rep->t_quit_.load()) { |
| | | this_thread::sleep_for(chrono::milliseconds{10}); |
| | | vector<struct work*> tmp = f(); |
| | | for(auto && w : tmp){ |
| | | aio_unblock(w, data, data_size); |
| | | } |
| | | } |
| | | })); |
| | | }, rep))); |
| | | } |
| | | |
| | | return 0; |
| | |
| | | _rr* rep = (_rr*)arg; |
| | | if (!rep) rep = singleton<_rr>(); |
| | | |
| | | if (rep->sock_local_.id == 0 || rep->sock_remote_.id == 0) |
| | | if (start_reply(rep->url_, rep->port_) != 0) |
| | | if (get<0>(rep->socks_).id == 0 || get<0>(get<1>(rep->socks_)).id == 0) |
| | | if (start_reply(rep->url_, get<1>(get<1>(rep->socks_))) != 0) |
| | | return -1; |
| | | |
| | | int tm = to_ms > 0 ? to_ms : 30; |