zhangmeng
2022-12-14 a49287079cb5a97ef65818b70529c9d3bbdd99fa
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
#ifndef _bus_nng_class_h_
#define _bus_nng_class_h_
 
#include <unistd.h>

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>
#include <utility>
 
#include "nng/compat/nanomsg/nn.h"
#include <nng/nng.h>
 
namespace nng_wrap {
 
// Tracing hooks, compiled out: both macros expand to nothing in this header.
// The commented-out block that follows keeps an alternative implementation that
// accumulates a per-thread "verbose_info" string.
#define TAG
#define PRNTVITAG(args)
 
/*
#ifndef PRNTVITAG
static thread_local std::string verbose_info{};
#define TAG do{ verbose_info.clear(); \
                verbose_info=string("function [")+__FUNCTION__+string("]"); \
            }while(0)
#define PRNTVITAG(msg) do{ \
            verbose_info+=string("-> (") + msg + string(")"); \
        }while(0)
#endif
*/
 
/////////////////////////////////////////////////
// Identifiers of the center's endpoints. Each value is a key of map_url and
// resolves (through get_url) to the IPC address noted on the right.
enum{
    URLReg,          // -> IPC_REGISTER
    URLDeReg,        // -> IPC_UNREGISTER
    URLRegTopic,     // -> IPC_REGTOPIC
    URLQueryTopic,   // -> IPC_QUERYTOPIC
    URLQueryProcs,   // -> IPC_QUERYPROC
    URLSubLocal,     // -> IPC_SUBLOCALTOPIC
    URLSubNet,       // -> IPC_SUBNETTOPIC
    URLPubProxy,     // -> IPC_PUB_PROXY
    URLSubQueue,     // -> IPC_SUB_QUEUE
    URLHeartBeat,    // -> IPC_HEARTBEAT
};
 
// IPC endpoint addresses of the center, one per service (looked up through map_url).
// NOTE(review): these are string literals exposed through non-const char*;
// writing through any of these pointers would be undefined behavior.
static char* IPC_REGISTER = (char*)"ipc:///tmp/bhnng-center-reg.ipc"; // process registration
static char* IPC_UNREGISTER = (char*)"ipc:///tmp/bhnng-center-unregister.ipc"; // process deregistration
static char* IPC_REGTOPIC = (char*)"ipc:///tmp/bhnng-center-regtopic.ipc"; // topic registration
static char* IPC_QUERYTOPIC = (char*)"ipc:///tmp/bhnng-center-querytopic.ipc"; // query a specified topic
static char* IPC_QUERYPROC = (char*)"ipc:///tmp/bhnng-center-queryproc.ipc"; // query all registered processes
static char* IPC_SUBLOCALTOPIC = (char*)"ipc:///tmp/bhnng-center-sublocaltopic.ipc"; // subscribe to local topics
static char* IPC_SUBNETTOPIC = (char*)"ipc:///tmp/bhnng-center-subnettopic.ipc"; // subscribe to network topics
static char* IPC_HEARTBEAT = (char*)"ipc:///tmp/bhnng-center-hb.ipc";
static char* IPC_PUB_PROXY = (char*)"ipc:///tmp/bhnng-center-pub-proxy.ipc";   // proxy center: receives the messages that are waiting to be published
static char* IPC_SUB_QUEUE = (char*)"ipc:///tmp/bhnng-center-sub-queue.ipc";  // channel through which clients subscribe from the center
 
// Lookup table from the URL* endpoint identifiers to their IPC addresses.
// The addresses are copied into std::string values when the table is built.
// Being `static` in a header, every translation unit that includes this file
// gets its own copy of the table.
static const std::unordered_map<int, std::string> map_url{
    {URLReg,                IPC_REGISTER},
    {URLDeReg,              IPC_UNREGISTER},
    {URLRegTopic,           IPC_REGTOPIC},
    {URLQueryTopic,         IPC_QUERYTOPIC},
    {URLQueryProcs,         IPC_QUERYPROC},
    {URLSubLocal,           IPC_SUBLOCALTOPIC},
    {URLSubNet,             IPC_SUBNETTOPIC},
    {URLPubProxy,           IPC_PUB_PROXY},
    {URLSubQueue,           IPC_SUB_QUEUE},
    {URLHeartBeat,          IPC_HEARTBEAT},
};
inline std::string get_url(const int type){
    auto iter = map_url.find(type);
    if (iter != map_url.end()){
        return iter->second;
    }
    return {};
}
 
// Detection helper: turns any pack of well-formed types into `void`.
// Going through a struct (rather than a bare alias) makes sure that an
// ill-formed argument is a substitution failure even though it is unused.
template <class... Ts>
struct make_void{
    using type = void;
};

template <class... Ts>
using void_t = typename make_void<Ts...>::type;

// is_default_c<T>::value is true when both `T()` and a call of
// `operator()` with no arguments on a T are well-formed; false otherwise.
template <class T, typename = void>
struct is_default_c : std::false_type{};

template <class T>
struct is_default_c<
    T,
    void_t<
        decltype(T()),
        decltype(std::declval<T>().operator()())
    >
> : std::true_type{};
// true_type when T, after stripping one reference and one pointer level,
// is a function type (covers functions, function pointers and references to them).
template <class T>
using is_function_t = typename std::is_function<
    typename std::remove_pointer<
        typename std::remove_reference<T>::type
    >::type
>::type;

// Non-class case: callable exactly when it is a function / function pointer.
template <bool IsClass, class T>
struct is_callable_h : is_function_t<T>{};

// Class case: detects the presence of any operator() in T.
// `Derived` inherits from both T and a fallback that declares operator().
// If T has its own operator(), `&Derived::operator()` is ambiguous, the
// pointer overload of `probe` drops out and the variadic one (true_type) wins.
// If T has none, the name resolves to the fallback's and false_type is chosen.
template <class T>
struct is_callable_h<true, T>{
private:
    struct Fallback{ void operator()(); };
    struct Derived : T, Fallback{};

