zhangmeng
2021-12-17 aac0fe50f0ae9d13ff8ff7db2288a877b2fb2c53
bug fixed
4个文件已修改
150 ■■■■ 已修改文件
src/common.h 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/interface_bus_api.cpp 38 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/nng_wrap.cpp 28 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/req_rep.cpp 44 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/common.h
@@ -9,12 +9,30 @@
#include <unordered_map>
#include <mutex>
#include <condition_variable>
#include <tuple>
#include <unistd.h>
#include "nng/compat/nanomsg/nn.h"
#include <nng/nng.h>
namespace nng_wrap {
template <class... T> struct make_void{typedef void type;};
template <class... T> using void_t = typename make_void<T...>::type;
template <class T, typename = void> struct is_default_c : std::false_type{};
template <class T> struct is_default_c<T, void_t<decltype(T())>> : std::true_type{};
template<class T> using is_function_t = typename std::is_function<typename std::remove_pointer<typename std::remove_reference<T>::type>::type>::type;
template <bool, class T> struct is_callable_h : is_function_t<T>{};
template <class T> struct is_callable_h<true, T>{
private:
    struct FB{void operator()();};
    struct D : T, FB{};
    template<typename U, U> struct c;
    template<class> static std::true_type t(...);
    template<class C> static std::false_type t(c<void(FB::*)(), &C::operator()>*);
public:
    using type = decltype(t<D>(nullptr));
};
template <class T> using is_callable = typename is_callable_h<std::is_class<typename std::remove_reference<T>::type>::value, typename std::remove_reference<T>::type>::type;
static thread_local std::string verbose_info{};
#ifndef PRNTVITAG
@@ -110,7 +128,6 @@
public:
    struct psmsg{
        DISABLE_COPY_AND_ASSIGN(psmsg);
        psmsg()=delete;
        psmsg(const std::string& t, std::string&& m)
        :topic_(t),data_(std::move(m)){}
        std::string topic_{};
@@ -173,31 +190,23 @@
    DISABLE_COPY_AND_ASSIGN(_rr);
    _rr()=default;
    ~_rr(){
        if(sock_local_.id > 0) nng_close(sock_local_);
        if(sock_remote_.id > 0) nng_close(sock_remote_);
        if(std::get<0>(socks_).id > 0) nng_close(std::get<0>(socks_));
        if(std::get<0>(std::get<1>(socks_)).id > 0) nng_close(std::get<0>(std::get<1>(socks_)));
        t_quit_.store(true, std::memory_order_relaxed);
        if (t_unblock_&&t_unblock_->joinable()) t_unblock_->join();
    }
    std::unique_ptr<std::thread>                    t_unblock_{nullptr};
    std::atomic_bool                                t_quit_{false};
    nng_socket                                      sock_local_{0};
    nng_socket                                      sock_remote_{0};
    int                                             port_{-1};
    std::tuple<nng_socket, std::tuple<nng_socket, int>> socks_;
    std::unordered_map<uint64_t, std::string>       msg_{};
    class worker{
        worker& in_op(const worker& w){if(&w!=this){w_=w.w_;life_=w.life_;}return *this;};
    public:
        worker()=default;
        ~worker()=default;
        worker(struct work* w):w_(w),life_(0){}
        worker(const worker& w):w_(w.w_),life_(w.life_){}
        worker(worker&& w):w_(w.w_),life_(w.life_){}
        worker& operator=(const worker& w){return in_op(w);}
        worker& operator=(worker&& w){return in_op(w);}
        operator struct work*() const{return w_;}
        operator int&() {return life_;}
        struct work* w_{};
@@ -210,7 +219,12 @@
};
template<class T> inline T* singleton(){ static T t; return &t; }
template<class T, typename std::enable_if<is_default_c<T>::value, int>::type=0> inline T* singleton(){ static auto t = std::make_unique<T>(); return t.get(); }
template <class T, class... Args, typename std::enable_if<is_callable<T>::value, int>::type=0>
inline std::thread get_thread(T&& t, Args&&... args){
    return std::thread(std::forward<T>(t), std::forward<Args>(args)...);
}
}
#endif
src/interface_bus_api.cpp
@@ -3,20 +3,14 @@
using namespace nng_wrap;
#include "common.h"
#include <tuple>
using namespace std;
#include "bhome_msg.pb.h"
#include "bhome_msg_api.pb.h"
using namespace bhome_msg;
struct bus{
    _ps         pub{};
    _ps_sub     sub{};
    _sv         sv{};
    _rr         rr{};
    string      proc_id{};
};
using bus = tuple<_ps, _ps_sub, _sv, _rr, string>;
void* bus_register(const void *proc_info,
               const int proc_info_len,
               void **reply,
@@ -36,19 +30,19 @@
/////////////////////////////////////////////////////////////////////////
        bus *b = new bus;
        bhome_msg::ProcInfo pi;
        if (pi.ParseFromArray(proc_info, proc_info_len)) b->proc_id = pi.proc_id();
        if (pi.ParseFromArray(proc_info, proc_info_len)) get<4>(*b) = pi.proc_id();
        const auto& url_hb = get_url(URLHeartBeat);
        respond_survey(url_hb,string{(char*)proc_info,(size_t)proc_info_len},&b->sv);
        respond_survey(url_hb,string{(char*)proc_info,(size_t)proc_info_len},&get<2>(*b));
        const auto& url_pub_proxy = get_url(URLPubProxy);
        publish(url_pub_proxy, NULL, 0, &b->pub);
        publish(url_pub_proxy, NULL, 0, &get<0>(*b));
        const auto& url_sub_queue = get_url(URLSubQueue);
        subscribe_center(url_sub_queue, &b->sub);
        subscribe_center(url_sub_queue, &get<1>(*b));
        // temporary
        port = 0;
        start_reply(b->proc_id, port, &b->rr);
        start_reply(get<4>(*b), port, &get<3>(*b));
        return b;
    }
@@ -96,7 +90,7 @@
        return false;
    }
    if (b->proc_id.empty()) {
    if (get<4>(*b).empty()) {
        PRNTVITAG("proc_id is null");
        return false;
    }
@@ -108,7 +102,7 @@
    }
    bhome_msg::MsgTopicList mtl2;
    mtl2.add_topic_list(b->proc_id);
    mtl2.add_topic_list(get<4>(*b));
    for(int i = 0; i < mtl.topic_list_size(); i++){
        mtl2.add_topic_list(mtl.topic_list(i));
    }
