From bef24e7b4001c7c7cd7a03a22f8eaf7c8af3c4b7 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期四, 20 一月 2022 14:08:35 +0800
Subject: [PATCH] performance

---
 src/nng_wrap.cpp |   29 ++++++++++++++++++-----------
 1 files changed, 18 insertions(+), 11 deletions(-)

diff --git a/src/nng_wrap.cpp b/src/nng_wrap.cpp
index 3ae3f62..a3b0bd3 100644
--- a/src/nng_wrap.cpp
+++ b/src/nng_wrap.cpp
@@ -138,7 +138,7 @@
                 msg = &pub->msg_.front();
                 if (msg->topic_.empty()) {pub->msg_.pop_front(); continue;}
             }
-            string sndmsg(msg->topic_ + msg->data_);
+            string sndmsg = (string{msg->topic_}+='\0')+=msg->data_;
             int rc = nn_send(pub->socket_, sndmsg.data(), sndmsg.size(), 0);
             if (rc == (int)sndmsg.size()){
                 char* tmp{};
@@ -162,6 +162,7 @@
 }
 
 int publish(const std::string& topic, const void* data, const int data_len, void* arg/*=NULL*/){
+    // printf("======>> publish topic %s\n", topic.c_str());
     _ps* pub = (_ps*)arg;
     if (!pub) pub = singleton<_ps>();
 
@@ -209,19 +210,18 @@
             if (m_len > 0){
                 string tmp_msg{m, (size_t)m_len};
                 nn_freemsg(m);
-                string topic{}, msg{};
+                const auto topic{tmp_msg.c_str()};
+                string msg{};
                 {
-                    lock_guard<mutex> l{(*sub)()};
+                    lock_guard<mutex> l{sub->operator()()};
                     for(auto && i : sub->topics_){
-                        if (tmp_msg.size() < i.size()) continue;
-                        topic = move(tmp_msg.substr(0, i.size()));
-                        if (topic == i){
-                            msg = move(tmp_msg.substr(i.size()));
+                        if (!!!i.compare(topic)){
+                            msg = move(tmp_msg.substr(i.size()+1));
                             break;
                         }
                     }
                 }
-                printf("======>> subscribe recv topic %s msg length %lu\n", topic.c_str(), msg.length());
+                printf("======>> subscribe recv topic %s msg length %lu\n", topic, msg.length());
                 if (!msg.empty()){
                     lock_guard<mutex> l(sub->mtx_msg_);
                     sub->msg_.emplace_back(topic, move(msg));
@@ -233,7 +233,7 @@
                     lock_guard<mutex> l{sub->mtx_failed_topics_};
                     if (!sub->failed_topics_.empty()){
                         for(auto iter = sub->failed_topics_.begin(); iter != sub->failed_topics_.end();){
-                            if (nn_setsockopt(sub->socket_, NN_SUB, NN_SUB_UNSUBSCRIBE, iter->c_str(), iter->length()) >= 0){
+                            if (nn_setsockopt(sub->socket_, NN_SUB, NN_SUB_SUBSCRIBE, iter->c_str(), iter->length()) >= 0){
                                 iter = sub->failed_topics_.erase(iter);
                             }else{
                                 iter++;
@@ -407,6 +407,7 @@
     case SEND:
         if ((rv = nng_aio_result(work->aio)) != 0) {
             nng_msg_free(work->msg);
+            work->msg = NULL;
         }
         work->state = RECV;
         nng_ctx_recv(work->ctx, work->aio);
@@ -445,9 +446,11 @@
     w->user_data = rep;
 
     if ((rv = nng_aio_alloc(&w->aio, server_cb, w)) != 0) {
+        nng_free(w, sizeof(*w));
         return NULL;
     }
     if ((rv = nng_ctx_open(&w->ctx, sock)) != 0) {
+        nng_free(w, sizeof(*w));
         return NULL;
     }
     w->state = INIT;
@@ -473,6 +476,7 @@
     remove_exist(url);
     rv = nng_listen(*sock, url.c_str(), NULL, 0);
     if (rv < 0){
+        for(int i = 0; i < count; i++) if(works[i]) nng_free(works[i], sizeof(work));
         free(works);
         PRNTVITAG("create_server nng_listen failed");
         PRNTVITAG(url);
@@ -509,6 +513,7 @@
         get<1>(get<1>(rep->socks_)) = port;
         ipc = "tcp://0.0.0.0:" + to_string(port);
         if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep) != 0) return -1;
+        printf("======>> create server for remote port %d\n", port);
     }else {
         get<0>(get<1>(rep->socks_)).id = numeric_limits<int32_t>::max();
     }
@@ -518,11 +523,13 @@
             constexpr int idle = 10;
             const auto data = rr_unblocking_msg_.data();
             const auto data_size = rr_unblocking_msg_.size();
+            constexpr int life_span = timeout_req_rep*10;
+
             auto f = [rep]{
                 vector<struct work*> tmp{};
                 lock_guard<mutex> l{rep->mtx_msg_};
                 for(auto iter = rep->works_.begin(); iter != rep->works_.end();){
-                    if ((iter->second+=idle) > timeout_req_rep){
+                    if ((iter->second+=idle) > life_span){
                         tmp.push_back(iter->second.w_);
                         iter = rep->works_.erase(iter);
                     }else {
@@ -561,7 +568,7 @@
             return !rep->msg_.empty();
         });
         if (!status){
-            PRNTVITAG("subscribe_read timeout");
+            PRNTVITAG("read_request timeout");
             return -1;
         }
         auto iter = rep->msg_.begin();

--
Gitblit v1.8.0