| | |
| | | #include <nng/nng.h> |
| | | |
| | | namespace nng_wrap { |
// Compile-time helpers used to constrain the templates in this namespace.
// NOTE(review): the same set of traits is defined a second time further down
// in this file; only one copy can remain in the final header.

/// Maps any list of types to `void`; building block for `void_t`.
template <class... Ts> struct make_void { using type = void; };
/// Pre-C++17 stand-in for `std::void_t`.
template <class... Ts> using void_t = typename make_void<Ts...>::type;

/// `std::true_type` when the expression `T()` is well-formed, else `std::false_type`.
template <class T, typename = void> struct is_default_c : std::false_type {};
template <class T> struct is_default_c<T, void_t<decltype(T())>> : std::true_type {};

/// `std::is_function` applied to `T` after removing a reference and then a pointer,
/// so functions, function references and function pointers all yield `true_type`.
template <class T>
using is_function_t = typename std::is_function<
    typename std::remove_pointer<typename std::remove_reference<T>::type>::type>::type;

/// Non-class case: callable exactly when `is_function_t<T>` says so.
template <bool IsClass, class T> struct is_callable_h : is_function_t<T> {};

/// Class case: detects whether `T` declares any `operator()`.
/// `Derived` inherits `operator()` from both `T` and `Fallback`; if `T` has one,
/// `&Derived::operator()` is ambiguous, the `false_type` overload drops out via
/// SFINAE and the variadic `true_type` overload is selected.
template <class T> struct is_callable_h<true, T> {
private:
    struct Fallback { void operator()(); };
    struct Derived : T, Fallback {};
    template <typename U, U> struct check;
    template <class> static std::true_type test(...);
    template <class C> static std::false_type test(check<void (Fallback::*)(), &C::operator()>*);
public:
    using type = decltype(test<Derived>(nullptr));
};

/// `std::true_type` / `std::false_type`: is `T` (ignoring references) a function,
/// a function pointer, or a class with an `operator()`?
template <class T>
using is_callable = typename is_callable_h<
    std::is_class<typename std::remove_reference<T>::type>::value,
    typename std::remove_reference<T>::type>::type;
| | | |
// Per-thread string intended to hold trace text.
static thread_local std::string verbose_info{};
// Tracing is compiled out: both macros expand to nothing. The variants that
// record the function name and messages into verbose_info are kept in the
// commented-out section that follows.
#define TAG
#define PRNTVITAG(args)
| | | |
| | | /* |
| | | #ifndef PRNTVITAG |
| | | static thread_local std::string verbose_info{}; |
| | | #define TAG do{ verbose_info.clear(); \ |
| | | verbose_info=string("function [")+__FUNCTION__+string("]"); \ |
| | | }while(0) |
| | | #define PRNTVITAG(msg) do{ \ |
| | | verbose_info+=string("-> (") + msg + string(")"); \ |
| | | }while(0) |
| | | // #define TAG |
| | | // #define PRNTVITAG(args) |
| | | #endif |
| | | */ |
| | | |
| | | ///////////////////////////////////////////////// |
| | | enum{ |
| | |
| | | URLHeartBeat, |
| | | }; |
| | | |
// Endpoint URLs (ipc:// scheme) for the individual services of the center.
// String literals are immutable, so they are bound to `const char*` directly
// instead of being cast to a mutable `char*`.
static const char* IPC_REGISTER = "ipc:///tmp/bhnng-center-reg.ipc";                     // process registration
static const char* IPC_UNREGISTER = "ipc:///tmp/bhnng-center-unregister.ipc";            // process deregistration
static const char* IPC_REGTOPIC = "ipc:///tmp/bhnng-center-regtopic.ipc";                // topic registration
static const char* IPC_QUERYTOPIC = "ipc:///tmp/bhnng-center-querytopic.ipc";            // query a specific topic
static const char* IPC_QUERYPROC = "ipc:///tmp/bhnng-center-queryproc.ipc";              // query all registered processes
static const char* IPC_SUBLOCALTOPIC = "ipc:///tmp/bhnng-center-sublocaltopic.ipc";      // subscribe to local topics
static const char* IPC_SUBNETTOPIC = "ipc:///tmp/bhnng-center-subnettopic.ipc";          // subscribe to network topics
static const char* IPC_HEARTBEAT = "ipc:///tmp/bhnng-center-hb.ipc";
static const char* IPC_PUB_PROXY = "ipc:///tmp/bhnng-center-pub-proxy.ipc";              // proxy center: receives messages waiting to be published
static const char* IPC_SUB_QUEUE = "ipc:///tmp/bhnng-center-sub-queue.ipc";              // channel on which clients subscribe from the center
| | | |
| | | static const std::unordered_map<int, std::string> map_url{ |
| | | static const std::unordered_map<int, const char*> map_url{ |
| | | {URLReg, IPC_REGISTER}, |
| | | {URLDeReg, IPC_UNREGISTER}, |
| | | {URLRegTopic, IPC_REGTOPIC}, |
| | |
| | | {URLSubQueue, IPC_SUB_QUEUE}, |
| | | {URLHeartBeat, IPC_HEARTBEAT}, |
| | | }; |
| | | inline std::string get_url(const int type){ |
| | | inline const char* get_url(const int type){ |
| | | auto iter = map_url.find(type); |
| | | if (iter != map_url.end()){ |
| | | return iter->second; |
| | | } |
| | | return {}; |
| | | return NULL; |
| | | } |
| | | |
// Compile-time helpers used to constrain the templates in this namespace.
// NOTE(review): an earlier copy of these traits sits near the top of this
// file; only one copy can remain in the final header.

/// Maps any list of types to `void`; building block for `void_t`.
template <class... Ts> struct make_void { using type = void; };
/// Pre-C++17 stand-in for `std::void_t`.
template <class... Ts> using void_t = typename make_void<Ts...>::type;

/// `std::true_type` only when both `T()` and a zero-argument call of
/// `T::operator()` on an rvalue `T` are well-formed; `std::false_type` otherwise.
template <class T, typename = void> struct is_default_c : std::false_type {};
template <class T>
struct is_default_c<T, void_t<decltype(T()), decltype(std::declval<T>().operator()())>>
    : std::true_type {};

/// `std::is_function` applied to `T` after removing a reference and then a pointer,
/// so functions, function references and function pointers all yield `true_type`.
template <class T>
using is_function_t = typename std::is_function<
    typename std::remove_pointer<typename std::remove_reference<T>::type>::type>::type;

/// Non-class case: callable exactly when `is_function_t<T>` says so.
template <bool IsClass, class T> struct is_callable_h : is_function_t<T> {};

/// Class case: detects whether `T` declares any `operator()`.
/// `Derived` inherits `operator()` from both `T` and `Fallback`; if `T` has one,
/// `&Derived::operator()` is ambiguous, the `false_type` overload drops out via
/// SFINAE and the variadic `true_type` overload is selected.
template <class T> struct is_callable_h<true, T> {
private:
    struct Fallback { void operator()(); };
    struct Derived : T, Fallback {};
    template <typename U, U> struct check;
    template <class> static std::true_type test(...);
    template <class C> static std::false_type test(check<void (Fallback::*)(), &C::operator()>*);
public:
    using type = decltype(test<Derived>(nullptr));
};

/// `std::true_type` / `std::false_type`: is `T` (ignoring references) a function,
/// a function pointer, or a class with an `operator()`?
template <class T>
using is_callable = typename is_callable_h<
    std::is_class<typename std::remove_reference<T>::type>::value,
    typename std::remove_reference<T>::type>::type;

// Timeout applied to request/reply exchanges.
// NOTE(review): the unit is not visible here (presumably milliseconds) - confirm.
static constexpr int timeout_req_rep = 6251;
| | | |
| | | inline void remove_exist(const std::string& url){ |
| | | if (url.find("ipc://") == 0){ |
| | |
| | | struct psmsg{ |
| | | DISABLE_COPY_AND_ASSIGN(psmsg); |
| | | psmsg(const std::string& t, std::string&& m) |
| | | :topic_(t),data_(std::move(m)){} |
| | | :topic_(t),data_(std::move(m)){} |
| | | psmsg(std::string&& t, std::string&& m) |
| | | :topic_(std::move(t)),data_(std::move(m)){} |
| | | std::string topic_{}; |
| | | std::string data_{}; |
| | | }; |
| | | public: |
| | | DISABLE_COPY_AND_ASSIGN(_ps); |
| | | _ps()=default; |
| | | int operator()(){return msg_.size();} |
| | | virtual ~_ps(){ |
| | | t_quit_.store(true, std::memory_order_relaxed); |
| | | cv_msg_.notify_all(); |
| | | if (t_.joinable()) t_.join(); |
| | | } |
| | | |
| | |
| | | DISABLE_COPY_AND_ASSIGN(_ps_sub); |
| | | _ps_sub()=default; |
| | | ~_ps_sub()=default; |
| | | |
| | | std::mutex& operator()(){return mtx_topics_;} |
| | | std::unordered_set<std::string> topics_{}; |
| | | std::mutex mtx_topics_{}; |
| | | std::unordered_set<std::string> failed_topics_{}; |
| | |
| | | public: |
| | | DISABLE_COPY_AND_ASSIGN(_sv); |
| | | _sv()=default; |
| | | std::deque<std::string> operator()(){return {fixed_msg_};} |
| | | ~_sv(){ |
| | | t_quit_.store(true, std::memory_order_relaxed); |
| | | if (t_.joinable()) t_.join(); |
| | |
| | | }; |
| | | |
// Processing phases, numbered 0..3 in declaration order.
// NOTE(review): presumably the values held by `work::state` - confirm.
enum { INIT, RECV, WAIT, SEND };
// Reply channel kinds, numbered 0..1 in declaration order.
// NOTE(review): presumably the values held by `work::mode` - confirm.
enum { REPLY_IPC, REPLY_TCP };
| | | struct work { |
| | | int state{-1}; |
| | | nng_aio *aio{}; |
| | |
| | | nng_ctx ctx{}; |
| | | void(*cb_recv)(work*){}; |
| | | void* user_data{}; |
| | | int mode{-1}; |
| | | }; |
| | | |
// Sentinel text for the request/reply code.
// NOTE(review): presumably sent to wake a blocked receiver - confirm at the use sites.
static const std::string rr_unblocking_msg_{"~!@#$%^&*()-=<<UNBLOCKING>>=-()*&^%$#@!~"};
| | |
| | | public: |
| | | DISABLE_COPY_AND_ASSIGN(_rr); |
| | | _rr()=default; |
| | | std::tuple<uint64_t&,std::unordered_map<uint64_t, std::string>&> operator()(){return std::tie(work_index_, msg_);} |
| | | ~_rr(){ |
| | | if(std::get<0>(socks_).id > 0) nng_close(std::get<0>(socks_)); |
| | | if(std::get<0>(std::get<1>(socks_)).id > 0) nng_close(std::get<0>(std::get<1>(socks_))); |
| | |
| | | std::mutex mtx_msg_{}; |
| | | std::condition_variable cv_msg_{}; |
| | | |
| | | // std::deque<void*> q_src_{}; |
| | | }; |
| | | |
| | | template<class T, typename std::enable_if<is_default_c<T>::value, int>::type=0> inline T* singleton(){ static auto t = std::make_unique<T>(); return t.get(); } |
| | | |
| | | template <class T, class... Args, typename std::enable_if<is_callable<T>::value, int>::type=0> |
| | | inline std::thread get_thread(T&& t, Args&&... args){ |
| | | return std::thread(std::forward<T>(t), std::forward<Args>(args)...); |
| | | } |
| | | template<class T, typename std::enable_if<is_default_c<T>::value, int>::type=0> |
| | | inline T* singleton(){ static auto t = std::make_unique<T>(); return t.get(); } |
| | | template <class T, class... Args, typename std::enable_if<is_callable<T>::value&&sizeof...(Args)==1, int>::type=0> |
| | | inline std::thread get_thread(T&& t, Args&&... args){return std::thread(std::forward<T>(t), std::forward<Args>(args)...);} |
| | | |
| | | } |
| | | #endif |