    template <typename Sig, Sig>
    struct check;

    template <class>
    static std::true_type probe(...);

    template <class C>
    static std::false_type probe(check<void (Fallback::*)(), &C::operator()>*);

public:
    using type = decltype(probe<Derived>(nullptr));
};

// Resulting trait: std::true_type / std::false_type for T with references removed.
template <class T>
using is_callable = typename is_callable_h<
    std::is_class<typename std::remove_reference<T>::type>::value,
    typename std::remove_reference<T>::type
>::type;
 
// Timeout used for request/reply exchanges.
// NOTE(review): the unit is not visible in this header — presumably milliseconds; confirm at the call sites.
static constexpr int timeout_req_rep = 6251;
 
/// Delete the filesystem entry behind an "ipc://" URL, if there is one.
///
/// URLs that do not start with "ipc://" are ignored. Removal is best-effort:
/// a missing file (or any other failure of remove) is silently tolerated,
/// because the function has no way to report it and callers expect a no-op.
///
/// @param url  endpoint address, e.g. "ipc:///tmp/foo.ipc"
inline void remove_exist(const std::string& url){
    constexpr char ipc_scheme[] = "ipc://";
    constexpr auto scheme_len = sizeof(ipc_scheme) - 1;   // without the trailing '\0'

    // compare() looks at the prefix only; find(...) == 0 would scan the whole
    // string whenever the URL does not start with the scheme.
    if (url.compare(0, scheme_len, ipc_scheme) != 0) return;

    const std::string path = url.substr(scheme_len);
    // No access() pre-check: it only opened a window between the check and the
    // removal, and remove() already fails harmlessly when nothing is there.
    std::remove(path.c_str());
}
 
/////////////////////////////////////////////////
 
// base class
// Declares the copy/move constructors and copy/move assignment operators of
// `className` as deleted. Use inside the class body; the invocation supplies
// the final semicolon. Note that it does not change the current access level.
#define DISABLE_COPY_AND_ASSIGN(className) \
        className(const className&)=delete; \
        className(className&&)=delete; \
        className& operator=(const className&)=delete; \
        className& operator=(className&&)=delete
 
// Common base of the endpoint wrappers: holds one nanomsg-compat socket
// descriptor and the URL associated with it. Non-copyable and non-movable.
class _nn{
public:
    DISABLE_COPY_AND_ASSIGN(_nn);
    _nn()=default;
    // Closes the socket on destruction when one is held.
    // NOTE(review): a descriptor equal to 0 is treated as "not open" and is never
    // closed — confirm that nn_socket() cannot return 0 with the library in use.
    virtual ~_nn(){ if (socket_ > 0) nn_close(socket_); }
    int                         socket_{-1};   // -1 until a socket is assigned
    std::string                 url_{};        // empty until an address is assigned
};
 
///////////////////////////////////////////////
// publish
 
class _ps : public _nn{
public:
    struct psmsg{
        DISABLE_COPY_AND_ASSIGN(psmsg);
        psmsg(const std::string& t, std::string&& m)
        :topic_(t),data_(std::move(m)){}
        std::string topic_{};
        std::string data_{};
    };
public:
    DISABLE_COPY_AND_ASSIGN(_ps);
    _ps()=default;
    int operator()(){return msg_.size();}
    virtual ~_ps(){
        t_quit_.store(true, std::memory_order_relaxed);
        cv_msg_.notify_all();
        if (t_.joinable()) t_.join();
    }
 
