zhangmeng
2021-12-17 aac0fe50f0ae9d13ff8ff7db2288a877b2fb2c53
src/req_rep.cpp
@@ -88,10 +88,6 @@
    lock_guard<mutex> l{rep->mtx_msg_};
    rep->works_.emplace(rep->work_index_, w);
    rep->msg_.emplace(rep->work_index_, move(msg));
    // rep->works_.insert({rep->work_index_, w});
    // rep->msg_.insert({rep->work_index_, msg});
    // rep->works_[rep->work_index_] = w;
    // rep->msg_[rep->work_index_] = msg;
    rep->work_index_++;
    rep->cv_msg_.notify_all();
}
@@ -166,40 +162,42 @@
        ipc = url;
    }
    rep->url_ = ipc;
    if(create_server(&rep->sock_local_, ipc, 62, rep) != 0) return -1;
    if(create_server(&get<0>(rep->socks_), ipc, 62, rep) != 0) return -1;
    if (port > 0){
        rep->port_ = port;
        get<1>(get<1>(rep->socks_)) = port;
        ipc = "tcp://0.0.0.0:" + to_string(port);
        if(create_server(&rep->sock_remote_, ipc, 62, rep) != 0) return -1;
        if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep) != 0) return -1;
    }else {
        rep->sock_remote_.id = numeric_limits<int32_t>::max();
        get<0>(get<1>(rep->socks_)).id = numeric_limits<int32_t>::max();
    }
    if (!rep->t_unblock_){
        rep->t_unblock_.reset(new thread([rep]{
        rep->t_unblock_.reset(new thread(get_thread([](const auto rep){
            constexpr int idle = 10;
            const auto data = rr_unblocking_msg_.data();
            const auto data_size = rr_unblocking_msg_.size();
            while (!rep->t_quit_.load()) {
                this_thread::sleep_for(chrono::milliseconds{10});
            auto f = [rep]{
                vector<struct work*> tmp{};
                {
                    lock_guard<mutex> l{rep->mtx_msg_};
                    for(auto iter = rep->works_.begin(); iter != rep->works_.end();){
                        if ((iter->second+=idle) > timeout_req_rep){
                            tmp.push_back(iter->second.w_);
                            iter = rep->works_.erase(iter);
                        }else {
                            ++iter;
                        }
                lock_guard<mutex> l{rep->mtx_msg_};
                for(auto iter = rep->works_.begin(); iter != rep->works_.end();){
                    if ((iter->second+=idle) > timeout_req_rep){
                        tmp.push_back(iter->second.w_);
                        iter = rep->works_.erase(iter);
                    }else {
                        ++iter;
                    }
                }
                return tmp;
            };
            while (!rep->t_quit_.load()) {
                this_thread::sleep_for(chrono::milliseconds{10});
                vector<struct work*> tmp = f();
                for(auto && w : tmp){
                    aio_unblock(w, data, data_size);
                }
            }
        }));
        }, rep)));
    }
    return 0;
@@ -209,8 +207,8 @@
    _rr* rep = (_rr*)arg;
    if (!rep) rep = singleton<_rr>();
    if (rep->sock_local_.id == 0 || rep->sock_remote_.id == 0)
        if (start_reply(rep->url_, rep->port_) != 0)
    if (get<0>(rep->socks_).id == 0 || get<0>(get<1>(rep->socks_)).id == 0)
        if (start_reply(rep->url_, get<1>(get<1>(rep->socks_))) != 0)
            return -1;
    int tm = to_ms > 0 ? to_ms : 30;