#ifndef _bus_nng_class_h_
|
#define _bus_nng_class_h_
|
|
#include <string>
|
#include <thread>
|
#include <atomic>
|
#include <deque>
|
#include <unordered_set>
|
#include <unordered_map>
|
#include <mutex>
|
#include <condition_variable>
|
#include <tuple>
|
#include <unistd.h>
|
|
#include "nng/compat/nanomsg/nn.h"
|
#include <nng/nng.h>
|
|
namespace nng_wrap {
|
|
// Tracing hooks, compiled out in this build: both macros expand to nothing,
// so `TAG;` is an empty statement and the argument handed to PRNTVITAG is
// discarded without being evaluated.
#define TAG
#define PRNTVITAG(msg)
|
|
/*
|
#ifndef PRNTVITAG
|
static thread_local std::string verbose_info{};
|
#define TAG do{ verbose_info.clear(); \
|
verbose_info=string("function [")+__FUNCTION__+string("]"); \
|
}while(0)
|
#define PRNTVITAG(msg) do{ \
|
verbose_info+=string("-> (") + msg + string(")"); \
|
}while(0)
|
#endif
|
*/
|
|
/////////////////////////////////////////////////
|
// Integer keys for the endpoint table `map_url`; each key selects one of the
// IPC_* endpoint URLs through get_url().
enum{
    URLReg        = 0,  // -> IPC_REGISTER
    URLDeReg      = 1,  // -> IPC_UNREGISTER
    URLRegTopic   = 2,  // -> IPC_REGTOPIC
    URLQueryTopic = 3,  // -> IPC_QUERYTOPIC
    URLQueryProcs = 4,  // -> IPC_QUERYPROC
    URLSubLocal   = 5,  // -> IPC_SUBLOCALTOPIC
    URLSubNet     = 6,  // -> IPC_SUBNETTOPIC
    URLPubProxy   = 7,  // -> IPC_PUB_PROXY
    URLSubQueue   = 8,  // -> IPC_SUB_QUEUE
    URLHeartBeat  = 9,  // -> IPC_HEARTBEAT
};
|
|
// Endpoint URLs of the bus center, one IPC socket path per service.
// NOTE(review): these are mutable `char*` pointers to string literals; the
// pointed-to characters must never be written through them.

// Process registration.
static char* IPC_REGISTER = const_cast<char*>("ipc:///tmp/bhnng-center-reg.ipc");
// Process unregistration.
static char* IPC_UNREGISTER = const_cast<char*>("ipc:///tmp/bhnng-center-unregister.ipc");
// Topic registration.
static char* IPC_REGTOPIC = const_cast<char*>("ipc:///tmp/bhnng-center-regtopic.ipc");
// Query a specified topic.
static char* IPC_QUERYTOPIC = const_cast<char*>("ipc:///tmp/bhnng-center-querytopic.ipc");
// Query all registered processes.
static char* IPC_QUERYPROC = const_cast<char*>("ipc:///tmp/bhnng-center-queryproc.ipc");
// Subscribe to local topics.
static char* IPC_SUBLOCALTOPIC = const_cast<char*>("ipc:///tmp/bhnng-center-sublocaltopic.ipc");
// Subscribe to network topics.
static char* IPC_SUBNETTOPIC = const_cast<char*>("ipc:///tmp/bhnng-center-subnettopic.ipc");
static char* IPC_HEARTBEAT = const_cast<char*>("ipc:///tmp/bhnng-center-hb.ipc");
// The proxy center: receives the messages that are waiting to be published.
static char* IPC_PUB_PROXY = const_cast<char*>("ipc:///tmp/bhnng-center-pub-proxy.ipc");
// The channel through which clients subscribe from the center.
static char* IPC_SUB_QUEUE = const_cast<char*>("ipc:///tmp/bhnng-center-sub-queue.ipc");
|
|
static const std::unordered_map<int, std::string> map_url{
|
{URLReg, IPC_REGISTER},
|
{URLDeReg, IPC_UNREGISTER},
|
{URLRegTopic, IPC_REGTOPIC},
|
{URLQueryTopic, IPC_QUERYTOPIC},
|
{URLQueryProcs, IPC_QUERYPROC},
|
{URLSubLocal, IPC_SUBLOCALTOPIC},
|
{URLSubNet, IPC_SUBNETTOPIC},
|
{URLPubProxy, IPC_PUB_PROXY},
|
{URLSubQueue, IPC_SUB_QUEUE},
|
{URLHeartBeat, IPC_HEARTBEAT},
|
};
|
inline std::string get_url(const int type){
|
auto iter = map_url.find(type);
|
if (iter != map_url.end()){
|
return iter->second;
|
}
|
return {};
|
}
|
|
// Maps any list of well-formed types to void; the building block for the
// detection idiom used below.
template <class...> struct make_void { using type = void; };
template <class... Ts> using void_t = typename make_void<Ts...>::type;

// true_type when `T()` is well-formed and T has an operator() that can be
// invoked on an rvalue with no arguments; false_type otherwise.
template <class T, class = void>
struct is_default_c : std::false_type {};

template <class T>
struct is_default_c<T, void_t<decltype(T()),
                              decltype(std::declval<T>().operator()())>>
    : std::true_type {};

// std::is_function applied after stripping one reference and one pointer
// level, so functions, function references and function pointers all match.
template <class T>
using is_function_t = typename std::is_function<
    typename std::remove_pointer<
        typename std::remove_reference<T>::type>::type>::type;

// Primary template (non-class T): callable iff T is function-like.
template <bool IsClass, class T>
struct is_callable_h : is_function_t<T> {};

// Class T: detects *any* operator() (overloaded or templated included).
// Probe inherits from both T and Fallback; if T declares operator() the name
// becomes ambiguous in Probe, `&C::operator()` fails to substitute, and only
// the variadic overload (true_type) remains viable.
template <class T>
struct is_callable_h<true, T> {
private:
    struct Fallback { void operator()(); };
    struct Probe : T, Fallback {};