    std::thread              t_;
    std::atomic_bool         t_quit_{false};
    std::deque<psmsg>        msg_{};
    std::mutex               mtx_msg_{};
    std::condition_variable  cv_msg_{};
};
 
// Subscriber-side extension of _ps: adds two topic sets, each with its own mutex.
class _ps_sub : public _ps{
public:
    DISABLE_COPY_AND_ASSIGN(_ps_sub);
    _ps_sub()=default;
    ~_ps_sub()=default;
    // Returns the mutex that accompanies topics_.
    // This declaration hides _ps::operator() (which returns the queue size).
    std::mutex& operator()(){return mtx_topics_;}
    std::unordered_set<std::string>     topics_{};            // presumably the subscribed topics — confirm with users
    std::mutex                          mtx_topics_{};
    std::unordered_set<std::string>     failed_topics_{};     // presumably topics whose subscription failed — confirm with users
    std::mutex                          mtx_failed_topics_{};
};
 
// Endpoint wrapper with a worker thread and a single stored message.
// NOTE(review): the name suggests a "survey"-style endpoint; the protocol is not visible here.
class _sv : public _nn{
public:
    DISABLE_COPY_AND_ASSIGN(_sv);
    _sv()=default;
    // Returns a one-element deque holding a copy of fixed_msg_.
    std::deque<std::string> operator()(){return {fixed_msg_};}
    // Requests worker shutdown and joins it. No condition variable is involved,
    // so the worker has to poll t_quit_ on its own.
    ~_sv(){
        t_quit_.store(true, std::memory_order_relaxed);
        if (t_.joinable()) t_.join();
    }

    std::thread      t_;               // worker thread; started elsewhere
    std::atomic_bool t_quit_{false};   // set to true to request worker shutdown
    std::string      fixed_msg_{};
};
 
// Processing phases; presumably the values stored in work::state — confirm with the code that drives `work`.
enum { INIT, RECV, WAIT, SEND };
// Reply transports; presumably the values stored in work::mode — confirm with the code that drives `work`.
enum { REPLY_IPC, REPLY_TCP };
// Per-request context built on nng's asynchronous primitives (an aio, a message
// and a context). All members are default-initialised; the integer members
// start at -1.
struct work {
    int state{-1};            // presumably one of INIT/RECV/WAIT/SEND — TODO confirm
    nng_aio *aio{};
    nng_msg *msg{};
    nng_ctx  ctx{};
    void(*cb_recv)(work*){};  // callback taking this work item; null by default
    void* user_data{};        // opaque pointer, not interpreted in this header
    int mode{-1};             // presumably REPLY_IPC or REPLY_TCP — TODO confirm
};
 
// Sentinel payload. Presumably sent to wake up a blocked request/reply receiver
// (see _rr::t_unblock_); its use is not visible in this header — confirm at the call sites.
static const std::string rr_unblocking_msg_{"~!@#$%^&*()-=<<UNBLOCKING>>=-()*&^%$#@!~"};
// Request/reply endpoint state: up to two nng sockets, an optional helper
// thread, a table of received messages and a table of in-flight work items,
// both keyed by a 64-bit index.
class _rr : public _nn{
public:
    DISABLE_COPY_AND_ASSIGN(_rr);
    _rr()=default;
    // Returns references to the running work index and to the message table.
    std::tuple<uint64_t&,std::unordered_map<uint64_t, std::string>&> operator()(){return std::tie(work_index_, msg_);}
    // Closes both sockets whose id is positive, then raises the quit flag and
    // joins the helper thread if one was created.
    // NOTE(review): the quit flag is raised only after the sockets are closed, and
    // cv_msg_ is not notified here — confirm that no thread can stay blocked on it.
    ~_rr(){
        if(std::get<0>(socks_).id > 0) nng_close(std::get<0>(socks_));
        if(std::get<0>(std::get<1>(socks_)).id > 0) nng_close(std::get<0>(std::get<1>(socks_)));
        t_quit_.store(true, std::memory_order_relaxed);
        if (t_unblock_&&t_unblock_->joinable()) t_unblock_->join();
    }
 
    std::unique_ptr<std::thread>                    t_unblock_{nullptr};
    std::atomic_bool                                t_quit_{false};
 
    // First socket, plus a (socket, int) pair. The tuple is value-initialised.
    // NOTE(review): the meaning of the int is not visible here — presumably a port; confirm.
    std::tuple<nng_socket, std::tuple<nng_socket, int>> socks_;
 
    std::unordered_map<uint64_t, std::string>       msg_{};
    // Pairs a non-owning `work*` with an integer "life" counter. Converts
    // implicitly to `work*` and to `int&` (the counter). Declaring the move
    // constructor suppresses the implicit copy operations.
    class worker{
        // Private copy helper; not called anywhere in this header.
        worker& in_op(const worker& w){if(&w!=this){w_=w.w_;life_=w.life_;}return *this;};
    public:
        worker(struct work* w):w_(w),life_(0){}
        // Copies both fields; the source object is left unchanged.
        worker(worker&& w):w_(w.w_),life_(w.life_){}
        operator struct work*() const{return w_;}
        operator int&() {return life_;}
        struct work* w_{};
        int life_{};
    };
    std::unordered_map<uint64_t, worker>            works_{};
    uint64_t                                        work_index_{0};
    std::mutex                                      mtx_msg_{};
    std::condition_variable                         cv_msg_{};
 
};
 
template<class T, typename std::enable_if<is_default_c<T>::value, int>::type=0>
inline T* singleton(){ static auto t = std::make_unique<T>(); return t.get(); }
template <class T, class... Args, typename std::enable_if<is_callable<T>::value&&sizeof...(Args)==1, int>::type=0>
inline std::thread get_thread(T&& t, Args&&... args){return std::thread(std::forward<T>(t), std::forward<Args>(args)...);}
 
}
#endif