zhangmeng
2024-04-22 16935f4aebffdd1b6580b844391a0aa0f4f3012b
src/common.h
@@ -16,35 +16,21 @@
#include <nng/nng.h>
namespace nng_wrap {
template <class... T> struct make_void{typedef void type;};
template <class... T> using void_t = typename make_void<T...>::type;
template <class T, typename = void> struct is_default_c : std::false_type{};
template <class T> struct is_default_c<T, void_t<decltype(T())>> : std::true_type{};
template<class T> using is_function_t = typename std::is_function<typename std::remove_pointer<typename std::remove_reference<T>::type>::type>::type;
template <bool, class T> struct is_callable_h : is_function_t<T>{};
template <class T> struct is_callable_h<true, T>{
private:
    struct FB{void operator()();};
    struct D : T, FB{};
    template<typename U, U> struct c;
    template<class> static std::true_type t(...);
    template<class C> static std::false_type t(c<void(FB::*)(), &C::operator()>*);
public:
    using type = decltype(t<D>(nullptr));
};
template <class T> using is_callable = typename is_callable_h<std::is_class<typename std::remove_reference<T>::type>::value, typename std::remove_reference<T>::type>::type;
static thread_local std::string verbose_info{};
#define TAG
#define PRNTVITAG(args)
/*
#ifndef PRNTVITAG
static thread_local std::string verbose_info{};
#define TAG do{ verbose_info.clear(); \
                verbose_info=string("function [")+__FUNCTION__+string("]"); \
            }while(0)
#define PRNTVITAG(msg) do{ \
            verbose_info+=string("-> (") + msg + string(")"); \
        }while(0)
// #define TAG
// #define PRNTVITAG(args)
#endif
*/
/////////////////////////////////////////////////
enum{
@@ -60,18 +46,18 @@
    URLHeartBeat,
};
static char* IPC_REGISTER = (char*)"ipc:///tmp/bhnng-center-reg.ipc"; //进程注册
static char* IPC_UNREGISTER = (char*)"ipc:///tmp/bhnng-center-unregister.ipc"; //注销
static char* IPC_REGTOPIC = (char*)"ipc:///tmp/bhnng-center-regtopic.ipc"; //注册主题
static char* IPC_QUERYTOPIC = (char*)"ipc:///tmp/bhnng-center-querytopic.ipc"; //查询指定的主题
static char* IPC_QUERYPROC = (char*)"ipc:///tmp/bhnng-center-queryproc.ipc"; //查询所有注册的进程
static char* IPC_SUBLOCALTOPIC = (char*)"ipc:///tmp/bhnng-center-sublocaltopic.ipc"; //订阅本地主题
static char* IPC_SUBNETTOPIC = (char*)"ipc:///tmp/bhnng-center-subnettopic.ipc"; //订阅网络主题
static char* IPC_HEARTBEAT = (char*)"ipc:///tmp/bhnng-center-hb.ipc";
static char* IPC_PUB_PROXY = (char*)"ipc:///tmp/bhnng-center-pub-proxy.ipc";   //这个是代理中心,用于接收待发布的消息
static char* IPC_SUB_QUEUE = (char*)"ipc:///tmp/bhnng-center-sub-queue.ipc";  //这个是客户端从center订阅的通道
static const char* IPC_REGISTER = (char*)"ipc:///tmp/bhnng-center-reg.ipc"; //进程注册
static const char* IPC_UNREGISTER = (char*)"ipc:///tmp/bhnng-center-unregister.ipc"; //注销
static const char* IPC_REGTOPIC = (char*)"ipc:///tmp/bhnng-center-regtopic.ipc"; //注册主题
static const char* IPC_QUERYTOPIC = (char*)"ipc:///tmp/bhnng-center-querytopic.ipc"; //查询指定的主题
static const char* IPC_QUERYPROC = (char*)"ipc:///tmp/bhnng-center-queryproc.ipc"; //查询所有注册的进程
static const char* IPC_SUBLOCALTOPIC = (char*)"ipc:///tmp/bhnng-center-sublocaltopic.ipc"; //订阅本地主题
static const char* IPC_SUBNETTOPIC = (char*)"ipc:///tmp/bhnng-center-subnettopic.ipc"; //订阅网络主题
static const char* IPC_HEARTBEAT = (char*)"ipc:///tmp/bhnng-center-hb.ipc";
static const char* IPC_PUB_PROXY = (char*)"ipc:///tmp/bhnng-center-pub-proxy.ipc";   //这个是代理中心,用于接收待发布的消息
static const char* IPC_SUB_QUEUE = (char*)"ipc:///tmp/bhnng-center-sub-queue.ipc";  //这个是客户端从center订阅的通道
static const std::unordered_map<int, std::string> map_url{
static const std::unordered_map<int, const char*> map_url{
    {URLReg,                IPC_REGISTER},
    {URLDeReg,              IPC_UNREGISTER},
    {URLRegTopic,           IPC_REGTOPIC},
@@ -83,15 +69,33 @@
    {URLSubQueue,           IPC_SUB_QUEUE},
    {URLHeartBeat,          IPC_HEARTBEAT},
};
inline std::string get_url(const int type){
inline const char* get_url(const int type){
    auto iter = map_url.find(type);
    if (iter != map_url.end()){
        return iter->second;
    }
    return {};
    return NULL;
}
static constexpr int timeout_req_rep = 5162;
template <class... T> struct make_void{typedef void type;};
template <class... T> using void_t = typename make_void<T...>::type;
template <class T, typename = void> struct is_default_c : std::false_type{};
template <class T> struct is_default_c<T, void_t<decltype(T()),decltype(std::declval<T>().operator()())>> : std::true_type{};
template<class T> using is_function_t = typename std::is_function<typename std::remove_pointer<typename std::remove_reference<T>::type>::type>::type;
template <bool, class T> struct is_callable_h : is_function_t<T>{};
template <class T> struct is_callable_h<true, T>{
private:
    struct FB{void operator()();};
    struct D : T, FB{};
    template<typename U, U> struct c;
    template<class> static std::true_type t(...);
    template<class C> static std::false_type t(c<void(FB::*)(), &C::operator()>*);
public:
    using type = decltype(t<D>(nullptr));
};
template <class T> using is_callable = typename is_callable_h<std::is_class<typename std::remove_reference<T>::type>::value, typename std::remove_reference<T>::type>::type;
static constexpr int timeout_req_rep = 6251;
inline void remove_exist(const std::string& url){
    if (url.find("ipc://") == 0){
@@ -129,15 +133,19 @@
    struct psmsg{
        DISABLE_COPY_AND_ASSIGN(psmsg);
        psmsg(const std::string& t, std::string&& m)
        :topic_(t),data_(std::move(m)){}
            :topic_(t),data_(std::move(m)){}
        psmsg(std::string&& t, std::string&& m)
            :topic_(std::move(t)),data_(std::move(m)){}
        std::string topic_{};
        std::string data_{};
    };
public:
    DISABLE_COPY_AND_ASSIGN(_ps);
    _ps()=default;
    int operator()(){return msg_.size();}
    virtual ~_ps(){
        t_quit_.store(true, std::memory_order_relaxed);
        cv_msg_.notify_all();
        if (t_.joinable()) t_.join();
    }
@@ -153,7 +161,7 @@
    DISABLE_COPY_AND_ASSIGN(_ps_sub);
    _ps_sub()=default;
    ~_ps_sub()=default;
    std::mutex& operator()(){return mtx_topics_;}
    std::unordered_set<std::string>     topics_{};
    std::mutex                          mtx_topics_{};
    std::unordered_set<std::string>     failed_topics_{};
@@ -164,6 +172,7 @@
public:
    DISABLE_COPY_AND_ASSIGN(_sv);
    _sv()=default;
    std::deque<std::string> operator()(){return {fixed_msg_};}
    ~_sv(){
        t_quit_.store(true, std::memory_order_relaxed);
        if (t_.joinable()) t_.join();
@@ -175,6 +184,7 @@
};
enum { INIT, RECV, WAIT, SEND };
enum { REPLY_IPC, REPLY_TCP };
struct work {
    int state{-1};
    nng_aio *aio{};
@@ -182,6 +192,7 @@
    nng_ctx  ctx{};
    void(*cb_recv)(work*){};
    void* user_data{};
    int mode{-1};
};
static const std::string rr_unblocking_msg_{"~!@#$%^&*()-=<<UNBLOCKING>>=-()*&^%$#@!~"};
@@ -189,6 +200,7 @@
public:
    DISABLE_COPY_AND_ASSIGN(_rr);
    _rr()=default;
    std::tuple<uint64_t&,std::unordered_map<uint64_t, std::string>&> operator()(){return std::tie(work_index_, msg_);}
    ~_rr(){
        if(std::get<0>(socks_).id > 0) nng_close(std::get<0>(socks_));
        if(std::get<0>(std::get<1>(socks_)).id > 0) nng_close(std::get<0>(std::get<1>(socks_)));
@@ -217,14 +229,13 @@
    std::mutex                                      mtx_msg_{};
    std::condition_variable                         cv_msg_{};
    // std::deque<void*>                               q_src_{};
};
template<class T, typename std::enable_if<is_default_c<T>::value, int>::type=0> inline T* singleton(){ static auto t = std::make_unique<T>(); return t.get(); }
template <class T, class... Args, typename std::enable_if<is_callable<T>::value, int>::type=0>
inline std::thread get_thread(T&& t, Args&&... args){
    return std::thread(std::forward<T>(t), std::forward<Args>(args)...);
}
template<class T, typename std::enable_if<is_default_c<T>::value, int>::type=0>
inline T* singleton(){ static auto t = std::make_unique<T>(); return t.get(); }
template <class T, class... Args, typename std::enable_if<is_callable<T>::value&&sizeof...(Args)==1, int>::type=0>
inline std::thread get_thread(T&& t, Args&&... args){return std::thread(std::forward<T>(t), std::forward<Args>(args)...);}
}
#endif