@@ -214,7 +208,7 @@
    }
    for(int i = 0; i < mtl.topic_list_size(); i ++){
        subscribe_topic(mtl.topic_list(i), &b->sub);
        subscribe_topic(mtl.topic_list(i), &get<1>(*b));
    }
    return true;
@@ -250,7 +244,7 @@
        PRNTVITAG("handle is null");
        return false;
    }
    if (b->proc_id.empty()){
    if (get<4>(*b).empty()){
        PRNTVITAG("proc_id is null");
        return false;
    }
@@ -266,11 +260,11 @@
    }
    MsgPublish newPub;
    newPub.set_topic(b->proc_id);
    newPub.set_topic(get<4>(*b));
    newPub.set_data(string{(const char*)msgpub, (const size_t)msgpub_len});
    string msg(newPub.SerializeAsString());
    auto ret = publish(pub.topic(), msg.data(), msg.size(), &b->pub);
    auto ret = publish(pub.topic(), msg.data(), msg.size(), &get<0>(*b));
    if (ret > 0) return true;
    return false;
}
@@ -289,7 +283,7 @@
    }
    string topic, msg;
    auto ret = subscribe_read(&topic, &msg, timeout_ms, &b->sub);
    auto ret = subscribe_read(&topic, &msg, timeout_ms, &get<1>(*b));
    if (ret < 0) return false;
    MsgPublish newPub;
@@ -366,7 +360,7 @@
    }
    string msg;
    auto ret = read_request(src, &msg, timeout_ms, &b->rr);
    auto ret = read_request(src, &msg, timeout_ms, &get<3>(*b));
    if (ret != 0) return false;
    string procid{};
@@ -397,7 +391,7 @@
        return false;
    }
    auto ret = send_reply(src, reply, reply_len, &b->rr);
    auto ret = send_reply(src, reply, reply_len, &get<3>(*b));
    if (ret < 0) return false;
    return true;
