From aac0fe50f0ae9d13ff8ff7db2288a877b2fb2c53 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期五, 17 十二月 2021 14:13:55 +0800
Subject: [PATCH] bug fixed
---
src/nng_wrap.cpp | 28 ++------
src/req_rep.cpp | 44 +++++++-------
src/common.h | 40 +++++++++----
src/interface_bus_api.cpp | 38 +++++-------
4 files changed, 72 insertions(+), 78 deletions(-)
diff --git a/src/common.h b/src/common.h
index 1a9a826..db4761d 100644
--- a/src/common.h
+++ b/src/common.h
@@ -9,12 +9,30 @@
#include <unordered_map>
#include <mutex>
#include <condition_variable>
+#include <tuple>
#include <unistd.h>
#include "nng/compat/nanomsg/nn.h"
#include <nng/nng.h>
namespace nng_wrap {
+template <class... T> struct make_void{typedef void type;};
+template <class... T> using void_t = typename make_void<T...>::type;
+template <class T, typename = void> struct is_default_c : std::false_type{};
+template <class T> struct is_default_c<T, void_t<decltype(T())>> : std::true_type{};
+template<class T> using is_function_t = typename std::is_function<typename std::remove_pointer<typename std::remove_reference<T>::type>::type>::type;
+template <bool, class T> struct is_callable_h : is_function_t<T>{};
+template <class T> struct is_callable_h<true, T>{
+private:
+ struct FB{void operator()();};
+ struct D : T, FB{};
+ template<typename U, U> struct c;
+ template<class> static std::true_type t(...);
+ template<class C> static std::false_type t(c<void(FB::*)(), &C::operator()>*);
+public:
+ using type = decltype(t<D>(nullptr));
+};
+template <class T> using is_callable = typename is_callable_h<std::is_class<typename std::remove_reference<T>::type>::value, typename std::remove_reference<T>::type>::type;
static thread_local std::string verbose_info{};
#ifndef PRNTVITAG
@@ -110,7 +128,6 @@
public:
struct psmsg{
DISABLE_COPY_AND_ASSIGN(psmsg);
- psmsg()=delete;
psmsg(const std::string& t, std::string&& m)
:topic_(t),data_(std::move(m)){}
std::string topic_{};
@@ -173,31 +190,23 @@
DISABLE_COPY_AND_ASSIGN(_rr);
_rr()=default;
~_rr(){
- if(sock_local_.id > 0) nng_close(sock_local_);
- if(sock_remote_.id > 0) nng_close(sock_remote_);
+ if(std::get<0>(socks_).id > 0) nng_close(std::get<0>(socks_));
+ if(std::get<0>(std::get<1>(socks_)).id > 0) nng_close(std::get<0>(std::get<1>(socks_)));
t_quit_.store(true, std::memory_order_relaxed);
if (t_unblock_&&t_unblock_->joinable()) t_unblock_->join();
}
-
std::unique_ptr<std::thread> t_unblock_{nullptr};
std::atomic_bool t_quit_{false};
- nng_socket sock_local_{0};
- nng_socket sock_remote_{0};
- int port_{-1};
+ std::tuple<nng_socket, std::tuple<nng_socket, int>> socks_;
std::unordered_map<uint64_t, std::string> msg_{};
class worker{
worker& in_op(const worker& w){if(&w!=this){w_=w.w_;life_=w.life_;}return *this;};
public:
- worker()=default;
- ~worker()=default;
worker(struct work* w):w_(w),life_(0){}
- worker(const worker& w):w_(w.w_),life_(w.life_){}
worker(worker&& w):w_(w.w_),life_(w.life_){}
- worker& operator=(const worker& w){return in_op(w);}
- worker& operator=(worker&& w){return in_op(w);}
operator struct work*() const{return w_;}
operator int&() {return life_;}
struct work* w_{};
@@ -210,7 +219,12 @@
};
-template<class T> inline T* singleton(){ static T t; return &t; }
+template<class T, typename std::enable_if<is_default_c<T>::value, int>::type=0> inline T* singleton(){ static auto t = std::make_unique<T>(); return t.get(); }
+
+template <class T, class... Args, typename std::enable_if<is_callable<T>::value, int>::type=0>
+inline std::thread get_thread(T&& t, Args&&... args){
+ return std::thread(std::forward<T>(t), std::forward<Args>(args)...);
+}
}
#endif
\ No newline at end of file
diff --git a/src/interface_bus_api.cpp b/src/interface_bus_api.cpp
index 20a4659..a23dd79 100644
--- a/src/interface_bus_api.cpp
+++ b/src/interface_bus_api.cpp
@@ -3,20 +3,14 @@
using namespace nng_wrap;
#include "common.h"
+#include <tuple>
using namespace std;
#include "bhome_msg.pb.h"
#include "bhome_msg_api.pb.h"
using namespace bhome_msg;
-struct bus{
- _ps pub{};
- _ps_sub sub{};
- _sv sv{};
- _rr rr{};
- string proc_id{};
-};
-
+using bus = tuple<_ps, _ps_sub, _sv, _rr, string>;
void* bus_register(const void *proc_info,
const int proc_info_len,
void **reply,
@@ -36,19 +30,19 @@
/////////////////////////////////////////////////////////////////////////
bus *b = new bus;
bhome_msg::ProcInfo pi;
- if (pi.ParseFromArray(proc_info, proc_info_len)) b->proc_id = pi.proc_id();
+ if (pi.ParseFromArray(proc_info, proc_info_len)) get<4>(*b) = pi.proc_id();
const auto& url_hb = get_url(URLHeartBeat);
- respond_survey(url_hb,string{(char*)proc_info,(size_t)proc_info_len},&b->sv);
+ respond_survey(url_hb,string{(char*)proc_info,(size_t)proc_info_len},&get<2>(*b));
const auto& url_pub_proxy = get_url(URLPubProxy);
- publish(url_pub_proxy, NULL, 0, &b->pub);
+ publish(url_pub_proxy, NULL, 0, &get<0>(*b));
const auto& url_sub_queue = get_url(URLSubQueue);
- subscribe_center(url_sub_queue, &b->sub);
+ subscribe_center(url_sub_queue, &get<1>(*b));
// temporary
port = 0;
- start_reply(b->proc_id, port, &b->rr);
+ start_reply(get<4>(*b), port, &get<3>(*b));
return b;
}
@@ -96,7 +90,7 @@
return false;
}
- if (b->proc_id.empty()) {
+ if (get<4>(*b).empty()) {
PRNTVITAG("proc_id is null");
return false;
}
@@ -108,7 +102,7 @@
}
bhome_msg::MsgTopicList mtl2;
- mtl2.add_topic_list(b->proc_id);
+ mtl2.add_topic_list(get<4>(*b));
for(int i = 0; i < mtl.topic_list_size(); i++){
mtl2.add_topic_list(mtl.topic_list(i));
}
@@ -214,7 +208,7 @@
}
for(int i = 0; i < mtl.topic_list_size(); i ++){
- subscribe_topic(mtl.topic_list(i), &b->sub);
+ subscribe_topic(mtl.topic_list(i), &get<1>(*b));
}
return true;
@@ -250,7 +244,7 @@
PRNTVITAG("handle is null");
return false;
}
- if (b->proc_id.empty()){
+ if (get<4>(*b).empty()){
PRNTVITAG("proc_id is null");
return false;
}
@@ -266,11 +260,11 @@
}
MsgPublish newPub;
- newPub.set_topic(b->proc_id);
+ newPub.set_topic(get<4>(*b));
newPub.set_data(string{(const char*)msgpub, (const size_t)msgpub_len});
string msg(newPub.SerializeAsString());
- auto ret = publish(pub.topic(), msg.data(), msg.size(), &b->pub);
+ auto ret = publish(pub.topic(), msg.data(), msg.size(), &get<0>(*b));
if (ret > 0) return true;
return false;
}
@@ -289,7 +283,7 @@
}
string topic, msg;
- auto ret = subscribe_read(&topic, &msg, timeout_ms, &b->sub);
+ auto ret = subscribe_read(&topic, &msg, timeout_ms, &get<1>(*b));
if (ret < 0) return false;
MsgPublish newPub;
@@ -366,7 +360,7 @@
}
string msg;
- auto ret = read_request(src, &msg, timeout_ms, &b->rr);
+ auto ret = read_request(src, &msg, timeout_ms, &get<3>(*b));
if (ret != 0) return false;
string procid{};
@@ -397,7 +391,7 @@
return false;
}
- auto ret = send_reply(src, reply, reply_len, &b->rr);
+ auto ret = send_reply(src, reply, reply_len, &get<3>(*b));
if (ret < 0) return false;
return true;
diff --git a/src/nng_wrap.cpp b/src/nng_wrap.cpp
index 1972e07..660f106 100644
--- a/src/nng_wrap.cpp
+++ b/src/nng_wrap.cpp
@@ -12,18 +12,6 @@
namespace nng_wrap {
-// static int server_socket(const string& url, const int protocol, int family=AF_SP){
-// int sock = nn_socket(family, protocol);
-// if (sock < 0) return sock;
-// remove_exist(url);
-// int rc = nn_bind(sock, url.c_str());
-// if (rc < 0) {
-// nn_close(sock);
-// return rc;
-// }
-// return sock;
-// }
-
static int client_socket(const string& url, const int protocol, int family=AF_SP){
int sock = nn_socket(family, protocol);
if (sock < 0) return sock;
@@ -135,7 +123,7 @@
}
set_socket_timeout(sock, timeout_req_rep);
pub->socket_ = sock;
- pub->t_ = thread([pub]{
+ pub->t_ = get_thread([](const auto pub){
while (!pub->t_quit_.load()) {
_ps::psmsg *msg{NULL};
{
@@ -166,7 +154,7 @@
}
}
- });
+ }, pub);
return sock;
}
@@ -211,7 +199,7 @@
}
// set_socket_timeout(sock, timeout_req_rep);
sub->socket_ = sock;
- sub->t_ = thread([sub]{
+ sub->t_ = get_thread([](const auto sub){
while (!sub->t_quit_.load()) {
char* m;
int m_len = nn_recv(sub->socket_, &m, NN_MSG, NN_DONTWAIT);
@@ -223,9 +211,9 @@
lock_guard<mutex> l{sub->mtx_topics_};
for(auto && i : sub->topics_){
if (tmp_msg.size() < i.size()) continue;
- topic = tmp_msg.substr(0, i.size());
+ topic = move(tmp_msg.substr(0, i.size()));
if (topic == i){
- msg = tmp_msg.substr(i.size());
+ msg = move(tmp_msg.substr(i.size()));
break;
}
}
@@ -254,7 +242,7 @@
// printf("======>> subscribe nn_recv failed %s\n", nn_strerror(nn_errno()));
}
}
- });
+ }, sub);
return 0;
}
@@ -331,7 +319,7 @@
sv->url_ = url;
sv->fixed_msg_ = move(fixed_msg);
- sv->t_ = thread([sv]{
+ sv->t_ = get_thread([](const auto sv){
TAG;
@@ -355,7 +343,7 @@
}
}
}
- });
+ }, sv);
return 0;
}
diff --git a/src/req_rep.cpp b/src/req_rep.cpp
index 8ac45be..967d586 100644
--- a/src/req_rep.cpp
+++ b/src/req_rep.cpp
@@ -88,10 +88,6 @@
lock_guard<mutex> l{rep->mtx_msg_};
rep->works_.emplace(rep->work_index_, w);
rep->msg_.emplace(rep->work_index_, move(msg));
- // rep->works_.insert({rep->work_index_, w});
- // rep->msg_.insert({rep->work_index_, msg});
- // rep->works_[rep->work_index_] = w;
- // rep->msg_[rep->work_index_] = msg;
rep->work_index_++;
rep->cv_msg_.notify_all();
}
@@ -166,40 +162,42 @@
ipc = url;
}
rep->url_ = ipc;
- if(create_server(&rep->sock_local_, ipc, 62, rep) != 0) return -1;
+ if(create_server(&get<0>(rep->socks_), ipc, 62, rep) != 0) return -1;
if (port > 0){
- rep->port_ = port;
+ get<1>(get<1>(rep->socks_)) = port;
ipc = "tcp://0.0.0.0:" + to_string(port);
- if(create_server(&rep->sock_remote_, ipc, 62, rep) != 0) return -1;
+ if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep) != 0) return -1;
}else {
- rep->sock_remote_.id = numeric_limits<int32_t>::max();
+ get<0>(get<1>(rep->socks_)).id = numeric_limits<int32_t>::max();
}
if (!rep->t_unblock_){
- rep->t_unblock_.reset(new thread([rep]{
+ rep->t_unblock_.reset(new thread(get_thread([](const auto rep){
constexpr int idle = 10;
const auto data = rr_unblocking_msg_.data();
const auto data_size = rr_unblocking_msg_.size();
- while (!rep->t_quit_.load()) {
- this_thread::sleep_for(chrono::milliseconds{10});
+ auto f = [rep]{
vector<struct work*> tmp{};
- {
- lock_guard<mutex> l{rep->mtx_msg_};
- for(auto iter = rep->works_.begin(); iter != rep->works_.end();){
- if ((iter->second+=idle) > timeout_req_rep){
- tmp.push_back(iter->second.w_);
- iter = rep->works_.erase(iter);
- }else {
- ++iter;
- }
+ lock_guard<mutex> l{rep->mtx_msg_};
+ for(auto iter = rep->works_.begin(); iter != rep->works_.end();){
+ if ((iter->second+=idle) > timeout_req_rep){
+ tmp.push_back(iter->second.w_);
+ iter = rep->works_.erase(iter);
+ }else {
+ ++iter;
}
}
+ return tmp;
+ };
+ while (!rep->t_quit_.load()) {
+ this_thread::sleep_for(chrono::milliseconds{10});
+ vector<struct work*> tmp = f();
for(auto && w : tmp){
aio_unblock(w, data, data_size);
}
}
- }));
+ }, rep)));
}
return 0;
@@ -209,8 +207,8 @@
_rr* rep = (_rr*)arg;
if (!rep) rep = singleton<_rr>();
- if (rep->sock_local_.id == 0 || rep->sock_remote_.id == 0)
- if (start_reply(rep->url_, rep->port_) != 0)
+ if (get<0>(rep->socks_).id == 0 || get<0>(get<1>(rep->socks_)).id == 0)
+ if (start_reply(rep->url_, get<1>(get<1>(rep->socks_))) != 0)
return -1;
int tm = to_ms > 0 ? to_ms : 30;
--
Gitblit v1.8.0