#ifndef _bus_nng_class_h_ #define _bus_nng_class_h_ #include #include #include #include #include #include #include #include #include #include #include "nng/compat/nanomsg/nn.h" #include namespace nng_wrap { #define TAG #define PRNTVITAG(args) /* #ifndef PRNTVITAG static thread_local std::string verbose_info{}; #define TAG do{ verbose_info.clear(); \ verbose_info=string("function [")+__FUNCTION__+string("]"); \ }while(0) #define PRNTVITAG(msg) do{ \ verbose_info+=string("-> (") + msg + string(")"); \ }while(0) #endif */ ///////////////////////////////////////////////// enum{ URLReg, URLDeReg, URLRegTopic, URLQueryTopic, URLQueryProcs, URLSubLocal, URLSubNet, URLPubProxy, URLSubQueue, URLHeartBeat, }; static const char* IPC_REGISTER = (char*)"ipc:///tmp/bhnng-center-reg.ipc"; //进程注册 static const char* IPC_UNREGISTER = (char*)"ipc:///tmp/bhnng-center-unregister.ipc"; //注销 static const char* IPC_REGTOPIC = (char*)"ipc:///tmp/bhnng-center-regtopic.ipc"; //注册主题 static const char* IPC_QUERYTOPIC = (char*)"ipc:///tmp/bhnng-center-querytopic.ipc"; //查询指定的主题 static const char* IPC_QUERYPROC = (char*)"ipc:///tmp/bhnng-center-queryproc.ipc"; //查询所有注册的进程 static const char* IPC_SUBLOCALTOPIC = (char*)"ipc:///tmp/bhnng-center-sublocaltopic.ipc"; //订阅本地主题 static const char* IPC_SUBNETTOPIC = (char*)"ipc:///tmp/bhnng-center-subnettopic.ipc"; //订阅网络主题 static const char* IPC_HEARTBEAT = (char*)"ipc:///tmp/bhnng-center-hb.ipc"; static const char* IPC_PUB_PROXY = (char*)"ipc:///tmp/bhnng-center-pub-proxy.ipc"; //这个是代理中心,用于接收待发布的消息 static const char* IPC_SUB_QUEUE = (char*)"ipc:///tmp/bhnng-center-sub-queue.ipc"; //这个是客户端从center订阅的通道 static const std::unordered_map map_url{ {URLReg, IPC_REGISTER}, {URLDeReg, IPC_UNREGISTER}, {URLRegTopic, IPC_REGTOPIC}, {URLQueryTopic, IPC_QUERYTOPIC}, {URLQueryProcs, IPC_QUERYPROC}, {URLSubLocal, IPC_SUBLOCALTOPIC}, {URLSubNet, IPC_SUBNETTOPIC}, {URLPubProxy, IPC_PUB_PROXY}, {URLSubQueue, IPC_SUB_QUEUE}, {URLHeartBeat, IPC_HEARTBEAT}, }; inline const char* get_url(const int type){ auto iter = map_url.find(type); if (iter != map_url.end()){ return iter->second; } return NULL; } template struct make_void{typedef void type;}; template using void_t = typename make_void::type; template struct is_default_c : std::false_type{}; template struct is_default_c().operator()())>> : std::true_type{}; template using is_function_t = typename std::is_function::type>::type>::type; template struct is_callable_h : is_function_t{}; template struct is_callable_h{ private: struct FB{void operator()();}; struct D : T, FB{}; template struct c; template static std::true_type t(...); template static std::false_type t(c*); public: using type = decltype(t(nullptr)); }; template using is_callable = typename is_callable_h::type>::value, typename std::remove_reference::type>::type; static constexpr int timeout_req_rep = 6251; inline void remove_exist(const std::string& url){ if (url.find("ipc://") == 0){ std::string address(url); address = address.substr(6); if (access(address.c_str(), F_OK) == 0){ remove(address.c_str()); } } } ///////////////////////////////////////////////// // base class #define DISABLE_COPY_AND_ASSIGN(className) \ className(const className&)=delete; \ className(className&&)=delete; \ className& operator=(const className&)=delete; \ className& operator=(className&&)=delete class _nn{ public: DISABLE_COPY_AND_ASSIGN(_nn); _nn()=default; virtual ~_nn(){ if (socket_ > 0) nn_close(socket_); } int socket_{-1}; std::string url_{}; }; /////////////////////////////////////////////// // publish class _ps : public _nn{ public: struct psmsg{ DISABLE_COPY_AND_ASSIGN(psmsg); psmsg(const std::string& t, std::string&& m) :topic_(t),data_(std::move(m)){} psmsg(std::string&& t, std::string&& m) :topic_(std::move(t)),data_(std::move(m)){} std::string topic_{}; std::string data_{}; }; public: DISABLE_COPY_AND_ASSIGN(_ps); _ps()=default; int operator()(){return msg_.size();} virtual ~_ps(){ t_quit_.store(true, std::memory_order_relaxed); cv_msg_.notify_all(); if (t_.joinable()) t_.join(); } std::thread t_; std::atomic_bool t_quit_{false}; std::deque msg_{}; std::mutex mtx_msg_{}; std::condition_variable cv_msg_{}; }; class _ps_sub : public _ps{ public: DISABLE_COPY_AND_ASSIGN(_ps_sub); _ps_sub()=default; ~_ps_sub()=default; std::mutex& operator()(){return mtx_topics_;} std::unordered_set topics_{}; std::mutex mtx_topics_{}; std::unordered_set failed_topics_{}; std::mutex mtx_failed_topics_{}; }; class _sv : public _nn{ public: DISABLE_COPY_AND_ASSIGN(_sv); _sv()=default; std::deque operator()(){return {fixed_msg_};} ~_sv(){ t_quit_.store(true, std::memory_order_relaxed); if (t_.joinable()) t_.join(); } std::thread t_; std::atomic_bool t_quit_{false}; std::string fixed_msg_{}; }; enum { INIT, RECV, WAIT, SEND }; enum { REPLY_IPC, REPLY_TCP }; struct work { int state{-1}; nng_aio *aio{}; nng_msg *msg{}; nng_ctx ctx{}; void(*cb_recv)(work*){}; void* user_data{}; int mode{-1}; }; static const std::string rr_unblocking_msg_{"~!@#$%^&*()-=<>=-()*&^%$#@!~"}; class _rr : public _nn{ public: DISABLE_COPY_AND_ASSIGN(_rr); _rr()=default; std::tuple&> operator()(){return std::tie(work_index_, msg_);} ~_rr(){ if(std::get<0>(socks_).id > 0) nng_close(std::get<0>(socks_)); if(std::get<0>(std::get<1>(socks_)).id > 0) nng_close(std::get<0>(std::get<1>(socks_))); t_quit_.store(true, std::memory_order_relaxed); if (t_unblock_&&t_unblock_->joinable()) t_unblock_->join(); } std::unique_ptr t_unblock_{nullptr}; std::atomic_bool t_quit_{false}; std::tuple> socks_; std::unordered_map msg_{}; class worker{ worker& in_op(const worker& w){if(&w!=this){w_=w.w_;life_=w.life_;}return *this;}; public: worker(struct work* w):w_(w),life_(0){} worker(worker&& w):w_(w.w_),life_(w.life_){} operator struct work*() const{return w_;} operator int&() {return life_;} struct work* w_{}; int life_{}; }; std::unordered_map works_{}; uint64_t work_index_{0}; std::mutex mtx_msg_{}; std::condition_variable cv_msg_{}; // std::deque q_src_{}; }; template::value, int>::type=0> inline T* singleton(){ static auto t = std::make_unique(); return t.get(); } template ::value&&sizeof...(Args)==1, int>::type=0> inline std::thread get_thread(T&& t, Args&&... args){return std::thread(std::forward(t), std::forward(args)...);} } #endif