    template <typename U, U> struct Check;

    template <class>
    static std::true_type test(...);

    template <class C>
    static std::false_type test(Check<void (Fallback::*)(), &C::operator()>*);

public:
    using type = decltype(test<Probe>(nullptr));
};

// std::true_type / std::false_type: can T (references ignored) be called?
template <class T>
using is_callable = typename is_callable_h<
    std::is_class<typename std::remove_reference<T>::type>::value,
    typename std::remove_reference<T>::type>::type;
|
|
// Timeout constant; judging by the name it applies to request/reply calls.
// NOTE(review): the unit is not visible here (presumably milliseconds) — confirm at the call sites.
static constexpr int timeout_req_rep = 6251;
|
|
// Removes the filesystem entry behind an "ipc://<path>" URL, if any, so a
// stale socket file does not get in the way of a new bind.
//
// url: endpoint URL. Anything that does not start with "ipc://" (or that has
//      an empty path) is ignored.
//
// The entry is removed directly instead of probing with access() first:
//  - there is no window between the check and the removal (no TOCTOU race);
//  - a dangling symlink, which access() reports as missing, is cleaned up too.
// A failed removal (e.g. the path does not exist) is deliberately ignored —
// this is a best-effort cleanup, exactly as before.
inline void remove_exist(const std::string& url){
    static constexpr char scheme[] = "ipc://";
    const std::string::size_type scheme_len = sizeof(scheme) - 1;

    // Prefix test that only looks at the first characters of `url`.
    if (url.compare(0, scheme_len, scheme) != 0){
        return;
    }

    const std::string address = url.substr(scheme_len);
    if (address.empty()){
        return;
    }

    (void)remove(address.c_str());
}
|
|
/////////////////////////////////////////////////
|
|
// base class
|
// Declares the copy constructor, copy assignment, move constructor and move
// assignment of `className` as deleted (so, despite the name, moving is
// disabled as well). The last declaration carries no semicolon: the use site
// supplies it, as in `DISABLE_COPY_AND_ASSIGN(Foo);`.
#define DISABLE_COPY_AND_ASSIGN(className) \
    className(const className&) = delete; \
    className& operator=(const className&) = delete; \
    className(className&&) = delete; \
    className& operator=(className&&) = delete
|
|
class _nn{
|
public:
|
DISABLE_COPY_AND_ASSIGN(_nn);
|
_nn()=default;
|
virtual ~_nn(){ if (socket_ > 0) nn_close(socket_); }
|
int socket_{-1};
|
std::string url_{};
|
};
|
|
///////////////////////////////////////////////
|
// publish
|
|
class _ps : public _nn{
|
public:
|
struct psmsg{
|
DISABLE_COPY_AND_ASSIGN(psmsg);
|
psmsg(const std::string& t, std::string&& m)
|
:topic_(t),data_(std::move(m)){}
|
std::string topic_{};
|
std::string data_{};
|
};
|
public:
|
DISABLE_COPY_AND_ASSIGN(_ps);
|
_ps()=default;
|
int operator()(){return msg_.size();}
|
virtual ~_ps(){
|
t_quit_.store(true, std::memory_order_relaxed);
|
cv_msg_.notify_all();
|
if (t_.joinable()) t_.join();
|
}
|
|
std::thread t_;
|
std::atomic_bool t_quit_{false};
|
std::deque<psmsg> msg_{};
|
std::mutex mtx_msg_{};
|
std::condition_variable cv_msg_{};
|
};
|
|
class _ps_sub : public _ps{
|
public:
|
DISABLE_COPY_AND_ASSIGN(_ps_sub);
|
_ps_sub()=default;
|
~_ps_sub()=default;
|
std::mutex& operator()(){return mtx_topics_;}
|
std::unordered_set<std::string> topics_{};
|
std::mutex mtx_topics_{};
|
std::unordered_set<std::string> failed_topics_{};
|
std::mutex mtx_failed_topics_{};
|
};
|
|
class _sv : public _nn{
|
public:
|
DISABLE_COPY_AND_ASSIGN(_sv);
|
_sv()=default;
|
std::deque<std::string> operator()(){return {fixed_msg_};}
|
~_sv(){
|
t_quit_.store(true, std::memory_order_relaxed);
|
if (t_.joinable()) t_.join();
|
}
|
|
std::thread t_;
|
std::atomic_bool t_quit_{false};
|
std::string fixed_msg_{};
|
};
|
|
// Step identifiers. NOTE(review): presumably the values stored in
// work::state — confirm against the code that drives `work`.
enum {
    INIT = 0,
    RECV = 1,
    WAIT = 2,
    SEND = 3
};

// Reply transport identifiers. NOTE(review): presumably the values stored in
// work::mode — confirm.
enum {
    REPLY_IPC = 0,
    REPLY_TCP = 1
};
|
struct work {
|
int state{-1};
|
nng_aio *aio{};
|
nng_msg *msg{};
|
nng_ctx ctx{};
|
void(*cb_recv)(work*){};
|
void* user_data{};
|
int mode{-1};
|
};
|
|
// Sentinel text. NOTE(review): judging by the name it is sent to wake a
// blocked request/reply receiver — confirm at the use sites.
static const std::string rr_unblocking_msg_ = "~!@#$%^&*()-=<<UNBLOCKING>>=-()*&^%$#@!~";
|
class _rr : public _nn{
|
public:
|
DISABLE_COPY_AND_ASSIGN(_rr);
|
_rr()=default;
|
std::tuple<uint64_t&,std::unordered_map<uint64_t, std::string>&> operator()(){return std::tie(work_index_, msg_);}
|
~_rr(){
|
if(std::get<0>(socks_).id > 0) nng_close(std::get<0>(socks_));
|
if(std::get<0>(std::get<1>(socks_)).id > 0) nng_close(std::get<0>(std::get<1>(socks_)));
|
t_quit_.store(true, std::memory_order_relaxed);
|
if (t_unblock_&&t_unblock_->joinable()) t_unblock_->join();
|
}
|
|
std::unique_ptr<std::thread> t_unblock_{nullptr};
|
std::atomic_bool t_quit_{false};
|
|
std::tuple<nng_socket, std::tuple<nng_socket, int>> socks_;
|
|
std::unordered_map<uint64_t, std::string> msg_{};
|
class worker{
|
worker& in_op(const worker& w){if(&w!=this){w_=w.w_;life_=w.life_;}return *this;};
|
public:
|
worker(struct work* w):w_(w),life_(0){}
|
worker(worker&& w):w_(w.w_),life_(w.life_){}
|
operator struct work*() const{return w_;}
|
operator int&() {return life_;}
|
struct work* w_{};
|
int life_{};
|
};
|
std::unordered_map<uint64_t, worker> works_{};
|
uint64_t work_index_{0};
|
std::mutex mtx_msg_{};
|
std::condition_variable cv_msg_{};
|
|
};
|
|
// Returns the process-wide instance of T, created on first use and owned by
// a function-local static smart pointer. Only participates in overload
// resolution when is_default_c<T> holds.
template <class T, typename std::enable_if<is_default_c<T>::value, int>::type = 0>
inline T* singleton(){
    static const std::unique_ptr<T> instance = std::make_unique<T>();
    return instance.get();
}
|
// Starts a std::thread that runs `t` with the single forwarded argument and
// returns it to the caller. Enabled only when T is callable (is_callable)
// and exactly one extra argument is supplied.
template <class T, class... Args,
          typename std::enable_if<is_callable<T>::value && sizeof...(Args) == 1, int>::type = 0>
inline std::thread get_thread(T&& t, Args&&... args){
    std::thread launched(std::forward<T>(t), std::forward<Args>(args)...);
    return launched;
}
|
|
}
|
#endif
|