From aac0fe50f0ae9d13ff8ff7db2288a877b2fb2c53 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期五, 17 十二月 2021 14:13:55 +0800
Subject: [PATCH] bug fixed

---
 src/nng_wrap.cpp          |   28 ++------
 src/req_rep.cpp           |   44 +++++++-------
 src/common.h              |   40 +++++++++----
 src/interface_bus_api.cpp |   38 +++++-------
 4 files changed, 72 insertions(+), 78 deletions(-)

diff --git a/src/common.h b/src/common.h
index 1a9a826..db4761d 100644
--- a/src/common.h
+++ b/src/common.h
@@ -9,12 +9,30 @@
 #include <unordered_map>
 #include <mutex>
 #include <condition_variable>
+#include <tuple>
 #include <unistd.h>
 
 #include "nng/compat/nanomsg/nn.h"
 #include <nng/nng.h>
 
 namespace nng_wrap {
+template <class... T> struct make_void{typedef void type;};
+template <class... T> using void_t = typename make_void<T...>::type;
+template <class T, typename = void> struct is_default_c : std::false_type{};
+template <class T> struct is_default_c<T, void_t<decltype(T())>> : std::true_type{};
+template<class T> using is_function_t = typename std::is_function<typename std::remove_pointer<typename std::remove_reference<T>::type>::type>::type;
+template <bool, class T> struct is_callable_h : is_function_t<T>{};
+template <class T> struct is_callable_h<true, T>{
+private:
+    struct FB{void operator()();};
+    struct D : T, FB{};
+    template<typename U, U> struct c;
+    template<class> static std::true_type t(...);
+    template<class C> static std::false_type t(c<void(FB::*)(), &C::operator()>*);
+public:
+    using type = decltype(t<D>(nullptr));
+};
+template <class T> using is_callable = typename is_callable_h<std::is_class<typename std::remove_reference<T>::type>::value, typename std::remove_reference<T>::type>::type;
 
 static thread_local std::string verbose_info{};
 #ifndef PRNTVITAG
@@ -110,7 +128,6 @@
 public:
     struct psmsg{
         DISABLE_COPY_AND_ASSIGN(psmsg);
-        psmsg()=delete;
         psmsg(const std::string& t, std::string&& m)
         :topic_(t),data_(std::move(m)){}
         std::string topic_{};
@@ -173,31 +190,23 @@
     DISABLE_COPY_AND_ASSIGN(_rr);
     _rr()=default;
     ~_rr(){
-        if(sock_local_.id > 0) nng_close(sock_local_);
-        if(sock_remote_.id > 0) nng_close(sock_remote_);
+        if(std::get<0>(socks_).id > 0) nng_close(std::get<0>(socks_));
+        if(std::get<0>(std::get<1>(socks_)).id > 0) nng_close(std::get<0>(std::get<1>(socks_)));
         t_quit_.store(true, std::memory_order_relaxed);
         if (t_unblock_&&t_unblock_->joinable()) t_unblock_->join();
     }
 
-
     std::unique_ptr<std::thread>                    t_unblock_{nullptr};
     std::atomic_bool                                t_quit_{false};
 
-    nng_socket                                      sock_local_{0};
-    nng_socket                                      sock_remote_{0};
-    int                                             port_{-1};
+    std::tuple<nng_socket, std::tuple<nng_socket, int>> socks_;
 
     std::unordered_map<uint64_t, std::string>       msg_{};
     class worker{
         worker& in_op(const worker& w){if(&w!=this){w_=w.w_;life_=w.life_;}return *this;};
     public:
-        worker()=default;
-        ~worker()=default;
         worker(struct work* w):w_(w),life_(0){}
-        worker(const worker& w):w_(w.w_),life_(w.life_){}
         worker(worker&& w):w_(w.w_),life_(w.life_){}
-        worker& operator=(const worker& w){return in_op(w);}
-        worker& operator=(worker&& w){return in_op(w);}
         operator struct work*() const{return w_;}
         operator int&() {return life_;}
         struct work* w_{};
@@ -210,7 +219,12 @@
 
 };
 
-template<class T> inline T* singleton(){ static T t; return &t; }
+template<class T, typename std::enable_if<is_default_c<T>::value, int>::type=0> inline T* singleton(){ static auto t = std::make_unique<T>(); return t.get(); }
+
+template <class T, class... Args, typename std::enable_if<is_callable<T>::value, int>::type=0>
+inline std::thread get_thread(T&& t, Args&&... args){
+    return std::thread(std::forward<T>(t), std::forward<Args>(args)...);
+}
 
 }
 #endif
\ No newline at end of file
diff --git a/src/interface_bus_api.cpp b/src/interface_bus_api.cpp
index 20a4659..a23dd79 100644
--- a/src/interface_bus_api.cpp
+++ b/src/interface_bus_api.cpp
@@ -3,20 +3,14 @@
 using namespace nng_wrap;
 
 #include "common.h"
+#include <tuple>
 using namespace std;
 
 #include "bhome_msg.pb.h"
 #include "bhome_msg_api.pb.h"
 using namespace bhome_msg;
 
-struct bus{
-    _ps         pub{};
-    _ps_sub     sub{};
-    _sv         sv{};
-    _rr         rr{};
-    string      proc_id{};
-};
-
+using bus = tuple<_ps, _ps_sub, _sv, _rr, string>;
 void* bus_register(const void *proc_info,
                const int proc_info_len,
                void **reply,
@@ -36,19 +30,19 @@
 /////////////////////////////////////////////////////////////////////////
         bus *b = new bus;
         bhome_msg::ProcInfo pi;
-        if (pi.ParseFromArray(proc_info, proc_info_len)) b->proc_id = pi.proc_id();
+        if (pi.ParseFromArray(proc_info, proc_info_len)) get<4>(*b) = pi.proc_id();
 
         const auto& url_hb = get_url(URLHeartBeat);
-        respond_survey(url_hb,string{(char*)proc_info,(size_t)proc_info_len},&b->sv);
+        respond_survey(url_hb,string{(char*)proc_info,(size_t)proc_info_len},&get<2>(*b));
 
         const auto& url_pub_proxy = get_url(URLPubProxy);
-        publish(url_pub_proxy, NULL, 0, &b->pub);
+        publish(url_pub_proxy, NULL, 0, &get<0>(*b));
         const auto& url_sub_queue = get_url(URLSubQueue);
-        subscribe_center(url_sub_queue, &b->sub);
+        subscribe_center(url_sub_queue, &get<1>(*b));
 
         // temporary
         port = 0;
-        start_reply(b->proc_id, port, &b->rr);
+        start_reply(get<4>(*b), port, &get<3>(*b));
 
         return b;
     }
@@ -96,7 +90,7 @@
         return false;
     }
 
