From aac0fe50f0ae9d13ff8ff7db2288a877b2fb2c53 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期五, 17 十二月 2021 14:13:55 +0800 Subject: [PATCH] bug fixed --- src/nng_wrap.cpp | 28 ++------ src/req_rep.cpp | 44 +++++++------- src/common.h | 40 +++++++++---- src/interface_bus_api.cpp | 38 +++++------- 4 files changed, 72 insertions(+), 78 deletions(-) diff --git a/src/common.h b/src/common.h index 1a9a826..db4761d 100644 --- a/src/common.h +++ b/src/common.h @@ -9,12 +9,30 @@ #include <unordered_map> #include <mutex> #include <condition_variable> +#include <tuple> #include <unistd.h> #include "nng/compat/nanomsg/nn.h" #include <nng/nng.h> namespace nng_wrap { +template <class... T> struct make_void{typedef void type;}; +template <class... T> using void_t = typename make_void<T...>::type; +template <class T, typename = void> struct is_default_c : std::false_type{}; +template <class T> struct is_default_c<T, void_t<decltype(T())>> : std::true_type{}; +template<class T> using is_function_t = typename std::is_function<typename std::remove_pointer<typename std::remove_reference<T>::type>::type>::type; +template <bool, class T> struct is_callable_h : is_function_t<T>{}; +template <class T> struct is_callable_h<true, T>{ +private: + struct FB{void operator()();}; + struct D : T, FB{}; + template<typename U, U> struct c; + template<class> static std::true_type t(...); + template<class C> static std::false_type t(c<void(FB::*)(), &C::operator()>*); +public: + using type = decltype(t<D>(nullptr)); +}; +template <class T> using is_callable = typename is_callable_h<std::is_class<typename std::remove_reference<T>::type>::value, typename std::remove_reference<T>::type>::type; static thread_local std::string verbose_info{}; #ifndef PRNTVITAG @@ -110,7 +128,6 @@ public: struct psmsg{ DISABLE_COPY_AND_ASSIGN(psmsg); - psmsg()=delete; psmsg(const std::string& t, std::string&& m) :topic_(t),data_(std::move(m)){} std::string topic_{}; @@ -173,31 +190,23 @@ DISABLE_COPY_AND_ASSIGN(_rr); _rr()=default; ~_rr(){ - if(sock_local_.id > 0) nng_close(sock_local_); - if(sock_remote_.id > 0) nng_close(sock_remote_); + if(std::get<0>(socks_).id > 0) nng_close(std::get<0>(socks_)); + if(std::get<0>(std::get<1>(socks_)).id > 0) nng_close(std::get<0>(std::get<1>(socks_))); t_quit_.store(true, std::memory_order_relaxed); if (t_unblock_&&t_unblock_->joinable()) t_unblock_->join(); } - std::unique_ptr<std::thread> t_unblock_{nullptr}; std::atomic_bool t_quit_{false}; - nng_socket sock_local_{0}; - nng_socket sock_remote_{0}; - int port_{-1}; + std::tuple<nng_socket, std::tuple<nng_socket, int>> socks_; std::unordered_map<uint64_t, std::string> msg_{}; class worker{ worker& in_op(const worker& w){if(&w!=this){w_=w.w_;life_=w.life_;}return *this;}; public: - worker()=default; - ~worker()=default; worker(struct work* w):w_(w),life_(0){} - worker(const worker& w):w_(w.w_),life_(w.life_){} worker(worker&& w):w_(w.w_),life_(w.life_){} - worker& operator=(const worker& w){return in_op(w);} - worker& operator=(worker&& w){return in_op(w);} operator struct work*() const{return w_;} operator int&() {return life_;} struct work* w_{}; @@ -210,7 +219,12 @@ }; -template<class T> inline T* singleton(){ static T t; return &t; } +template<class T, typename std::enable_if<is_default_c<T>::value, int>::type=0> inline T* singleton(){ static auto t = std::make_unique<T>(); return t.get(); } + +template <class T, class... Args, typename std::enable_if<is_callable<T>::value, int>::type=0> +inline std::thread get_thread(T&& t, Args&&... args){ + return std::thread(std::forward<T>(t), std::forward<Args>(args)...); +} } #endif \ No newline at end of file diff --git a/src/interface_bus_api.cpp b/src/interface_bus_api.cpp index 20a4659..a23dd79 100644 --- a/src/interface_bus_api.cpp +++ b/src/interface_bus_api.cpp @@ -3,20 +3,14 @@ using namespace nng_wrap; #include "common.h" +#include <tuple> using namespace std; #include "bhome_msg.pb.h" #include "bhome_msg_api.pb.h" using namespace bhome_msg; -struct bus{ - _ps pub{}; - _ps_sub sub{}; - _sv sv{}; - _rr rr{}; - string proc_id{}; -}; - +using bus = tuple<_ps, _ps_sub, _sv, _rr, string>; void* bus_register(const void *proc_info, const int proc_info_len, void **reply, @@ -36,19 +30,19 @@ ///////////////////////////////////////////////////////////////////////// bus *b = new bus; bhome_msg::ProcInfo pi; - if (pi.ParseFromArray(proc_info, proc_info_len)) b->proc_id = pi.proc_id(); + if (pi.ParseFromArray(proc_info, proc_info_len)) get<4>(*b) = pi.proc_id(); const auto& url_hb = get_url(URLHeartBeat); - respond_survey(url_hb,string{(char*)proc_info,(size_t)proc_info_len},&b->sv); + respond_survey(url_hb,string{(char*)proc_info,(size_t)proc_info_len},&get<2>(*b)); const auto& url_pub_proxy = get_url(URLPubProxy); - publish(url_pub_proxy, NULL, 0, &b->pub); + publish(url_pub_proxy, NULL, 0, &get<0>(*b)); const auto& url_sub_queue = get_url(URLSubQueue); - subscribe_center(url_sub_queue, &b->sub); + subscribe_center(url_sub_queue, &get<1>(*b)); // temporary port = 0; - start_reply(b->proc_id, port, &b->rr); + start_reply(get<4>(*b), port, &get<3>(*b)); return b; } @@ -96,7 +90,7 @@ return false; } - if (b->proc_id.empty()) { + if (get<4>(*b).empty()) { PRNTVITAG("proc_id is null"); return false; } @@ -108,7 +102,7 @@ } bhome_msg::MsgTopicList mtl2; - mtl2.add_topic_list(b->proc_id); + mtl2.add_topic_list(get<4>(*b)); for(int i = 0; i < mtl.topic_list_size(); i++){ mtl2.add_topic_list(mtl.topic_list(i)); } @@ -214,7 +208,7 @@ } for(int i = 0; i < mtl.topic_list_size(); i ++){ - subscribe_topic(mtl.topic_list(i), &b->sub); + subscribe_topic(mtl.topic_list(i), &get<1>(*b)); } return true; @@ -250,7 +244,7 @@ PRNTVITAG("handle is null"); return false; } - if (b->proc_id.empty()){ + if (get<4>(*b).empty()){ PRNTVITAG("proc_id is null"); return false; } @@ -266,11 +260,11 @@ } MsgPublish newPub; - newPub.set_topic(b->proc_id); + newPub.set_topic(get<4>(*b)); newPub.set_data(string{(const char*)msgpub, (const size_t)msgpub_len}); string msg(newPub.SerializeAsString()); - auto ret = publish(pub.topic(), msg.data(), msg.size(), &b->pub); + auto ret = publish(pub.topic(), msg.data(), msg.size(), &get<0>(*b)); if (ret > 0) return true; return false; } @@ -289,7 +283,7 @@ } string topic, msg; - auto ret = subscribe_read(&topic, &msg, timeout_ms, &b->sub); + auto ret = subscribe_read(&topic, &msg, timeout_ms, &get<1>(*b)); if (ret < 0) return false; MsgPublish newPub; @@ -366,7 +360,7 @@ } string msg; - auto ret = read_request(src, &msg, timeout_ms, &b->rr); + auto ret = read_request(src, &msg, timeout_ms, &get<3>(*b)); if (ret != 0) return false; string procid{}; @@ -397,7 +391,7 @@ return false; } - auto ret = send_reply(src, reply, reply_len, &b->rr); + auto ret = send_reply(src, reply, reply_len, &get<3>(*b)); if (ret < 0) return false; return true; diff --git a/src/nng_wrap.cpp b/src/nng_wrap.cpp index 1972e07..660f106 100644 --- a/src/nng_wrap.cpp +++ b/src/nng_wrap.cpp @@ -12,18 +12,6 @@ namespace nng_wrap { -// static int server_socket(const string& url, const int protocol, int family=AF_SP){ -// int sock = nn_socket(family, protocol); -// if (sock < 0) return sock; -// remove_exist(url); -// int rc = nn_bind(sock, url.c_str()); -// if (rc < 0) { -// nn_close(sock); -// return rc; -// } -// return sock; -// } - static int client_socket(const string& url, const int protocol, int family=AF_SP){ int sock = nn_socket(family, protocol); if (sock < 0) return sock; @@ -135,7 +123,7 @@ } set_socket_timeout(sock, timeout_req_rep); pub->socket_ = sock; - pub->t_ = thread([pub]{ + pub->t_ = get_thread([](const auto pub){ while (!pub->t_quit_.load()) { _ps::psmsg *msg{NULL}; { @@ -166,7 +154,7 @@ } } - }); + }, pub); return sock; } @@ -211,7 +199,7 @@ } // set_socket_timeout(sock, timeout_req_rep); sub->socket_ = sock; - sub->t_ = thread([sub]{ + sub->t_ = get_thread([](const auto sub){ while (!sub->t_quit_.load()) { char* m; int m_len = nn_recv(sub->socket_, &m, NN_MSG, NN_DONTWAIT); @@ -223,9 +211,9 @@ lock_guard<mutex> l{sub->mtx_topics_}; for(auto && i : sub->topics_){ if (tmp_msg.size() < i.size()) continue; - topic = tmp_msg.substr(0, i.size()); + topic = move(tmp_msg.substr(0, i.size())); if (topic == i){ - msg = tmp_msg.substr(i.size()); + msg = move(tmp_msg.substr(i.size())); break; } } @@ -254,7 +242,7 @@ // printf("======>> subscribe nn_recv failed %s\n", nn_strerror(nn_errno())); } } - }); + }, sub); return 0; } @@ -331,7 +319,7 @@ sv->url_ = url; sv->fixed_msg_ = move(fixed_msg); - sv->t_ = thread([sv]{ + sv->t_ = get_thread([](const auto sv){ TAG; @@ -355,7 +343,7 @@ } } } - }); + }, sv); return 0; } diff --git a/src/req_rep.cpp b/src/req_rep.cpp index 8ac45be..967d586 100644 --- a/src/req_rep.cpp +++ b/src/req_rep.cpp @@ -88,10 +88,6 @@ lock_guard<mutex> l{rep->mtx_msg_}; rep->works_.emplace(rep->work_index_, w); rep->msg_.emplace(rep->work_index_, move(msg)); - // rep->works_.insert({rep->work_index_, w}); - // rep->msg_.insert({rep->work_index_, msg}); - // rep->works_[rep->work_index_] = w; - // rep->msg_[rep->work_index_] = msg; rep->work_index_++; rep->cv_msg_.notify_all(); } @@ -166,40 +162,42 @@ ipc = url; } rep->url_ = ipc; - if(create_server(&rep->sock_local_, ipc, 62, rep) != 0) return -1; + if(create_server(&get<0>(rep->socks_), ipc, 62, rep) != 0) return -1; if (port > 0){ - rep->port_ = port; + get<1>(get<1>(rep->socks_)) = port; ipc = "tcp://0.0.0.0:" + to_string(port); - if(create_server(&rep->sock_remote_, ipc, 62, rep) != 0) return -1; + if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep) != 0) return -1; }else { - rep->sock_remote_.id = numeric_limits<int32_t>::max(); + get<0>(get<1>(rep->socks_)).id = numeric_limits<int32_t>::max(); } if (!rep->t_unblock_){ - rep->t_unblock_.reset(new thread([rep]{ + rep->t_unblock_.reset(new thread(get_thread([](const auto rep){ constexpr int idle = 10; const auto data = rr_unblocking_msg_.data(); const auto data_size = rr_unblocking_msg_.size(); - while (!rep->t_quit_.load()) { - this_thread::sleep_for(chrono::milliseconds{10}); + auto f = [rep]{ vector<struct work*> tmp{}; - { - lock_guard<mutex> l{rep->mtx_msg_}; - for(auto iter = rep->works_.begin(); iter != rep->works_.end();){ - if ((iter->second+=idle) > timeout_req_rep){ - tmp.push_back(iter->second.w_); - iter = rep->works_.erase(iter); - }else { - ++iter; - } + lock_guard<mutex> l{rep->mtx_msg_}; + for(auto iter = rep->works_.begin(); iter != rep->works_.end();){ + if ((iter->second+=idle) > timeout_req_rep){ + tmp.push_back(iter->second.w_); + iter = rep->works_.erase(iter); + }else { + ++iter; } } + return tmp; + }; + while (!rep->t_quit_.load()) { + this_thread::sleep_for(chrono::milliseconds{10}); + vector<struct work*> tmp = f(); for(auto && w : tmp){ aio_unblock(w, data, data_size); } } - })); + }, rep))); } return 0; @@ -209,8 +207,8 @@ _rr* rep = (_rr*)arg; if (!rep) rep = singleton<_rr>(); - if (rep->sock_local_.id == 0 || rep->sock_remote_.id == 0) - if (start_reply(rep->url_, rep->port_) != 0) + if (get<0>(rep->socks_).id == 0 || get<0>(get<1>(rep->socks_)).id == 0) + if (start_reply(rep->url_, get<1>(get<1>(rep->socks_))) != 0) return -1; int tm = to_ms > 0 ? to_ms : 30; -- Gitblit v1.8.0