From 16935f4aebffdd1b6580b844391a0aa0f4f3012b Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期一, 22 四月 2024 10:29:12 +0800 Subject: [PATCH] bug fixed --- src/nng_wrap.cpp | 103 +++++++++++++++++++++++++++++++++++---------------- 1 files changed, 71 insertions(+), 32 deletions(-) diff --git a/src/nng_wrap.cpp b/src/nng_wrap.cpp index c64e424..3138e2f 100644 --- a/src/nng_wrap.cpp +++ b/src/nng_wrap.cpp @@ -4,7 +4,6 @@ #include <vector> #include "common.h" -using namespace std; #include <nng/protocol/reqrep0/rep.h> #include <nng/supplemental/util/platform.h> @@ -12,6 +11,8 @@ #include "nng/compat/nanomsg/reqrep.h" #include "nng/compat/nanomsg/pubsub.h" #include "nng/compat/nanomsg/survey.h" + +using namespace std; namespace nng_wrap { @@ -131,6 +132,8 @@ set_socket_timeout(sock, timeout_req_rep); pub->socket_ = sock; pub->t_ = get_thread([](const auto pub){ + string sndmsg{}; + sndmsg.reserve(1024); while (!pub->t_quit_.load()) { _ps::psmsg *msg{NULL}; { @@ -142,9 +145,14 @@ msg = &pub->msg_.front(); if (msg->topic_.empty()) {pub->msg_.pop_front(); continue;} } - string sndmsg = (string{msg->topic_}+='\0')+=msg->data_; + sndmsg += msg->topic_; + sndmsg += '\0'; + sndmsg += msg->data_; + int sndmsgsize = (int)sndmsg.size(); + // string sndmsg = (string{msg->topic_}+='\0')+=msg->data_; int rc = nn_send(pub->socket_, sndmsg.data(), sndmsg.size(), 0); - if (rc == (int)sndmsg.size()){ + sndmsg.clear(); + if (rc == sndmsgsize){ char* tmp{}; rc = nn_recv(pub->socket_, &tmp, NN_MSG, 0); if (rc > 0){ @@ -205,35 +213,43 @@ PRNTVITAG("client_socket faild\n"); return -1; } - // set_socket_timeout(sock, timeout_req_rep); + set_socket_timeout(sock, 300); sub->socket_ = sock; sub->t_ = get_thread([](const auto sub){ while (!sub->t_quit_.load()) { + // for(auto&& i : sub->topics_) { + // printf("======>> sub topic %s\n", i.c_str()); + // } char* m{}; // int m_len = nn_recv(sub->socket_, &m, NN_MSG, NN_DONTWAIT); int m_len = nn_recv(sub->socket_, &m, NN_MSG, 0); if (m_len > 0){ - string tmp_msg{m, (size_t)m_len}; - nn_freemsg(m); - string topic{tmp_msg.c_str()}; - string msg{}; + char* topic = m; + char* msg = m + strlen(topic) + 1; + size_t msgl = m_len - strlen(topic) - 1; + // string tmp_msg{m, (size_t)m_len}; + // nn_freemsg(m); + // string topic{tmp_msg.c_str()}; + // string msg{}; + bool found_topic = false; { lock_guard<mutex> l{sub->operator()()}; for(auto && i : sub->topics_){ - if (!!!i.compare(topic)){ - msg = move(tmp_msg.substr(i.size()+1)); + if (0 == i.compare(topic)){ + found_topic = true; break; } } } - // printf("======>> subscribe recv topic %s msg length %lu\n", topic, msg.length()); - if (!msg.empty()){ + // printf("======>> subscribe recv topic %s msg length %lu\n", topic, msgl); + if (found_topic){ lock_guard<mutex> l(sub->mtx_msg_); - sub->msg_.emplace_back(move(topic), move(msg)); + sub->msg_.emplace_back(string(topic), string(msg, msgl)); sub->cv_msg_.notify_all(); } - + nn_freemsg(m); }else { + if (!sub->failed_topics_.empty()) { lock_guard<mutex> l{sub->mtx_failed_topics_}; if (!sub->failed_topics_.empty()){ @@ -298,17 +314,29 @@ _ps_sub* sub = (_ps_sub*)arg; if (!sub) sub = singleton<_ps_sub>(); + if (!topic && !msg) { + lock_guard<mutex> l{sub->mtx_msg_}; + for (int i = 0; i < 2; i++) { + if (!sub->msg_.empty()) + return 0; + this_thread::sleep_for(chrono::milliseconds(to_ms)); + } + return -1; + } + TAG; int tm = to_ms > 0 ? to_ms : 30; unique_lock<mutex> l(sub->mtx_msg_); - auto status = sub->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [sub]{ - return !sub->msg_.empty(); - }); - if (!status){ - PRNTVITAG("subscribe_read timeout"); - return -1; + if (sub->msg_.empty()) { + auto status = sub->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [sub]{ + return !sub->msg_.empty(); + }); + if (!status){ + PRNTVITAG("subscribe_read timeout"); + return -1; + } } auto& tmp = sub->msg_.front(); *topic = std::move(tmp.topic_); @@ -333,17 +361,16 @@ int& sock = sv->socket_; + char tmp[1024] = {0}; while (!sv->t_quit_.load()) { if (sock < 0){ sock = client_socket(sv->url_, NN_RESPONDENT); if (sock > 0) set_socket_timeout(sock, 126); + continue; } - if (sock < 0) continue; - char* tmp{}; int rc = nn_recv(sock, &tmp, NN_MSG, 0); if (rc > 0){ - nn_freemsg(tmp); rc = nn_send(sock, (*sv)().front().data(), (*sv)().front().size(), 0); if (rc < 0){ PRNTVITAG("heartbeat survey failed"); @@ -526,7 +553,7 @@ if (!rep->t_unblock_){ rep->t_unblock_.reset(new thread(get_thread([](const auto rep){ - constexpr int idle = 10; + constexpr int idle = 216; const auto data = rr_unblocking_msg_.data(); const auto data_size = rr_unblocking_msg_.size(); constexpr int life_span = timeout_req_rep*10; @@ -545,7 +572,7 @@ return tmp; }; while (!rep->t_quit_.load()) { - this_thread::sleep_for(chrono::milliseconds{10}); + this_thread::sleep_for(chrono::milliseconds{idle}); vector<struct work*> tmp = f(); for(auto && w : tmp){ aio_unblock(w, data, data_size); @@ -565,18 +592,30 @@ if (start_reply(rep->url_, get<1>(get<1>(rep->socks_))) != 0) return -1; + if (!src && !msg) { + lock_guard<mutex> l{rep->mtx_msg_}; + for (int i = 0; i < 2; i++) { + if (!rep->msg_.empty()) + return 0; + this_thread::sleep_for(chrono::milliseconds(to_ms)); + } + return -1; + } + int tm = to_ms > 0 ? to_ms : 30; uint64_t key{}; work* w{}; { unique_lock<mutex> l(rep->mtx_msg_); - auto status = rep->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [rep]{ - return !rep->msg_.empty(); - }); - if (!status){ - PRNTVITAG("read_request timeout"); - return -1; + if (rep->msg_.empty()) { + auto status = rep->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [rep]{ + return !rep->msg_.empty(); + }); + if (!status){ + PRNTVITAG("read_request timeout"); + return -1; + } } auto iter = rep->msg_.begin(); key = iter->first; @@ -605,7 +644,7 @@ uint64_t key; memcpy(&key, src, sizeof(key)); // auto key = *(static_cast<uint64_t*>(const_cast<void*>(src))); - free(src); + // free(src); lock_guard<mutex> l{rep->mtx_msg_}; auto iter = rep->works_.find(key); -- Gitblit v1.8.0