-    if (b->proc_id.empty()) {
+    if (get<4>(*b).empty()) {
         PRNTVITAG("proc_id is null");
         return false;
     }
@@ -108,7 +102,7 @@
     }
 
     bhome_msg::MsgTopicList mtl2;
-    mtl2.add_topic_list(b->proc_id);
+    mtl2.add_topic_list(get<4>(*b));
     for(int i = 0; i < mtl.topic_list_size(); i++){
         mtl2.add_topic_list(mtl.topic_list(i));
     }
@@ -214,7 +208,7 @@
     }
 
     for(int i = 0; i < mtl.topic_list_size(); i ++){
-        subscribe_topic(mtl.topic_list(i), &b->sub);
+        subscribe_topic(mtl.topic_list(i), &get<1>(*b));
     }
 
     return true;
@@ -250,7 +244,7 @@
         PRNTVITAG("handle is null");
         return false;
     }
-    if (b->proc_id.empty()){
+    if (get<4>(*b).empty()){
         PRNTVITAG("proc_id is null");
         return false;
     }
@@ -266,11 +260,11 @@
     }
 
     MsgPublish newPub;
-    newPub.set_topic(b->proc_id);
+    newPub.set_topic(get<4>(*b));
     newPub.set_data(string{(const char*)msgpub, (const size_t)msgpub_len});
 
     string msg(newPub.SerializeAsString());
