From 16935f4aebffdd1b6580b844391a0aa0f4f3012b Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 22 四月 2024 10:29:12 +0800
Subject: [PATCH] bug fixed

---
 src/common.h |   89 ++++++++++++++++++++++++++++----------------
 1 files changed, 57 insertions(+), 32 deletions(-)

diff --git a/src/common.h b/src/common.h
index 1a9a826..78f757d 100644
--- a/src/common.h
+++ b/src/common.h
@@ -9,6 +9,7 @@
 #include <unordered_map>
 #include <mutex>
 #include <condition_variable>
+#include <tuple>
 #include <unistd.h>
 
 #include "nng/compat/nanomsg/nn.h"
@@ -16,17 +17,20 @@
 
 namespace nng_wrap {
 
-static thread_local std::string verbose_info{};
+#define TAG
+#define PRNTVITAG(args)
+
+/*
 #ifndef PRNTVITAG
+static thread_local std::string verbose_info{};
 #define TAG do{ verbose_info.clear(); \
                 verbose_info=string("function [")+__FUNCTION__+string("]"); \
             }while(0)
 #define PRNTVITAG(msg) do{ \
             verbose_info+=string("-> (") + msg + string(")"); \
         }while(0)
-// #define TAG
-// #define PRNTVITAG(args)
 #endif
+*/
 
 /////////////////////////////////////////////////
 enum{
@@ -42,18 +46,18 @@
     URLHeartBeat,
 };
 
-static char* IPC_REGISTER = (char*)"ipc:///tmp/bhnng-center-reg.ipc"; //杩涚▼娉ㄥ唽
-static char* IPC_UNREGISTER = (char*)"ipc:///tmp/bhnng-center-unregister.ipc"; //娉ㄩ攢
-static char* IPC_REGTOPIC = (char*)"ipc:///tmp/bhnng-center-regtopic.ipc"; //娉ㄥ唽涓婚
-static char* IPC_QUERYTOPIC = (char*)"ipc:///tmp/bhnng-center-querytopic.ipc"; //鏌ヨ鎸囧畾鐨勪富棰�
-static char* IPC_QUERYPROC = (char*)"ipc:///tmp/bhnng-center-queryproc.ipc"; //鏌ヨ鎵�鏈夋敞鍐岀殑杩涚▼
-static char* IPC_SUBLOCALTOPIC = (char*)"ipc:///tmp/bhnng-center-sublocaltopic.ipc"; //璁㈤槄鏈湴涓婚
-static char* IPC_SUBNETTOPIC = (char*)"ipc:///tmp/bhnng-center-subnettopic.ipc"; //璁㈤槄缃戠粶涓婚
-static char* IPC_HEARTBEAT = (char*)"ipc:///tmp/bhnng-center-hb.ipc";
-static char* IPC_PUB_PROXY = (char*)"ipc:///tmp/bhnng-center-pub-proxy.ipc";   //杩欎釜鏄唬鐞嗕腑蹇冿紝鐢ㄤ簬鎺ユ敹寰呭彂甯冪殑娑堟伅
-static char* IPC_SUB_QUEUE = (char*)"ipc:///tmp/bhnng-center-sub-queue.ipc";  //杩欎釜鏄鎴风浠巆enter璁㈤槄鐨勯�氶亾
+static const char* IPC_REGISTER = (char*)"ipc:///tmp/bhnng-center-reg.ipc"; //杩涚▼娉ㄥ唽
+static const char* IPC_UNREGISTER = (char*)"ipc:///tmp/bhnng-center-unregister.ipc"; //娉ㄩ攢
+static const char* IPC_REGTOPIC = (char*)"ipc:///tmp/bhnng-center-regtopic.ipc"; //娉ㄥ唽涓婚
+static const char* IPC_QUERYTOPIC = (char*)"ipc:///tmp/bhnng-center-querytopic.ipc"; //鏌ヨ鎸囧畾鐨勪富棰�
+static const char* IPC_QUERYPROC = (char*)"ipc:///tmp/bhnng-center-queryproc.ipc"; //鏌ヨ鎵�鏈夋敞鍐岀殑杩涚▼
+static const char* IPC_SUBLOCALTOPIC = (char*)"ipc:///tmp/bhnng-center-sublocaltopic.ipc"; //璁㈤槄鏈湴涓婚
+static const char* IPC_SUBNETTOPIC = (char*)"ipc:///tmp/bhnng-center-subnettopic.ipc"; //璁㈤槄缃戠粶涓婚
+static const char* IPC_HEARTBEAT = (char*)"ipc:///tmp/bhnng-center-hb.ipc";
+static const char* IPC_PUB_PROXY = (char*)"ipc:///tmp/bhnng-center-pub-proxy.ipc";   //杩欎釜鏄唬鐞嗕腑蹇冿紝鐢ㄤ簬鎺ユ敹寰呭彂甯冪殑娑堟伅
+static const char* IPC_SUB_QUEUE = (char*)"ipc:///tmp/bhnng-center-sub-queue.ipc";  //杩欎釜鏄鎴风浠巆enter璁㈤槄鐨勯�氶亾
 
-static const std::unordered_map<int, std::string> map_url{
+static const std::unordered_map<int, const char*> map_url{
     {URLReg,                IPC_REGISTER},
     {URLDeReg,              IPC_UNREGISTER},
     {URLRegTopic,           IPC_REGTOPIC},
@@ -65,15 +69,33 @@
     {URLSubQueue,           IPC_SUB_QUEUE},
     {URLHeartBeat,          IPC_HEARTBEAT},
 };
-inline std::string get_url(const int type){
+inline const char* get_url(const int type){
     auto iter = map_url.find(type);
     if (iter != map_url.end()){
         return iter->second;
     }
-    return {};
+    return NULL;
 }
 
-static constexpr int timeout_req_rep = 5162;
+template <class... T> struct make_void{typedef void type;};
+template <class... T> using void_t = typename make_void<T...>::type;
+template <class T, typename = void> struct is_default_c : std::false_type{};
+template <class T> struct is_default_c<T, void_t<decltype(T()),decltype(std::declval<T>().operator()())>> : std::true_type{};
+template<class T> using is_function_t = typename std::is_function<typename std::remove_pointer<typename std::remove_reference<T>::type>::type>::type;
+template <bool, class T> struct is_callable_h : is_function_t<T>{};
+template <class T> struct is_callable_h<true, T>{
+private:
+    struct FB{void operator()();};
+    struct D : T, FB{};
+    template<typename U, U> struct c;
+    template<class> static std::true_type t(...);
+    template<class C> static std::false_type t(c<void(FB::*)(), &C::operator()>*);
+public:
+    using type = decltype(t<D>(nullptr));
+};
+template <class T> using is_callable = typename is_callable_h<std::is_class<typename std::remove_reference<T>::type>::value, typename std::remove_reference<T>::type>::type;
+
+static constexpr int timeout_req_rep = 6251;
 
 inline void remove_exist(const std::string& url){
     if (url.find("ipc://") == 0){
@@ -110,17 +132,20 @@
 public:
     struct psmsg{
         DISABLE_COPY_AND_ASSIGN(psmsg);
-        psmsg()=delete;
         psmsg(const std::string& t, std::string&& m)
-        :topic_(t),data_(std::move(m)){}
+            :topic_(t),data_(std::move(m)){}
+        psmsg(std::string&& t, std::string&& m)
+            :topic_(std::move(t)),data_(std::move(m)){}
         std::string topic_{};
         std::string data_{};
     };
 public:
     DISABLE_COPY_AND_ASSIGN(_ps);
     _ps()=default;
+    int operator()(){return msg_.size();}
     virtual ~_ps(){
         t_quit_.store(true, std::memory_order_relaxed);
+        cv_msg_.notify_all();
         if (t_.joinable()) t_.join();
     }
 
@@ -136,7 +161,7 @@
     DISABLE_COPY_AND_ASSIGN(_ps_sub);
     _ps_sub()=default;
     ~_ps_sub()=default;
-
+    std::mutex& operator()(){return mtx_topics_;}
     std::unordered_set<std::string>     topics_{};
     std::mutex                          mtx_topics_{};
     std::unordered_set<std::string>     failed_topics_{};
@@ -147,6 +172,7 @@
 public:
     DISABLE_COPY_AND_ASSIGN(_sv);
     _sv()=default;
+    std::deque<std::string> operator()(){return {fixed_msg_};}
     ~_sv(){
         t_quit_.store(true, std::memory_order_relaxed);
         if (t_.joinable()) t_.join();
@@ -158,6 +184,7 @@
 };
 
 enum { INIT, RECV, WAIT, SEND };
+enum { REPLY_IPC, REPLY_TCP };
 struct work {
     int state{-1};
     nng_aio *aio{};
@@ -165,6 +192,7 @@
     nng_ctx  ctx{};
     void(*cb_recv)(work*){};
     void* user_data{};
+    int mode{-1};
 };
 
 static const std::string rr_unblocking_msg_{"~!@#$%^&*()-=<<UNBLOCKING>>=-()*&^%$#@!~"};
@@ -172,32 +200,25 @@
 public:
     DISABLE_COPY_AND_ASSIGN(_rr);
     _rr()=default;
+    std::tuple<uint64_t&,std::unordered_map<uint64_t, std::string>&> operator()(){return std::tie(work_index_, msg_);}
     ~_rr(){
-        if(sock_local_.id > 0) nng_close(sock_local_);
-        if(sock_remote_.id > 0) nng_close(sock_remote_);
+        if(std::get<0>(socks_).id > 0) nng_close(std::get<0>(socks_));
+        if(std::get<0>(std::get<1>(socks_)).id > 0) nng_close(std::get<0>(std::get<1>(socks_)));
         t_quit_.store(true, std::memory_order_relaxed);
         if (t_unblock_&&t_unblock_->joinable()) t_unblock_->join();
     }
 
-
     std::unique_ptr<std::thread>                    t_unblock_{nullptr};
     std::atomic_bool                                t_quit_{false};
 
-    nng_socket                                      sock_local_{0};
-    nng_socket                                      sock_remote_{0};
-    int                                             port_{-1};
+    std::tuple<nng_socket, std::tuple<nng_socket, int>> socks_;
 
     std::unordered_map<uint64_t, std::string>       msg_{};
     class worker{
         worker& in_op(const worker& w){if(&w!=this){w_=w.w_;life_=w.life_;}return *this;};
     public:
-        worker()=default;
-        ~worker()=default;
         worker(struct work* w):w_(w),life_(0){}
-        worker(const worker& w):w_(w.w_),life_(w.life_){}
         worker(worker&& w):w_(w.w_),life_(w.life_){}
-        worker& operator=(const worker& w){return in_op(w);}
-        worker& operator=(worker&& w){return in_op(w);}
         operator struct work*() const{return w_;}
         operator int&() {return life_;}
         struct work* w_{};
@@ -208,9 +229,13 @@
     std::mutex                                      mtx_msg_{};
     std::condition_variable                         cv_msg_{};
 
+    // std::deque<void*>                               q_src_{};
 };
 
-template<class T> inline T* singleton(){ static T t; return &t; }
+template<class T, typename std::enable_if<is_default_c<T>::value, int>::type=0>
+inline T* singleton(){ static auto t = std::make_unique<T>(); return t.get(); }
+template <class T, class... Args, typename std::enable_if<is_callable<T>::value&&sizeof...(Args)==1, int>::type=0>
+inline std::thread get_thread(T&& t, Args&&... args){return std::thread(std::forward<T>(t), std::forward<Args>(args)...);}
 
 }
 #endif
\ No newline at end of file

--
Gitblit v1.8.0