From aac0fe50f0ae9d13ff8ff7db2288a877b2fb2c53 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期五, 17 十二月 2021 14:13:55 +0800
Subject: [PATCH] bug fixed

---
 src/req_rep.cpp |   44 +++++++++++++++++++++-----------------------
 1 files changed, 21 insertions(+), 23 deletions(-)

diff --git a/src/req_rep.cpp b/src/req_rep.cpp
index 8ac45be..967d586 100644
--- a/src/req_rep.cpp
+++ b/src/req_rep.cpp
@@ -88,10 +88,6 @@
     lock_guard<mutex> l{rep->mtx_msg_};
     rep->works_.emplace(rep->work_index_, w);
     rep->msg_.emplace(rep->work_index_, move(msg));
-    // rep->works_.insert({rep->work_index_, w});
-    // rep->msg_.insert({rep->work_index_, msg});
-    // rep->works_[rep->work_index_] = w;
-    // rep->msg_[rep->work_index_] = msg;
     rep->work_index_++;
     rep->cv_msg_.notify_all();
 }
@@ -166,40 +162,42 @@
         ipc = url;
     }
     rep->url_ = ipc;
-    if(create_server(&rep->sock_local_, ipc, 62, rep) != 0) return -1;
+    if(create_server(&get<0>(rep->socks_), ipc, 62, rep) != 0) return -1;
 
     if (port > 0){
-        rep->port_ = port;
+        get<1>(get<1>(rep->socks_)) = port;
         ipc = "tcp://0.0.0.0:" + to_string(port);
-        if(create_server(&rep->sock_remote_, ipc, 62, rep) != 0) return -1;
+        if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep) != 0) return -1;
     }else {
-        rep->sock_remote_.id = numeric_limits<int32_t>::max();
+        get<0>(get<1>(rep->socks_)).id = numeric_limits<int32_t>::max();
     }
 
     if (!rep->t_unblock_){
-        rep->t_unblock_.reset(new thread([rep]{
+        rep->t_unblock_.reset(new thread(get_thread([](const auto rep){
             constexpr int idle = 10;
             const auto data = rr_unblocking_msg_.data();
             const auto data_size = rr_unblocking_msg_.size();
-            while (!rep->t_quit_.load()) {
-                this_thread::sleep_for(chrono::milliseconds{10});
+            auto f = [rep]{
                 vector<struct work*> tmp{};
-                {
-                    lock_guard<mutex> l{rep->mtx_msg_};
-                    for(auto iter = rep->works_.begin(); iter != rep->works_.end();){
-                        if ((iter->second+=idle) > timeout_req_rep){
-                            tmp.push_back(iter->second.w_);
-                            iter = rep->works_.erase(iter);
-                        }else {
-                            ++iter;
-                        }
+                lock_guard<mutex> l{rep->mtx_msg_};
+                for(auto iter = rep->works_.begin(); iter != rep->works_.end();){
+                    if ((iter->second+=idle) > timeout_req_rep){
+                        tmp.push_back(iter->second.w_);
+                        iter = rep->works_.erase(iter);
+                    }else {
+                        ++iter;
                     }
                 }
+                return tmp;
+            };
+            while (!rep->t_quit_.load()) {
+                this_thread::sleep_for(chrono::milliseconds{10});
+                vector<struct work*> tmp = f();
                 for(auto && w : tmp){
                     aio_unblock(w, data, data_size);
                 }
             }
-        }));
+        }, rep)));
     }
 
     return 0;
@@ -209,8 +207,8 @@
     _rr* rep = (_rr*)arg;
     if (!rep) rep = singleton<_rr>();
 
-    if (rep->sock_local_.id == 0 || rep->sock_remote_.id == 0)
-        if (start_reply(rep->url_, rep->port_) != 0)
+    if (get<0>(rep->socks_).id == 0 || get<0>(get<1>(rep->socks_)).id == 0)
+        if (start_reply(rep->url_, get<1>(get<1>(rep->socks_))) != 0)
             return -1;
 
     int tm = to_ms > 0 ? to_ms : 30;

--
Gitblit v1.8.0