-    auto ret = publish(pub.topic(), msg.data(), msg.size(), &b->pub);
+    auto ret = publish(pub.topic(), msg.data(), msg.size(), &get<0>(*b));
     if (ret > 0) return true;
     return false;
 }
@@ -289,7 +283,7 @@
     }
 
     string topic, msg;
-    auto ret = subscribe_read(&topic, &msg, timeout_ms, &b->sub);
+    auto ret = subscribe_read(&topic, &msg, timeout_ms, &get<1>(*b));
     if (ret < 0) return false;
 
     MsgPublish newPub;
@@ -366,7 +360,7 @@
     }
 
     string msg;
-    auto ret = read_request(src, &msg, timeout_ms, &b->rr);
+    auto ret = read_request(src, &msg, timeout_ms, &get<3>(*b));
     if (ret != 0) return false;
 
     string procid{};
@@ -397,7 +391,7 @@
         return false;
     }
 
-    auto ret = send_reply(src, reply, reply_len, &b->rr);
+    auto ret = send_reply(src, reply, reply_len, &get<3>(*b));
 
     if (ret < 0) return false;
     return true;
diff --git a/src/nng_wrap.cpp b/src/nng_wrap.cpp
index 1972e07..660f106 100644
--- a/src/nng_wrap.cpp
+++ b/src/nng_wrap.cpp
@@ -12,18 +12,6 @@
 
 namespace nng_wrap {
 
-// static int server_socket(const string& url, const int protocol, int family=AF_SP){
-//     int sock = nn_socket(family, protocol);
-//     if (sock < 0) return sock;
-//     remove_exist(url);
-//     int rc = nn_bind(sock, url.c_str());
-//     if (rc < 0) {
-//         nn_close(sock);
-//         return rc;
-//     }
-//     return sock;
-// }
-
 static int client_socket(const string& url, const int protocol, int family=AF_SP){
     int sock = nn_socket(family, protocol);
     if (sock < 0) return sock;
@@ -135,7 +123,7 @@
     }
     set_socket_timeout(sock, timeout_req_rep);
     pub->socket_ = sock;
-    pub->t_ = thread([pub]{
+    pub->t_ = get_thread([](const auto pub){
         while (!pub->t_quit_.load()) {
             _ps::psmsg *msg{NULL};
             {
@@ -166,7 +154,7 @@
             }
 
         }
-    });
+    }, pub);
     return sock;
 }
 
@@ -211,7 +199,7 @@
     }
     // set_socket_timeout(sock, timeout_req_rep);
     sub->socket_ = sock;
-    sub->t_ = thread([sub]{
+    sub->t_ = get_thread([](const auto sub){
         while (!sub->t_quit_.load()) {
             char* m;
             int m_len = nn_recv(sub->socket_, &m, NN_MSG, NN_DONTWAIT);
@@ -223,9 +211,9 @@
                     lock_guard<mutex> l{sub->mtx_topics_};
                     for(auto && i : sub->topics_){
                         if (tmp_msg.size() < i.size()) continue;
-                        topic = tmp_msg.substr(0, i.size());
+                        topic = move(tmp_msg.substr(0, i.size()));
                         if (topic == i){
-                            msg = tmp_msg.substr(i.size());
+                            msg = move(tmp_msg.substr(i.size()));
                             break;
                         }
                     }
@@ -254,7 +242,7 @@
                 // printf("======>> subscribe nn_recv failed %s\n", nn_strerror(nn_errno()));
             }
         }
-    });
+    }, sub);
     return 0;
 }
 
@@ -331,7 +319,7 @@
 
     sv->url_ = url;
     sv->fixed_msg_ = move(fixed_msg);
