zhangmeng
2021-12-21 e9984ced808cdd0be956630e25a431853c91e478
performance
3个文件已修改
71 ■■■■ 已修改文件
src/common.h 50 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/nng_wrap.cpp 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/req_rep.cpp 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/common.h
@@ -16,23 +16,6 @@
#include <nng/nng.h>
namespace nng_wrap {
template <class... T> struct make_void{typedef void type;};
template <class... T> using void_t = typename make_void<T...>::type;
template <class T, typename = void> struct is_default_c : std::false_type{};
template <class T> struct is_default_c<T, void_t<decltype(T())>> : std::true_type{};
template<class T> using is_function_t = typename std::is_function<typename std::remove_pointer<typename std::remove_reference<T>::type>::type>::type;
template <bool, class T> struct is_callable_h : is_function_t<T>{};
template <class T> struct is_callable_h<true, T>{
private:
    struct FB{void operator()();};
    struct D : T, FB{};
    template<typename U, U> struct c;
    template<class> static std::true_type t(...);
    template<class C> static std::false_type t(c<void(FB::*)(), &C::operator()>*);
public:
    using type = decltype(t<D>(nullptr));
};
template <class T> using is_callable = typename is_callable_h<std::is_class<typename std::remove_reference<T>::type>::value, typename std::remove_reference<T>::type>::type;
static thread_local std::string verbose_info{};
#ifndef PRNTVITAG
@@ -91,6 +74,24 @@
    return {};
}
template <class... T> struct make_void{typedef void type;};
template <class... T> using void_t = typename make_void<T...>::type;
template <class T, typename = void> struct is_default_c : std::false_type{};
template <class T> struct is_default_c<T, void_t<decltype(T()),decltype(std::declval<T>().operator()())>> : std::true_type{};
template<class T> using is_function_t = typename std::is_function<typename std::remove_pointer<typename std::remove_reference<T>::type>::type>::type;
template <bool, class T> struct is_callable_h : is_function_t<T>{};
template <class T> struct is_callable_h<true, T>{
private:
    struct FB{void operator()();};
    struct D : T, FB{};
    template<typename U, U> struct c;
    template<class> static std::true_type t(...);
    template<class C> static std::false_type t(c<void(FB::*)(), &C::operator()>*);
public:
    using type = decltype(t<D>(nullptr));
};
template <class T> using is_callable = typename is_callable_h<std::is_class<typename std::remove_reference<T>::type>::value, typename std::remove_reference<T>::type>::type;
static constexpr int timeout_req_rep = 5162;
inline void remove_exist(const std::string& url){
@@ -136,6 +137,7 @@
public:
    DISABLE_COPY_AND_ASSIGN(_ps);
    _ps()=default;
    int operator()(){return msg_.size();}
    virtual ~_ps(){
        t_quit_.store(true, std::memory_order_relaxed);
        if (t_.joinable()) t_.join();
@@ -153,7 +155,7 @@
    DISABLE_COPY_AND_ASSIGN(_ps_sub);
    _ps_sub()=default;
    ~_ps_sub()=default;
    std::mutex& operator()(){return mtx_topics_;}
    std::unordered_set<std::string>     topics_{};
    std::mutex                          mtx_topics_{};
    std::unordered_set<std::string>     failed_topics_{};
@@ -164,6 +166,7 @@
public:
    DISABLE_COPY_AND_ASSIGN(_sv);
    _sv()=default;
    std::deque<std::string> operator()(){return {fixed_msg_};}
    ~_sv(){
        t_quit_.store(true, std::memory_order_relaxed);
        if (t_.joinable()) t_.join();
@@ -189,6 +192,7 @@
public:
    DISABLE_COPY_AND_ASSIGN(_rr);
    _rr()=default;
    std::tuple<uint64_t&,std::unordered_map<uint64_t, std::string>&> operator()(){return std::tie(work_index_, msg_);}
    ~_rr(){
        if(std::get<0>(socks_).id > 0) nng_close(std::get<0>(socks_));
        if(std::get<0>(std::get<1>(socks_)).id > 0) nng_close(std::get<0>(std::get<1>(socks_)));
@@ -219,12 +223,10 @@
};
template<class T, typename std::enable_if<is_default_c<T>::value, int>::type=0> inline T* singleton(){ static auto t = std::make_unique<T>(); return t.get(); }
template <class T, class... Args, typename std::enable_if<is_callable<T>::value, int>::type=0>
inline std::thread get_thread(T&& t, Args&&... args){
    return std::thread(std::forward<T>(t), std::forward<Args>(args)...);
}
template<class T, typename std::enable_if<is_default_c<T>::value, int>::type=0>
inline T* singleton(){ static auto t = std::make_unique<T>(); return t.get(); }
template <class T, class... Args, typename std::enable_if<is_callable<T>::value&&sizeof...(Args)==1, int>::type=0>
inline std::thread get_thread(T&& t, Args&&... args){return std::thread(std::forward<T>(t), std::forward<Args>(args)...);}
}
#endif
src/nng_wrap.cpp
@@ -178,7 +178,7 @@
    lock_guard<mutex> l{pub->mtx_msg_};
    pub->msg_.emplace_back(topic, string{(const char*)data, (const size_t)data_len});
    pub->cv_msg_.notify_one();
    return pub->msg_.size();
    return (*pub)();
}
///////////////////////////////////////////////
@@ -208,7 +208,7 @@
                nn_freemsg(m);
                string topic{}, msg{};
                {
                    lock_guard<mutex> l{sub->mtx_topics_};
                    lock_guard<mutex> l{(*sub)()};
                    for(auto && i : sub->topics_){
                        if (tmp_msg.size() < i.size()) continue;
                        topic = move(tmp_msg.substr(0, i.size()));
@@ -266,7 +266,7 @@
        lock_guard<mutex> l{sub->mtx_failed_topics_};
        sub->failed_topics_.insert(topic);
    }
    lock_guard<mutex> l{sub->mtx_topics_};
    lock_guard<mutex> l{(*sub)()};
    sub->topics_.insert(topic);
    return 0;
@@ -276,7 +276,7 @@
    _ps_sub* sub = (_ps_sub*)arg;
    if (!sub) sub = singleton<_ps_sub>();
    lock_guard<mutex> l(sub->mtx_topics_);
    lock_guard<mutex> l{(*sub)()};
    auto iter = sub->topics_.find(topic);
    if (iter != sub->topics_.end()){
        nn_setsockopt(sub->socket_, NN_SUB, NN_SUB_UNSUBSCRIBE, topic.c_str(), topic.length());
@@ -324,7 +324,7 @@
        TAG;
        int& sock = sv->socket_;
        const auto& msg = sv->fixed_msg_;
        while (!sv->t_quit_.load()) {
            if (sock < 0){
                sock = client_socket(sv->url_, NN_RESPONDENT);
@@ -336,7 +336,7 @@
            int rc = nn_recv(sock, &tmp, NN_MSG, 0);
            if (rc > 0){
                nn_freemsg(tmp);
                rc = nn_send(sock, msg.data(), msg.size(), 0);
                rc = nn_send(sock, (*sv)().front().data(), (*sv)().front().size(), 0);
                if (rc < 0){
                    PRNTVITAG("heartbeat survey failed");
                    PRNTVITAG(nn_strerror(nn_errno()));
src/req_rep.cpp
@@ -85,10 +85,11 @@
    string msg{(const char*)nng_msg_body(om), nng_msg_len(om)};
    nng_msg_free(om);
    auto t = (*rep)();
    lock_guard<mutex> l{rep->mtx_msg_};
    rep->works_.emplace(rep->work_index_, w);
    rep->msg_.emplace(rep->work_index_, move(msg));
    rep->work_index_++;
    rep->works_.emplace(get<0>(t), w);
    get<1>(t).emplace(get<0>(t), move(msg));
    get<0>(t)++;
    rep->cv_msg_.notify_all();
}
@@ -124,7 +125,7 @@
        return rv;
    }
    work** works = (work**)malloc(sizeof(work*) * count);
    work** works = (work**)malloc(sizeof(void*) * count);
    for (int i = 0; i < count; i++) {
        works[i] = alloc_work(*sock, rep);
    }