From f93a93f959c77aaeaf9154a7d8950691b91e1009 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期一, 20 三月 2023 13:31:33 +0800 Subject: [PATCH] bug fixed --- src/nng_wrap.cpp | 40 +++++++++++++++++++++++++--------------- 1 files changed, 25 insertions(+), 15 deletions(-) diff --git a/src/nng_wrap.cpp b/src/nng_wrap.cpp index 53a192a..0a836fc 100644 --- a/src/nng_wrap.cpp +++ b/src/nng_wrap.cpp @@ -131,6 +131,8 @@ set_socket_timeout(sock, timeout_req_rep); pub->socket_ = sock; pub->t_ = get_thread([](const auto pub){ + string sndmsg{}; + sndmsg.reserve(1024); while (!pub->t_quit_.load()) { _ps::psmsg *msg{NULL}; { @@ -142,9 +144,14 @@ msg = &pub->msg_.front(); if (msg->topic_.empty()) {pub->msg_.pop_front(); continue;} } - string sndmsg = (string{msg->topic_}+='\0')+=msg->data_; + sndmsg += msg->topic_; + sndmsg += '\0'; + sndmsg += msg->data_; + int sndmsgsize = (int)sndmsg.size(); + // string sndmsg = (string{msg->topic_}+='\0')+=msg->data_; int rc = nn_send(pub->socket_, sndmsg.data(), sndmsg.size(), 0); - if (rc == (int)sndmsg.size()){ + sndmsg.clear(); + if (rc == sndmsgsize){ char* tmp{}; rc = nn_recv(pub->socket_, &tmp, NN_MSG, 0); if (rc > 0){ @@ -213,26 +220,30 @@ // int m_len = nn_recv(sub->socket_, &m, NN_MSG, NN_DONTWAIT); int m_len = nn_recv(sub->socket_, &m, NN_MSG, 0); if (m_len > 0){ - string tmp_msg{m, (size_t)m_len}; - nn_freemsg(m); - string topic{tmp_msg.c_str()}; - string msg{}; + char* topic = m; + char* msg = m + strlen(topic) + 1; + size_t msgl = m_len - strlen(topic) - 1; + // string tmp_msg{m, (size_t)m_len}; + // nn_freemsg(m); + // string topic{tmp_msg.c_str()}; + // string msg{}; + bool found_topic = false; { lock_guard<mutex> l{sub->operator()()}; for(auto && i : sub->topics_){ - if (!!!i.compare(topic)){ - msg = move(tmp_msg.substr(i.size()+1)); + if (0 == i.compare(topic)){ + found_topic = true; break; } } } // printf("======>> subscribe recv topic %s msg length %lu\n", topic, msg.length()); - if (!msg.empty()){ + if (found_topic){ lock_guard<mutex> l(sub->mtx_msg_); - sub->msg_.emplace_back(move(topic), move(msg)); + sub->msg_.emplace_back(string(topic), string(msg, msgl)); sub->cv_msg_.notify_all(); } - + nn_freemsg(m); }else { { lock_guard<mutex> l{sub->mtx_failed_topics_}; @@ -333,17 +344,16 @@ int& sock = sv->socket_; + char tmp[1024] = {0}; while (!sv->t_quit_.load()) { if (sock < 0){ sock = client_socket(sv->url_, NN_RESPONDENT); if (sock > 0) set_socket_timeout(sock, 126); + continue; } - if (sock < 0) continue; - char* tmp{}; int rc = nn_recv(sock, &tmp, NN_MSG, 0); if (rc > 0){ - nn_freemsg(tmp); rc = nn_send(sock, (*sv)().front().data(), (*sv)().front().size(), 0); if (rc < 0){ PRNTVITAG("heartbeat survey failed"); @@ -605,7 +615,7 @@ uint64_t key; memcpy(&key, src, sizeof(key)); // auto key = *(static_cast<uint64_t*>(const_cast<void*>(src))); - free(src); + // free(src); lock_guard<mutex> l{rep->mtx_msg_}; auto iter = rep->works_.find(key); -- Gitblit v1.8.0