-    sv->t_ = thread([sv]{
+    sv->t_ = get_thread([](const auto sv){
 
         TAG;
 
@@ -355,7 +343,7 @@
                 }
             }
         }
-    });
+    }, sv);
 
     return 0;
 }
diff --git a/src/req_rep.cpp b/src/req_rep.cpp
index 8ac45be..967d586 100644
--- a/src/req_rep.cpp
+++ b/src/req_rep.cpp
@@ -88,10 +88,6 @@
     lock_guard<mutex> l{rep->mtx_msg_};
     rep->works_.emplace(rep->work_index_, w);
     rep->msg_.emplace(rep->work_index_, move(msg));
-    // rep->works_.insert({rep->work_index_, w});
-    // rep->msg_.insert({rep->work_index_, msg});
-    // rep->works_[rep->work_index_] = w;
-    // rep->msg_[rep->work_index_] = msg;
     rep->work_index_++;
     rep->cv_msg_.notify_all();
 }
@@ -166,40 +162,42 @@
         ipc = url;
     }
     rep->url_ = ipc;
-    if(create_server(&rep->sock_local_, ipc, 62, rep) != 0) return -1;
+    if(create_server(&get<0>(rep->socks_), ipc, 62, rep) != 0) return -1;
 
     if (port > 0){
-        rep->port_ = port;
+        get<1>(get<1>(rep->socks_)) = port;
         ipc = "tcp://0.0.0.0:" + to_string(port);
-        if(create_server(&rep->sock_remote_, ipc, 62, rep) != 0) return -1;
+        if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep) != 0) return -1;
     }else {
-        rep->sock_remote_.id = numeric_limits<int32_t>::max();
+        get<0>(get<1>(rep->socks_)).id = numeric_limits<int32_t>::max();
     }
 
     if (!rep->t_unblock_){
-        rep->t_unblock_.reset(new thread([rep]{
+        rep->t_unblock_.reset(new thread(get_thread([](const auto rep){
             constexpr int idle = 10;
             const auto data = rr_unblocking_msg_.data();
             const auto data_size = rr_unblocking_msg_.size();
-            while (!rep->t_quit_.load()) {
-                this_thread::sleep_for(chrono::milliseconds{10});
+            auto f = [rep]{
                 vector<struct work*> tmp{};
-                {
-                    lock_guard<mutex> l{rep->mtx_msg_};
-                    for(auto iter = rep->works_.begin(); iter != rep->works_.end();){
-                        if ((iter->second+=idle) > timeout_req_rep){
-                            tmp.push_back(iter->second.w_);
-                            iter = rep->works_.erase(iter);
-                        }else {
-                            ++iter;
-                        }
+                lock_guard<mutex> l{rep->mtx_msg_};
+                for(auto iter = rep->works_.begin(); iter != rep->works_.end();){
+                    if ((iter->second+=idle) > timeout_req_rep){
+                        tmp.push_back(iter->second.w_);
+                        iter = rep->works_.erase(iter);
+                    }else {
+                        ++iter;
                     }
                 }
+                return tmp;
+            };
+            while (!rep->t_quit_.load()) {
+                this_thread::sleep_for(chrono::milliseconds{10});
+                vector<struct work*> tmp = f();
                 for(auto && w : tmp){
                     aio_unblock(w, data, data_size);
                 }
             }
-        }));
+        }, rep)));
     }
 
     return 0;
@@ -209,8 +207,8 @@
     _rr* rep = (_rr*)arg;
     if (!rep) rep = singleton<_rr>();
 
-    if (rep->sock_local_.id == 0 || rep->sock_remote_.id == 0)
-        if (start_reply(rep->url_, rep->port_) != 0)
+    if (get<0>(rep->socks_).id == 0 || get<0>(get<1>(rep->socks_)).id == 0)
+        if (start_reply(rep->url_, get<1>(get<1>(rep->socks_))) != 0)
             return -1;
 
     int tm = to_ms > 0 ? to_ms : 30;

--
Gitblit v1.8.0