src/nng_wrap.cpp
@@ -12,18 +12,6 @@
namespace nng_wrap {
// static int server_socket(const string& url, const int protocol, int family=AF_SP){
//     int sock = nn_socket(family, protocol);
//     if (sock < 0) return sock;
//     remove_exist(url);
//     int rc = nn_bind(sock, url.c_str());
//     if (rc < 0) {
//         nn_close(sock);
//         return rc;
//     }
//     return sock;
// }
static int client_socket(const string& url, const int protocol, int family=AF_SP){
    int sock = nn_socket(family, protocol);
    if (sock < 0) return sock;
@@ -135,7 +123,7 @@
    }
    set_socket_timeout(sock, timeout_req_rep);
    pub->socket_ = sock;
    pub->t_ = thread([pub]{
    pub->t_ = get_thread([](const auto pub){
        while (!pub->t_quit_.load()) {
            _ps::psmsg *msg{NULL};
            {
@@ -166,7 +154,7 @@
            }
        }
    });
    }, pub);
    return sock;
}
@@ -211,7 +199,7 @@
    }
    // set_socket_timeout(sock, timeout_req_rep);
    sub->socket_ = sock;
    sub->t_ = thread([sub]{
    sub->t_ = get_thread([](const auto sub){
        while (!sub->t_quit_.load()) {
            char* m;
            int m_len = nn_recv(sub->socket_, &m, NN_MSG, NN_DONTWAIT);
@@ -223,9 +211,9 @@
                    lock_guard<mutex> l{sub->mtx_topics_};
                    for(auto && i : sub->topics_){
                        if (tmp_msg.size() < i.size()) continue;
                        topic = tmp_msg.substr(0, i.size());
                        topic = move(tmp_msg.substr(0, i.size()));
                        if (topic == i){
                            msg = tmp_msg.substr(i.size());
                            msg = move(tmp_msg.substr(i.size()));
                            break;
                        }
                    }
@@ -254,7 +242,7 @@
                // printf("======>> subscribe nn_recv failed %s\n", nn_strerror(nn_errno()));
            }
        }
    });
    }, sub);
    return 0;
}
@@ -331,7 +319,7 @@
    sv->url_ = url;
    sv->fixed_msg_ = move(fixed_msg);
    sv->t_ = thread([sv]{
    sv->t_ = get_thread([](const auto sv){
        TAG;
@@ -355,7 +343,7 @@
                }
            }
        }
    });
    }, sv);
    return 0;
}
src/req_rep.cpp
@@ -88,10 +88,6 @@
    lock_guard<mutex> l{rep->mtx_msg_};
    rep->works_.emplace(rep->work_index_, w);
    rep->msg_.emplace(rep->work_index_, move(msg));
    // rep->works_.insert({rep->work_index_, w});
    // rep->msg_.insert({rep->work_index_, msg});
    // rep->works_[rep->work_index_] = w;
    // rep->msg_[rep->work_index_] = msg;
    rep->work_index_++;
    rep->cv_msg_.notify_all();
}
@@ -166,40 +162,42 @@
        ipc = url;
    }
    rep->url_ = ipc;
    if(create_server(&rep->sock_local_, ipc, 62, rep) != 0) return -1;
    if(create_server(&get<0>(rep->socks_), ipc, 62, rep) != 0) return -1;
    if (port > 0){
        rep->port_ = port;
        get<1>(get<1>(rep->socks_)) = port;
        ipc = "tcp://0.0.0.0:" + to_string(port);
        if(create_server(&rep->sock_remote_, ipc, 62, rep) != 0) return -1;
        if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep) != 0) return -1;
    }else {
        rep->sock_remote_.id = numeric_limits<int32_t>::max();
        get<0>(get<1>(rep->socks_)).id = numeric_limits<int32_t>::max();
    }
    if (!rep->t_unblock_){
        rep->t_unblock_.reset(new thread([rep]{
        rep->t_unblock_.reset(new thread(get_thread([](const auto rep){
            constexpr int idle = 10;
            const auto data = rr_unblocking_msg_.data();
            const auto data_size = rr_unblocking_msg_.size();
            while (!rep->t_quit_.load()) {
                this_thread::sleep_for(chrono::milliseconds{10});
            auto f = [rep]{
                vector<struct work*> tmp{};
                {
                    lock_guard<mutex> l{rep->mtx_msg_};
                    for(auto iter = rep->works_.begin(); iter != rep->works_.end();){
                        if ((iter->second+=idle) > timeout_req_rep){
                            tmp.push_back(iter->second.w_);
                            iter = rep->works_.erase(iter);
                        }else {
                            ++iter;
                        }
                lock_guard<mutex> l{rep->mtx_msg_};
                for(auto iter = rep->works_.begin(); iter != rep->works_.end();){
                    if ((iter->second+=idle) > timeout_req_rep){
                        tmp.push_back(iter->second.w_);
                        iter = rep->works_.erase(iter);
                    }else {
                        ++iter;
                    }
                }
                return tmp;
            };
            while (!rep->t_quit_.load()) {
                this_thread::sleep_for(chrono::milliseconds{10});
                vector<struct work*> tmp = f();
                for(auto && w : tmp){
                    aio_unblock(w, data, data_size);
                }
            }
        }));
        }, rep)));
    }
    return 0;
@@ -209,8 +207,8 @@
    _rr* rep = (_rr*)arg;
    if (!rep) rep = singleton<_rr>();
    if (rep->sock_local_.id == 0 || rep->sock_remote_.id == 0)
        if (start_reply(rep->url_, rep->port_) != 0)
    if (get<0>(rep->socks_).id == 0 || get<0>(get<1>(rep->socks_)).id == 0)
        if (start_reply(rep->url_, get<1>(get<1>(rep->socks_))) != 0)
            return -1;
    int tm = to_ms > 0 ? to_ms : 30;